Skip to content

Commit

Permalink
Flush messages on disposal in BufferedChannelBase (#65)
Browse files Browse the repository at this point in the history
* Updating BufferedChannelBase.Dispose() to ensure that queued messages are published before returning.

* Adding original cancellation token source checks back in.
  • Loading branch information
moikmellah committed Sep 16, 2024
1 parent 25dc8a0 commit 95b89b9
Showing 1 changed file with 18 additions and 5 deletions.
23 changes: 18 additions & 5 deletions src/Elastic.Channels/BufferedChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ await ConsumeInboundEventsAsync(maxOut, BufferOptions.OutboundBufferMaxLifetime)
/// <summary> An overall cancellation token that may be externally provided </summary>
protected CancellationTokenSource TokenSource { get; }

/// <summary>Internal cancellation token for signalling that all publishing activity has completed.</summary>
private readonly CancellationTokenSource _exitCancelSource = new CancellationTokenSource();

private Channel<IOutboundBuffer<TEvent>> OutChannel { get; }
private Channel<TEvent> InChannel { get; }
private BufferOptions BufferOptions => Options.BufferOptions;
Expand Down Expand Up @@ -236,7 +239,7 @@ private async Task ConsumeOutboundEventsAsync()
var taskList = new List<Task>(_maxConcurrency);

while (await OutChannel.Reader.WaitToReadAsync().ConfigureAwait(false))
// ReSharper disable once RemoveRedundantBraces
// ReSharper disable once RemoveRedundantBraces
{
if (TokenSource.Token.IsCancellationRequested) break;
if (_signal is { IsSet: true }) break;
Expand All @@ -257,6 +260,7 @@ private async Task ConsumeOutboundEventsAsync()
}
}
await Task.WhenAll(taskList).ConfigureAwait(false);
_exitCancelSource.Cancel();
_callbacks.OutboundChannelExitedCallback?.Invoke();
}

Expand All @@ -278,7 +282,7 @@ private async Task ExportBufferAsync(ArraySegment<TEvent> items, IOutboundBuffer
{
response = await ExportAsync(items, TokenSource.Token).ConfigureAwait(false);
_callbacks.ExportResponseCallback?.Invoke(response,
new WriteTrackingBufferEventData { Count = outboundBuffer.Count, DurationSinceFirstWrite = outboundBuffer.DurationSinceFirstWrite });
new WriteTrackingBufferEventData { Count = outboundBuffer.Count, DurationSinceFirstWrite = outboundBuffer.DurationSinceFirstWrite });
}
catch (Exception e)
{
Expand Down Expand Up @@ -372,12 +376,21 @@ public override string ToString() =>
/// <inheritdoc cref="IDisposable.Dispose"/>
public virtual void Dispose()
{
InboundBuffer.Dispose();
try
{
TokenSource.Cancel();
// Mark inchannel completed to flush buffer and end task, signalling end to outchannel
InChannel.Writer.TryComplete();
OutChannel.Writer.TryComplete();
// Wait a reasonable duration for the outchannel to complete before disposing the rest
if (!_exitCancelSource.IsCancellationRequested)
{
// Allow one retry before we exit
var maxwait = Options.BufferOptions.ExportBackoffPeriod(1);
_exitCancelSource.Token.WaitHandle.WaitOne(maxwait);
}
_exitCancelSource.Dispose();
InboundBuffer.Dispose();
TokenSource.Cancel();
TokenSource.Dispose();
}
catch
{
Expand Down

0 comments on commit 95b89b9

Please sign in to comment.