Skip to content

Commit

Permalink
fix: Leader election failure to restart (#783)
Browse files Browse the repository at this point in the history
This fixes the following behavioral issues noted when
testing a leader-aware operator with transient network issues:
- In `LeaderElectionBackgroundService`, if
`elector.RunUntilLeadershipLostAsync()` throws, the exception is not
observed in the library and no further attempts to become the leader
occur. The library now logs any unexpected exceptions and tries to
become the leader again.
- A leader could not stop and then subsequently start being a leader
once more due to cancellation token sources not being recreated. The
library now disposes and recreates the cancellation token sources as
required.
- `LeaderAwareResourceWatcher<TEntity>.StoppedLeading` would erroneously
pass a cancelled cancellation token to `ResourceWatcher<TEntity>`. The
library now passes the `IHostApplicationLifetime.ApplicationStopped`
token to the `ResourceWatcher<TEntity>` - we can assume that
`ApplicationStopped` is a good indication that the stop should no longer
be graceful.
  • Loading branch information
exextatic committed Jun 28, 2024
1 parent aa073d1 commit e0523ca
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
using k8s.LeaderElection;

using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace KubeOps.Operator.LeaderElection;

/// <summary>
/// This background service connects to the API and continuously watches the leader election.
/// </summary>
/// <param name="logger">The logger.</param>
/// <param name="elector">The elector.</param>
internal sealed class LeaderElectionBackgroundService(LeaderElector elector)
internal sealed class LeaderElectionBackgroundService(ILogger<LeaderElectionBackgroundService> logger, LeaderElector elector)
: IHostedService, IDisposable, IAsyncDisposable
{
private readonly CancellationTokenSource _cts = new();
private bool _disposed;
private Task? _leadershipTask;

public Task StartAsync(CancellationToken cancellationToken)
{
Expand All @@ -26,7 +29,7 @@ public Task StartAsync(CancellationToken cancellationToken)
// Therefore, we use Task.Run() and put the work to queue. The passed cancellation token of the StartAsync
// method is not used, because it would only cancel the scheduling (which we definitely don't want to cancel).
// To make this intention explicit, CancellationToken.None gets passed.
_ = Task.Run(() => elector.RunUntilLeadershipLostAsync(_cts.Token), CancellationToken.None);
_leadershipTask = Task.Run(RunAndTryToHoldLeadershipForeverAsync, CancellationToken.None);

return Task.CompletedTask;
}
Expand All @@ -38,19 +41,23 @@ public void Dispose()
_disposed = true;
}

public Task StopAsync(CancellationToken cancellationToken)
public async Task StopAsync(CancellationToken cancellationToken)
{
if (_disposed)
{
return Task.CompletedTask;
return;
}

#if NET8_0_OR_GREATER
return _cts.CancelAsync();
await _cts.CancelAsync();
#else
_cts.Cancel();
return Task.CompletedTask;
#endif

if (_leadershipTask is not null)
{
await _leadershipTask;
}
}

public async ValueTask DisposeAsync()
Expand All @@ -72,4 +79,23 @@ static async ValueTask CastAndDispose(IDisposable resource)
}
}
}

