Skip to content

Commit

Permalink
Merge pull request #915 from rabbitmq/backport-pr-910
Browse files Browse the repository at this point in the history
Backport #910 to 6.x
  • Loading branch information
michaelklishin authored Jul 12, 2020
2 parents 224134f + 644fb6c commit d344d61
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ public override async Task HandleBasicConsumeOk(string consumerTag)
}

///<summary>Fires the Received event.</summary>
public override async Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
{
await base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body).ConfigureAwait(false);
await Received.InvokeAsync(this, new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)).ConfigureAwait(false);
// No need to call base, it's empty.
return Received.InvokeAsync(this, new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body));
}

///<summary>Fires the Shutdown event.</summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void HandleBasicCancel(IBasicConsumer consumer, string consumerTag)
public void HandleModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reason)
{
// the only case where we ignore the shutdown flag.
Schedule(new ModelShutdown(consumer, reason));
Schedule(new ModelShutdown(consumer, reason, _model));
}

private void ScheduleUnlessShuttingDown<TWork>(TWork work)
Expand Down
43 changes: 35 additions & 8 deletions projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;

namespace RabbitMQ.Client.Impl
{
Expand Down Expand Up @@ -79,29 +81,43 @@ public void Enqueue(Work work)
_channel.Writer.TryWrite(work);
}

