Skip to content

Commit

Permalink
Improved the main sample's docker image to respect the SIGTERM from K…
Browse files Browse the repository at this point in the history
…ubernetes to perform a graceful shutdown. As a result, the pods shutting down will not vote. Closes Raft - Block voting when node shutdown #18

Better usage of IPMessageEndpoint, so that it does not starve the SWIM messages in case the Raft leader cannot receive a response from its followers.
Sample has an example for a distributed counter with exposed API
Multi-targeting enabled to take some improvements in the .net6 UDP client

Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
  • Loading branch information
zarusz committed Aug 20, 2023
1 parent ef70d59 commit 8f43660
Show file tree
Hide file tree
Showing 32 changed files with 165 additions and 83 deletions.
2 changes: 1 addition & 1 deletion src/Common.NuGet.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<Import Project="Common.Properties.xml" />

<PropertyGroup>
<Version>0.8.0</Version>
<Version>0.8.1</Version>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<PackageReadmeFile>NuGet.md</PackageReadmeFile>
<PackageIcon>icon.png</PackageIcon>
Expand Down
2 changes: 1 addition & 1 deletion src/Samples/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ WORKDIR /app
COPY --from=build /app/publish .
ARG SERVICE
ENV SERVICE_FILE "${SERVICE}.dll"
CMD dotnet $SERVICE_FILE
ENTRYPOINT [ "/usr/bin/dotnet", "SlimCluster.Samples.Service.dll" ]
8 changes: 5 additions & 3 deletions src/Samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

## SlimCluster.Samples.Service

This sample showcases a simple .NET Console App that can be built into a docker image and then deployed to Kubernetes (minikube or docker desktop).
This sample showcases a simple .NET Service (exposing a distributed counter API) that can be built into a docker image and then deployed to Kubernetes (minikube or docker desktop).
The app is running in 3 instances (pods) on Kubernetes. The instances are forming a cluster.

The node name is assigned from the machine name (`Environment.MachineName`) so that it aligns with the pod names in Kubernetes.

> The scripts were tested on Docker Desktop with Minikube.
To run the sample:

1. Navigate to `SlimCluster.Samples.Service` folder.
Expand All @@ -17,10 +19,10 @@ To run the sample:
./Docker-BuildSample.ps1
```

3. Deploy to Kubernetes (minikube):
3. Deploy to Kubernetes:

```txt
kubectl apply -f deployment.yml
./Kube-ApplySample.ps1
```

4. Check if the pods are running:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# run with tty console, interactive mode, and remove container after program ends
docker run -it --rm zarusz/slimcluster_samples_consoleapp:latest
docker run -it --rm zarusz/slimcluster_samples_service:latest

# In case you need to debug why it's not starting:
# docker run -it zarusz/slimcluster_samples_service:latest sh
10 changes: 8 additions & 2 deletions src/Samples/SlimCluster.Samples.Service/MainApp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
using SlimCluster.Membership;
using SlimCluster.Membership.Swim;

public record MainApp(ILogger<MainApp> Logger, IClusterMembership ClusterMembership, ICluster Cluster) : IHostedService
public record MainApp(ILogger<MainApp> Logger, IClusterMembership ClusterMembership/*, ICluster Cluster*/) : IHostedService
{
public Task StartAsync(CancellationToken cancellationToken)
{
Logger.LogInformation("Starting service...");

void PrintActiveMembers() => Logger.LogInformation("This node is aware of {NodeList}", string.Join(", ", ClusterMembership.Members.Select(x => x.Node.ToString())));

// doc:fragment:ExampleMembershipChanges
Expand Down Expand Up @@ -38,5 +40,9 @@ public Task StartAsync(CancellationToken cancellationToken)
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
/// <summary>
/// Logs that the service is stopping. No other shutdown work is performed here.
/// </summary>
/// <param name="cancellationToken">Not used by this implementation.</param>
/// <returns>An already completed task.</returns>
public Task StopAsync(CancellationToken cancellationToken)
{
Logger.LogInformation("Stopping service...");
return Task.CompletedTask;
}
}
17 changes: 11 additions & 6 deletions src/Samples/SlimCluster.Samples.Service/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,23 @@
using SlimCluster.Consensus.Raft.Logs;
using SlimCluster.Membership.Swim;
using SlimCluster.Persistence.LocalFile;
using SlimCluster.Samples.ConsoleApp;
using SlimCluster.Samples.ConsoleApp.State.Logs;
using SlimCluster.Samples.ConsoleApp.State.StateMachine;
using SlimCluster.Serialization;
using SlimCluster.Serialization.Json;
using SlimCluster.Transport.Ip;

var builder = WebApplication.CreateBuilder(args);

//builder.Host.UseConsoleLifetime();

builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

builder.Services.AddHostedService<MainApp>();

// doc:fragment:ExampleStartup
builder.Services.AddSlimCluster(cfg =>
{
Expand Down Expand Up @@ -51,6 +58,7 @@
// Raft app specific implementation
builder.Services.AddSingleton<ILogRepository, InMemoryLogRepository>(); // For now, store the logs in memory only
builder.Services.AddSingleton<IStateMachine, CounterStateMachine>(); // This is app specific machine that implements a distributed counter
builder.Services.AddSingleton<ISerializationTypeAliasProvider, CommandSerializationTypeAliasProvider>();

// Requires packages: SlimCluster.Membership.Swim, SlimCluster.Consensus.Raft, SlimCluster.Serialization.Json, SlimCluster.Transport.Ip, SlimCluster.Persistence.LocalFile
// doc:fragment:ExampleStartup
Expand All @@ -63,16 +71,13 @@
var app = builder.Build();

// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}
app.UseSwagger();
app.UseSwaggerUI();

app.UseHttpsRedirection();

app.UseAuthorization();

app.MapControllers();

app.Run();
await app.RunAsync();
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
namespace SlimCluster.Samples.ConsoleApp.State.Logs;

public abstract record AbstractCommand
{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace SlimCluster.Samples.ConsoleApp.State.Logs;

using SlimCluster.Serialization;

/// <summary>
/// Maps short string aliases to the counter command types of the sample.
/// </summary>
internal class CommandSerializationTypeAliasProvider : ISerializationTypeAliasProvider
{
    /// <summary>
    /// Builds the alias-to-type map for the counter commands.
    /// </summary>
    /// <returns>A newly created dictionary with the keys <c>dec</c>, <c>inc</c> and <c>rst</c>.</returns>
    public IReadOnlyDictionary<string, Type> GetTypeAliases()
    {
        var aliasToCommandType = new Dictionary<string, Type>
        {
            { "dec", typeof(DecrementCounterCommand) },
            { "inc", typeof(IncrementCounterCommand) },
            { "rst", typeof(ResetCounterCommand) },
        };

        return aliasToCommandType;
    }
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
namespace SlimCluster.Samples.ConsoleApp.State.Logs;

/// <summary>
/// Sets the counter to 0
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion src/Samples/SlimCluster.Samples.Service/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"SlimCluster": "Information",
"SlimCluster.Transport": "Information",
"SlimCluster.Consensus": "Information",
"SlimCluster.Membership": "Warning"
"SlimCluster.Membership": "Information"
}
},
"UdpPort": "60001",
Expand Down
6 changes: 5 additions & 1 deletion src/Samples/SlimCluster.Samples.Service/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ kind: Deployment
metadata:
name: sc-service
labels:
app: scca
app: sc-service
run: sc-service
spec:
replicas: 3
selector:
Expand All @@ -21,3 +22,6 @@ spec:
ports:
- containerPort: 5000
- containerPort: 60001
env:
- name: ASPNETCORE_URLS
value: "http://+:5000"
5 changes: 3 additions & 2 deletions src/Samples/SlimCluster.Samples.Service/service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ kind: Service
metadata:
name: sc-service-svc
labels:
run: sc-service-svc
run: sc-service
spec:
type: NodePort
ports:
- port: 80
- port: 5000
protocol: TCP
selector:
run: sc-service
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,9 @@ public class RaftConsensusOptions
/// The service type that should be resolved from MSDI for Log Entry serialization.
/// </summary>
public Type LogSerializerType { get; set; } = typeof(ISerializer);

/// <summary>
/// The maximum time the leader waits for a request to be processed (committed and applied) before it stops waiting.
/// Defaults to 10 seconds.
/// </summary>
public TimeSpan RequestTimeout { get; set; } = TimeSpan.FromSeconds(10);
}
7 changes: 5 additions & 2 deletions src/SlimCluster.Consensus.Raft/RaftFollowerState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@ public class RaftFollowerState
private readonly ITime _time;
private readonly ILogger<RaftFollowerState> _logger;

private readonly int _term;

public DateTimeOffset LeaderTimeout { get; protected set; }

public INode? Leader { get; protected set; }

public RaftFollowerState(ILogger<RaftFollowerState> logger, RaftConsensusOptions options, ITime time, INode? leaderNode)
public RaftFollowerState(ILogger<RaftFollowerState> logger, RaftConsensusOptions options, ITime time, int term, INode? leaderNode)
{
_logger = logger;
_options = options;
_time = time;
_term = term;
Leader = leaderNode;
OnLeaderMessage(leaderNode);
}
Expand All @@ -25,7 +28,7 @@ public void OnLeaderMessage(INode? leaderNode)
if (Leader != leaderNode)
{
Leader = leaderNode;
_logger.LogInformation("New leader is {Node}", Leader);
_logger.LogInformation("New leader is {Node} for term {Term}", Leader, _term);
}
}
}
24 changes: 17 additions & 7 deletions src/SlimCluster.Consensus.Raft/RaftLeaderState.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace SlimCluster.Consensus.Raft;

using System.Collections.Concurrent;
using System.Diagnostics;

using SlimCluster.Consensus.Raft.Logs;
using SlimCluster.Host.Common;
Expand Down Expand Up @@ -95,7 +96,7 @@ protected override async Task<bool> OnLoopRun(CancellationToken token)

if (sendNewLogEntries || sendPing)
{
var task = ReplicateLogWithFollower(lastIndex, followerReplicationState, member.Node, now, skipEntries: sendFirstPing, token);
var task = ReplicateLogWithFollower(lastIndex, followerReplicationState, member.Node, skipEntries: sendFirstPing, token);
tasks.Add(task);
}
}
Expand Down Expand Up @@ -153,7 +154,7 @@ private int FindMajorityReplicatedIndex()
return Math.Max(majorityMatchIndex, _logRepository.CommitedIndex);
}

protected internal async Task ReplicateLogWithFollower(LogIndex lastIndex, FollowerReplicatonState followerReplicationState, INode followerNode, DateTimeOffset now, bool skipEntries, CancellationToken token)
protected internal async Task ReplicateLogWithFollower(LogIndex lastIndex, FollowerReplicatonState followerReplicationState, INode followerNode, bool skipEntries, CancellationToken token)
{
var prevLogIndex = followerReplicationState.NextIndex - 1;

Expand Down Expand Up @@ -184,7 +185,7 @@ protected internal async Task ReplicateLogWithFollower(LogIndex lastIndex, Follo
var resp = await _messageSender.SendRequest(req, followerNode.Address, timeout: _options.LeaderTimeout);

// when the response arrives, update the timestamp of when the request was sent
followerReplicationState.LastAppendRequest = now;
followerReplicationState.LastAppendRequest = _time.Now;

if (resp.Success)
{
Expand All @@ -211,10 +212,13 @@ protected internal async Task ReplicateLogWithFollower(LogIndex lastIndex, Follo
followerReplicationState.NextIndex--;
}
}
catch (OperationCanceledException ex)
catch (OperationCanceledException)
{
_logger.LogWarning(ex, "{Node}: Did not recieve {MessageName} in time, will retry...", followerNode, nameof(AppendEntriesResponse));
// will retry next time
// Will retry next time
_logger.LogWarning("{Node}: Did not recieve {MessageName} in time, will retry...", followerNode, nameof(AppendEntriesResponse));

// The response did not arrive, account for the wait time we already lost to not keep on calling the possibly failed follower
followerReplicationState.LastAppendRequest = _time.Now;
}
}

Expand All @@ -240,12 +244,14 @@ private FollowerReplicatonState EnsureReplicationState(string nodeId)
var commandIndex = await _logRepository.Append(Term, command);
_logger.LogTrace("Appended command at index {Index} in term {Term}", commandIndex, Term);

var requestTimer = Stopwatch.StartNew();

var tcs = new TaskCompletionSource<object?>();
_pendingCommandResults.TryAdd(commandIndex, tcs);
try
{
// Wait until index committed (replicated to majority of nodes) and applied to state machine
while (true)
while (requestTimer.Elapsed < _options.RequestTimeout)
{
token.ThrowIfCancellationRequested();

Expand All @@ -261,9 +267,13 @@ private FollowerReplicatonState EnsureReplicationState(string nodeId)
}
finally
{
// Attempt to cancel if still was pending
tcs.TrySetCanceled();

// Clean up pending commands
_pendingCommandResults.TryRemove(commandIndex, out var _);
}
return null;
}

#region IDurableComponent
Expand Down
7 changes: 5 additions & 2 deletions src/SlimCluster.Consensus.Raft/RaftNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

using System.Collections.Concurrent;

using Microsoft.Extensions.DependencyInjection;

using SlimCluster.Consensus.Raft.Logs;
using SlimCluster.Host;
using SlimCluster.Host.Common;
Expand Down Expand Up @@ -137,7 +139,7 @@ protected async Task BecomeFollower(int term)

_candidateState = null;

_followerState = new RaftFollowerState(_loggerFactory.CreateLogger<RaftFollowerState>(), _options, _time, null);
_followerState = new RaftFollowerState(_loggerFactory.CreateLogger<RaftFollowerState>(), _options, _time, _currentTerm, null);
}

protected async Task BecomeLeader()
Expand All @@ -147,7 +149,7 @@ protected async Task BecomeLeader()

_followerState = null;

var logSerializer = (ISerializer)_serviceProvider.GetService(_options.LogSerializerType);
var logSerializer = (ISerializer)_serviceProvider.GetRequiredService(_options.LogSerializerType);

_leaderState = new RaftLeaderState(
_loggerFactory.CreateLogger<RaftLeaderState>(),
Expand Down Expand Up @@ -365,6 +367,7 @@ protected override async Task<bool> OnLoopRun(CancellationToken token)
if (Status == RaftNodeStatus.Follower && _followerState != null)
{
// When election timeout, start new election.
// ToDo: Allow for a longer window here on node start esp when the leader is already established - wait for membership for a bit
if (_time.Now > _followerState.LeaderTimeout)
{
_logger.LogInformation("Did not hear from leader within the alloted timeout {LeaderTimeout} - starting an election", _options.LeaderTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<Import Project="../Common.NuGet.Properties.xml" />

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<TargetFrameworks>netstandard2.1;net6</TargetFrameworks>
<Description>Raft consensus algorithm implementation for a distributed .NET service</Description>
<PackageTags>SlimCluster Cluster Consensus Raft</PackageTags>
</PropertyGroup>
Expand Down
6 changes: 3 additions & 3 deletions src/SlimCluster.Host/Common/TaskLoop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ protected TaskLoop(ILogger logger)
: this(logger, TimeSpan.FromMilliseconds(100))
{
}

public async Task Start()

{
if (!_isStarted && !_isStarting)
{
Expand Down Expand Up @@ -54,13 +54,13 @@ public async Task Stop()
_isStopping = true;
try
{
await OnStopping();

if (_loopCts != null)
{
_loopCts.Cancel();
}

await OnStopping();

if (_loopTask != null)
{
await _loopTask;
Expand Down
Loading

0 comments on commit 8f43660

Please sign in to comment.