private async Task RunAndTryToHoldLeadershipForeverAsync()
{
while (!_cts.IsCancellationRequested)
{
try
{
await elector.RunUntilLeadershipLostAsync(_cts.Token);
}
catch (OperationCanceledException) when (_cts.IsCancellationRequested)
{
// Ignore cancellation exceptions when we've been asked to stop.
}
catch (Exception exception)
{
logger.LogError(exception, "Failed to hold leadership.");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
using k8s;
using k8s;
using k8s.LeaderElection;
using k8s.Models;

using KubeOps.Abstractions.Builder;
using KubeOps.KubernetesClient;
using KubeOps.Operator.Queue;

using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace KubeOps.Operator.Watcher;
Expand All @@ -16,21 +17,26 @@ internal sealed class LeaderAwareResourceWatcher<TEntity>(
TimedEntityQueue<TEntity> queue,
OperatorSettings settings,
IKubernetesClient client,
IHostApplicationLifetime hostApplicationLifetime,
LeaderElector elector)
: ResourceWatcher<TEntity>(logger, provider, queue, settings, client)
where TEntity : IKubernetesObject<V1ObjectMeta>
{
private readonly CancellationTokenSource _cts = new();
private CancellationTokenSource _cts = new();
private bool _disposed;

public override Task StartAsync(CancellationToken cancellationToken)
public override async Task StartAsync(CancellationToken cancellationToken)
{
logger.LogDebug("Subscribe for leadership updates.");

elector.OnStartedLeading += StartedLeading;
elector.OnStoppedLeading += StoppedLeading;

return elector.IsLeader() ? base.StartAsync(_cts.Token) : Task.CompletedTask;
if (elector.IsLeader())
{
using CancellationTokenSource linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token);
await base.StartAsync(linkedCancellationTokenSource.Token);
}
}

public override Task StopAsync(CancellationToken cancellationToken)
Expand All @@ -43,7 +49,8 @@ public override Task StopAsync(CancellationToken cancellationToken)

elector.OnStartedLeading -= StartedLeading;
elector.OnStoppedLeading -= StoppedLeading;
return Task.CompletedTask;

return elector.IsLeader() ? base.StopAsync(cancellationToken) : Task.CompletedTask;
}

protected override void Dispose(bool disposing)
Expand All @@ -63,6 +70,13 @@ protected override void Dispose(bool disposing)
private void StartedLeading()
{
logger.LogInformation("This instance started leading, starting watcher.");

if (_cts.IsCancellationRequested)
{
_cts.Dispose();
_cts = new CancellationTokenSource();
}

base.StartAsync(_cts.Token);
}

Expand All @@ -71,6 +85,9 @@ private void StoppedLeading()
_cts.Cancel();

logger.LogInformation("This instance stopped leading, stopping watcher.");
base.StopAsync(_cts.Token).Wait();

// Stop the base implementation using the 'ApplicationStopped' cancellation token.
// The cancellation token should only be marked cancelled when the stop should no longer be graceful.
base.StopAsync(hostApplicationLifetime.ApplicationStopped).Wait();
}
}
8 changes: 7 additions & 1 deletion src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ internal class ResourceWatcher<TEntity>(
where TEntity : IKubernetesObject<V1ObjectMeta>
{
private readonly ConcurrentDictionary<string, long> _entityCache = new();
private readonly CancellationTokenSource _cancellationTokenSource = new();

private CancellationTokenSource _cancellationTokenSource = new();
private uint _watcherReconnectRetries;
private Task? _eventWatcher;
private bool _disposed;
Expand All @@ -40,6 +40,12 @@ public virtual Task StartAsync(CancellationToken cancellationToken)
{
logger.LogInformation("Starting resource watcher for {ResourceType}.", typeof(TEntity).Name);

if (_cancellationTokenSource.IsCancellationRequested)
{
_cancellationTokenSource.Dispose();
_cancellationTokenSource = new CancellationTokenSource();
}

_eventWatcher = WatchClientEventsAsync(_cancellationTokenSource.Token);

logger.LogInformation("Started resource watcher for {ResourceType}.", typeof(TEntity).Name);
Expand Down
1 change: 1 addition & 0 deletions test/KubeOps.Operator.Test/KubeOps.Operator.Test.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
<PackageReference Include="Microsoft.CodeAnalysis.CSharp" Version="4.9.2" />
<PackageReference Include="Microsoft.CodeAnalysis.CSharp.Workspaces" Version="4.9.2" />
<PackageReference Include="Microsoft.CodeAnalysis.Workspaces.MSBuild" Version="4.9.2" />
<PackageReference Include="Moq" Version="4.20.70" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
using FluentAssertions;

using k8s.LeaderElection;

using KubeOps.Operator.LeaderElection;

using Microsoft.Extensions.Logging;

using Moq;

namespace KubeOps.Operator.Test.LeaderElector;

public sealed class LeaderElectionBackgroundServiceTest
{
[Fact]
public async Task Elector_Throws_Should_Retry()
{
// Arrange.
var logger = Mock.Of<ILogger<LeaderElectionBackgroundService>>();

var electionLock = Mock.Of<ILock>();

var electionLockSubsequentCallEvent = new AutoResetEvent(false);
bool hasElectionLockThrown = false;
Mock.Get(electionLock)
.Setup(electionLock => electionLock.GetAsync(It.IsAny<CancellationToken>()))
.Returns<CancellationToken>(
async cancellationToken =>
{
if (hasElectionLockThrown)
{
// Signal to the test that a subsequent call has been made.
electionLockSubsequentCallEvent.Set();
// Delay returning for a long time, allowing the test to stop the background service, in turn cancelling the cancellation token.
await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken);
throw new InvalidOperationException();
}
hasElectionLockThrown = true;
throw new Exception("Unit test exception");
});

var leaderElectionConfig = new LeaderElectionConfig(electionLock);
var leaderElector = new k8s.LeaderElection.LeaderElector(leaderElectionConfig);

var leaderElectionBackgroundService = new LeaderElectionBackgroundService(logger, leaderElector);

// Act / Assert.
await leaderElectionBackgroundService.StartAsync(CancellationToken.None);

// Starting the background service should result in the lock attempt throwing, and then a subsequent attempt being made.
// Wait for the subsequent event to be signalled, if we time out the test fails.
electionLockSubsequentCallEvent.WaitOne(TimeSpan.FromMilliseconds(500)).Should().BeTrue();

await leaderElectionBackgroundService.StopAsync(CancellationToken.None);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
using System.Runtime.CompilerServices;

using k8s;
using k8s.Models;

using KubeOps.Abstractions.Builder;
using KubeOps.KubernetesClient;
using KubeOps.Operator.Queue;
using KubeOps.Operator.Watcher;

using Microsoft.Extensions.Logging;

using Moq;

namespace KubeOps.Operator.Test.Watcher;

public sealed class ResourceWatcherTest
{
[Fact]
public async Task Restarting_Watcher_Should_Trigger_New_Watch()
{
// Arrange.
var logger = Mock.Of<ILogger<ResourceWatcher<V1Pod>>>();
var serviceProvider = Mock.Of<IServiceProvider>();
var timedEntityQueue = new TimedEntityQueue<V1Pod>();
var operatorSettings = new OperatorSettings() { Namespace = "unit-test" };
var kubernetesClient = Mock.Of<IKubernetesClient>();

Mock.Get(kubernetesClient)
.Setup(client => client.WatchAsync<V1Pod>("unit-test", null, null, true, It.IsAny<CancellationToken>()))
.Returns<string?, string?, string?, bool?, CancellationToken>((_, _, _, _, cancellationToken) => WaitForCancellationAsync<(WatchEventType, V1Pod)>(cancellationToken));

var resourceWatcher = new ResourceWatcher<V1Pod>(logger, serviceProvider, timedEntityQueue, operatorSettings, kubernetesClient);

// Act.
// Start and stop the watcher.
await resourceWatcher.StartAsync(CancellationToken.None);
await resourceWatcher.StopAsync(CancellationToken.None);

// Restart the watcher.
await resourceWatcher.StartAsync(CancellationToken.None);

// Assert.
Mock.Get(kubernetesClient)
.Verify(client => client.WatchAsync<V1Pod>("unit-test", null, null, true, It.IsAny<CancellationToken>()), Times.Exactly(2));
}

private static async IAsyncEnumerable<T> WaitForCancellationAsync<T>([EnumeratorCancellation] CancellationToken cancellationToken)
{
await Task.Delay(Timeout.Infinite, cancellationToken);
yield return default!;
}
}

0 comments on commit e0523ca

Please sign in to comment.