async Task Loop()
private async Task Loop()
{
while (await _channel.Reader.WaitToReadAsync().ConfigureAwait(false))
{
while (_channel.Reader.TryRead(out Work work))
{
try
{
Task task = work.Execute(_model);
Task task = work.Execute();
if (!task.IsCompleted)
{
await task.ConfigureAwait(false);
}
}
catch(Exception)
catch (Exception e)
{
if (!(_model is ModelBase modelBase))
{
return;
}

var details = new Dictionary<string, object>
{
{ "consumer", work.Consumer },
{ "context", work.Consumer }
};
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
finally
{
work.PostExecute();
}
}
}
}

async Task LoopWithConcurrency(CancellationToken cancellationToken)
private async Task LoopWithConcurrency(CancellationToken cancellationToken)
{
try
{
Expand All @@ -125,22 +141,33 @@ async Task LoopWithConcurrency(CancellationToken cancellationToken)
}
}

static async Task HandleConcurrent(Work work, IModel model, SemaphoreSlim limiter)
private static async Task HandleConcurrent(Work work, IModel model, SemaphoreSlim limiter)
{
try
{
Task task = work.Execute(model);
Task task = work.Execute();
if (!task.IsCompleted)
{
await task.ConfigureAwait(false);
}
}
catch (Exception)
catch (Exception e)
{
// ignored
if (!(model is ModelBase modelBase))
{
return;
}

var details = new Dictionary<string, object>
{
{ "consumer", work.Consumer },
{ "context", work.Consumer }
};
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
finally
{
work.PostExecute();
limiter.Release();
}
}
Expand Down
33 changes: 7 additions & 26 deletions projects/RabbitMQ.Client/client/impl/BasicCancel.cs
Original file line number Diff line number Diff line change
@@ -1,40 +1,21 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

using RabbitMQ.Client.Events;
using System.Threading.Tasks;

namespace RabbitMQ.Client.Impl
{
sealed class BasicCancel : Work
internal sealed class BasicCancel : Work
{
readonly string _consumerTag;
private readonly string _consumerTag;

public override string Context => "HandleBasicCancel";

public BasicCancel(IBasicConsumer consumer, string consumerTag) : base(consumer)
{
_consumerTag = consumerTag;
}

protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
protected override Task Execute(IAsyncBasicConsumer consumer)
{
try
{
await consumer.HandleBasicCancel(_consumerTag).ConfigureAwait(false);
}
catch (Exception e)
{
if (!(model is ModelBase modelBase))
{
return;
}

var details = new Dictionary<string, object>
{
{"consumer", consumer},
{"context", "HandleBasicCancel"}
};
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
return consumer.HandleBasicCancel(_consumerTag);
}
}
}
33 changes: 7 additions & 26 deletions projects/RabbitMQ.Client/client/impl/BasicCancelOk.cs
Original file line number Diff line number Diff line change
@@ -1,40 +1,21 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

using RabbitMQ.Client.Events;
using System.Threading.Tasks;

namespace RabbitMQ.Client.Impl
{
sealed class BasicCancelOk : Work
internal sealed class BasicCancelOk : Work
{
readonly string _consumerTag;
private readonly string _consumerTag;

public override string Context => "HandleBasicCancelOk";

public BasicCancelOk(IBasicConsumer consumer, string consumerTag) : base(consumer)
{
_consumerTag = consumerTag;
}

protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
protected override Task Execute(IAsyncBasicConsumer consumer)
{
try
{
await consumer.HandleBasicCancelOk(_consumerTag).ConfigureAwait(false);
}
catch (Exception e)
{
if (!(model is ModelBase modelBase))
{
return;
}

var details = new Dictionary<string, object>()
{
{"consumer", consumer},
{"context", "HandleBasicCancelOk"}
};
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
return consumer.HandleBasicCancelOk(_consumerTag);
}
}
}
33 changes: 7 additions & 26 deletions projects/RabbitMQ.Client/client/impl/BasicConsumeOk.cs
Original file line number Diff line number Diff line change
@@ -1,40 +1,21 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

using RabbitMQ.Client.Events;
using System.Threading.Tasks;

namespace RabbitMQ.Client.Impl
{
sealed class BasicConsumeOk : Work
internal sealed class BasicConsumeOk : Work
{
readonly string _consumerTag;
private readonly string _consumerTag;

public override string Context => "HandleBasicConsumeOk";

public BasicConsumeOk(IBasicConsumer consumer, string consumerTag) : base(consumer)
{
_consumerTag = consumerTag;
}

protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
protected override Task Execute(IAsyncBasicConsumer consumer)
{
try
{
await consumer.HandleBasicConsumeOk(_consumerTag).ConfigureAwait(false);
}
catch (Exception e)
{
if (!(model is ModelBase modelBase))
{
return;
}

var details = new Dictionary<string, object>()
{
{"consumer", consumer},
{"context", "HandleBasicConsumeOk"}
};
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
return consumer.HandleBasicConsumeOk(_consumerTag);
}
}
}
63 changes: 23 additions & 40 deletions projects/RabbitMQ.Client/client/impl/BasicDeliver.cs
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Threading.Tasks;

using RabbitMQ.Client.Events;

namespace RabbitMQ.Client.Impl
{
sealed class BasicDeliver : Work
internal sealed class BasicDeliver : Work
{
readonly string _consumerTag;
readonly ulong _deliveryTag;
readonly bool _redelivered;
readonly string _exchange;
readonly string _routingKey;
readonly IBasicProperties _basicProperties;
readonly ReadOnlyMemory<byte> _body;
private readonly string _consumerTag;
private readonly ulong _deliveryTag;
private readonly bool _redelivered;
private readonly string _exchange;
private readonly string _routingKey;
private readonly IBasicProperties _basicProperties;
private readonly ReadOnlyMemory<byte> _body;

public override string Context => "HandleBasicDeliver";

public BasicDeliver(IBasicConsumer consumer,
string consumerTag,
Expand All @@ -36,38 +35,22 @@ public BasicDeliver(IBasicConsumer consumer,
_body = body;
}

protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
protected override Task Execute(IAsyncBasicConsumer consumer)
{
try
{
await consumer.HandleBasicDeliver(_consumerTag,
_deliveryTag,
_redelivered,
_exchange,
_routingKey,
_basicProperties,
_body).ConfigureAwait(false);
}
catch (Exception e)
{
if (!(model is ModelBase modelBase))
{
return;
}
return consumer.HandleBasicDeliver(_consumerTag,
_deliveryTag,
_redelivered,
_exchange,
_routingKey,
_basicProperties,
_body);
}

var details = new Dictionary<string, object>()
{
{"consumer", consumer},
{"context", "HandleBasicDeliver"}
};
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
finally
public override void PostExecute()
{
if (MemoryMarshal.TryGetArray(_body, out ArraySegment<byte> segment))
{
if (MemoryMarshal.TryGetArray(_body, out ArraySegment<byte> segment))
{
ArrayPool<byte>.Shared.Return(segment.Array);
}
ArrayPool<byte>.Shared.Return(segment.Array);
}
}
}
Expand Down
35 changes: 9 additions & 26 deletions projects/RabbitMQ.Client/client/impl/ModelShutdown.cs
Original file line number Diff line number Diff line change
@@ -1,40 +1,23 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

using RabbitMQ.Client.Events;

namespace RabbitMQ.Client.Impl
{
sealed class ModelShutdown : Work
internal sealed class ModelShutdown : Work
{
readonly ShutdownEventArgs _reason;
private readonly ShutdownEventArgs _reason;
private readonly IModel _model;

public override string Context => "HandleModelShutdown";

public ModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reason) : base(consumer)
public ModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reason, IModel model) : base(consumer)
{
_reason = reason;
_model = model;
}

protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
protected override Task Execute(IAsyncBasicConsumer consumer)
{
try
{
await consumer.HandleModelShutdown(model, _reason).ConfigureAwait(false);
}
catch (Exception e)
{
if (!(model is ModelBase modelBase))
{
return;
}

var details = new Dictionary<string, object>()
{
{ "consumer", consumer },
{ "context", "HandleModelShutdown" }
};
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
return consumer.HandleModelShutdown(_model, _reason);
}
}
}
Loading

0 comments on commit d344d61

Please sign in to comment.