
Fix Semaphore Disposed Exception in AsyncConsumerWorkService #1015

Merged 1 commit into rabbitmq:master on Feb 5, 2021

Conversation

ashneilson (Contributor)

Proposed Changes

Fixes a System.ObjectDisposedException thrown when the SemaphoreSlim is Disposed on an AsyncConsumerWorkService with a Concurrency greater than 1.

Current State

Under load, the ChannelReader can return a Work item after the _limiter has been Disposed, resulting in the Exception being thrown.

while (_channel.Reader.TryRead(out Work work))
{
    // Do a quick synchronous check before we resort to async/await with the state-machine overhead.
    if (!_limiter.Wait(0))
    {
        await _limiter.WaitAsync(cancellationToken).ConfigureAwait(false);
    }
    _ = HandleConcurrent(work, _model, _limiter);
}
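
For context, the _limiter is disposed in Stop() (quoted in full later in this thread), which can run while the loop above is still draining the channel:

public Task Stop()
{
    _channel.Writer.Complete();
    _tokenSource?.Cancel();
    _limiter?.Dispose();
    return _worker;
}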

Proposed Fix

We check whether the cancellationToken has been cancelled; if so, we avoid waiting on the _limiter, and no exception is thrown.
https://github.com/ricado-group/rabbitmq-dotnet-client/blob/08854f1a83b3c4a1214f41518bbfa380ec673e1c/projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs#L130-L139
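
For reference, the essence of the linked change is the added cancellation check on the inner read loop (the full method is quoted further down in this thread):

while (_channel.Reader.TryRead(out Work work) && !cancellationToken.IsCancellationRequested)
{
    // Do a quick synchronous check before we resort to async/await with the state-machine overhead.
    if (!_limiter.Wait(0))
    {
        await _limiter.WaitAsync(cancellationToken).ConfigureAwait(false);
    }
    _ = HandleConcurrent(work, _model, _limiter);
}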

Types of Changes

  • Bug fix (non-breaking change which fixes issue #1014, Semaphore Disposed Exception thrown in AsyncConsumerWorkService)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause an observable behavior change in existing systems)
  • Documentation improvements (corrections, new content, etc)
  • Cosmetic change (whitespace, formatting, etc)

Checklist

  • I have read the CONTRIBUTING.md document
  • I have signed the CA (see https://cla.pivotal.io/sign/rabbitmq)
  • All tests pass locally with my changes
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)
  • Any dependent changes have been merged and published in related repositories

@pivotal-issuemaster

@ashneilson Please sign the Contributor License Agreement!


See the FAQ for frequently asked questions.

@pivotal-issuemaster

@ashneilson Thank you for signing the Contributor License Agreement!

ashneilson added a commit to ricado-group/dotnet-rabbitmq that referenced this pull request Feb 5, 2021
michaelklishin merged commit cff0c3d into rabbitmq:master on Feb 5, 2021
@michaelklishin (Member)

Thank you!

michaelklishin added a commit that referenced this pull request Feb 5, 2021

Fix Semaphore Disposed Exception in AsyncConsumerWorkService

(cherry picked from commit cff0c3d)
@michaelklishin (Member)

Backported to 6.x.

ashneilson deleted the fixAsyncConsumerSemaphoreDispose branch on February 5, 2021 at 04:29
@bollhals (Contributor) commented Feb 5, 2021

Hmm, probably irrelevant, but now the remaining work items won't be processed (they kind of also weren't before, due to the dispose exception). What would we expect here? Shouldn't all the items be processed?

@ashneilson (Contributor, Author)

Thanks for raising this point @bollhals.

The behavior is still as before, given that the _tokenSource is Cancelled here:

public Task Stop()
{
    _channel.Writer.Complete();
    _tokenSource?.Cancel();
    _limiter?.Dispose();
    return _worker;
}

Under typical circumstances, an OperationCanceledException would be thrown on the _channel.Reader.WaitToReadAsync call, which prevents any further processing of work items anyhow.

private async Task LoopWithConcurrency(CancellationToken cancellationToken)
{
    try
    {
        while (await _channel.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
        {
            while (_channel.Reader.TryRead(out Work work) && !cancellationToken.IsCancellationRequested)
            {
                // Do a quick synchronous check before we resort to async/await with the state-machine overhead.
                if (!_limiter.Wait(0))
                {
                    await _limiter.WaitAsync(cancellationToken).ConfigureAwait(false);
                }
                _ = HandleConcurrent(work, _model, _limiter);
            }
        }
    }
    catch (OperationCanceledException)
    {
        // ignored
    }
}

One would assume that if processing of work items (e.g. BasicDeliver) were critical to the user, they would use .ConfirmSelect() on the Channel so that messages would be redelivered upon a Connection Recovery or a new Connection.

@michaelklishin (Member)

Publisher confirms have no effect on the redelivery behaviour or consumer acknowledgements.

It would be nice to process the backlog of work items that were already submitted, in case anyone has more cycles to spend improving this client :)

@bollhals (Contributor) commented Feb 6, 2021

It would be nice to process the backlog of work items that were already submitted, in case anyone has more cycles to spend improving this client :)

#997 does it.

@bollhals (Contributor) commented Feb 6, 2021

But we could also consider changing it so that on close it waits, but on abort it does not.

@ashneilson (Contributor, Author) commented Feb 7, 2021

Publisher confirms have no effect on the redelivery behaviour or consumer acknowledgements.

Sorry @michaelklishin! Thanks for correcting me. I had meant to suggest that with AutoAck disabled and .BasicAck()/.BasicNack() used for Consumer Acknowledgements, unacknowledged messages would be redelivered.
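
For illustration, a minimal sketch of that manual-acknowledgement pattern with the async consumer API (the queue name and handler body are placeholders):

using RabbitMQ.Client;
using RabbitMQ.Client.Events;

var factory = new ConnectionFactory { DispatchConsumersAsync = true };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += async (sender, ea) =>
{
    // ... process the delivery ...
    channel.BasicAck(ea.DeliveryTag, multiple: false);   // or channel.BasicNack(ea.DeliveryTag, false, requeue: true)
    await Task.Yield();
};

// autoAck: false, so deliveries that were never acked are redelivered after a connection/channel loss
channel.BasicConsume(queue: "work-queue", autoAck: false, consumer: consumer);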

A simple solution could be to await _worker before calling _limiter?.Dispose(), and, as @bollhals suggests, a bool abort argument could be added so that we skip the wait on an Abort. For reference, the current implementation is:

public Task Stop()
{
    _channel.Writer.Complete();
    _tokenSource?.Cancel();
    _limiter?.Dispose();
    return _worker;
}
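
A rough sketch of that suggestion, purely for illustration (the bool abort parameter and the await are hypothetical and not part of the merged change):

public async Task Stop(bool abort)
{
    _channel.Writer.Complete();
    _tokenSource?.Cancel();
    if (!abort)
    {
        // On a graceful close, wait for the worker loop to finish before the
        // SemaphoreSlim is disposed, so the dispose can no longer race the loop.
        await _worker.ConfigureAwait(false);
    }
    _limiter?.Dispose();
}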

I noted that during Connection Recovery, this WorkService is destroyed and re-created. Would we still want to process the backlog given this could delay recovery quite significantly?
