Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce work allocations #910

Merged
merged 2 commits into from
Jul 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ public override async Task HandleBasicConsumeOk(string consumerTag)
}

///<summary>Fires the Received event.</summary>
public override async Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
{
await base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body).ConfigureAwait(false);
await Received.InvokeAsync(this, new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)).ConfigureAwait(false);
// No need to call base, it's empty.
return Received.InvokeAsync(this, new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body));
}

///<summary>Fires the Shutdown event.</summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void HandleBasicCancel(IBasicConsumer consumer, string consumerTag)
public void HandleModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reason)
{
// the only case where we ignore the shutdown flag.
Schedule(new ModelShutdown(consumer, reason));
Schedule(new ModelShutdown(consumer, reason, _model));
}

private void ScheduleUnlessShuttingDown<TWork>(TWork work)
Expand Down
43 changes: 35 additions & 8 deletions projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;

namespace RabbitMQ.Client.Impl
{
Expand Down Expand Up @@ -79,29 +81,43 @@ public void Enqueue(Work work)
_channel.Writer.TryWrite(work);
}

async Task Loop()
private async Task Loop()
{
while (await _channel.Reader.WaitToReadAsync().ConfigureAwait(false))
{
while (_channel.Reader.TryRead(out Work work))
{
try
{
Task task = work.Execute(_model);
Task task = work.Execute();
if (!task.IsCompleted)
{
await task.ConfigureAwait(false);
}
}
catch(Exception)
catch (Exception e)
{
if (!(_model is ModelBase modelBase))
{
return;
}

var details = new Dictionary<string, object>
{
{ "consumer", work.Consumer },
{ "context", work.Consumer }
};
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
finally
{
work.PostExecute();
}
}
}
}

async Task LoopWithConcurrency(CancellationToken cancellationToken)
private async Task LoopWithConcurrency(CancellationToken cancellationToken)
{
try
{
Expand All @@ -125,22 +141,33 @@ async Task LoopWithConcurrency(CancellationToken cancellationToken)
}
}

static async Task HandleConcurrent(Work work, IModel model, SemaphoreSlim limiter)
private static async Task HandleConcurrent(Work work, IModel model, SemaphoreSlim limiter)
{
try
{
Task task = work.Execute(model);
Task task = work.Execute();
if (!task.IsCompleted)
{
await task.ConfigureAwait(false);
}
}
catch (Exception)
catch (Exception e)
{
// ignored
if (!(model is ModelBase modelBase))
{
return;
}

var details = new Dictionary<string, object>
{
{ "consumer", work.Consumer },
{ "context", work.Consumer }
};
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
finally
{
work.PostExecute();
limiter.Release();
}
}
Expand Down
33 changes: 7 additions & 26 deletions projects/RabbitMQ.Client/client/impl/BasicCancel.cs
Original file line number Diff line number Diff line change
@@ -1,40 +1,21 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

using RabbitMQ.Client.Events;
using System.Threading.Tasks;

namespace RabbitMQ.Client.Impl
{
sealed class BasicCancel : Work
internal sealed class BasicCancel : Work
{
readonly string _consumerTag;
private readonly string _consumerTag;

public override string Context => "HandleBasicCancel";

public BasicCancel(IBasicConsumer consumer, string consumerTag) : base(consumer)
{
_consumerTag = consumerTag;
}

protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
protected override Task Execute(IAsyncBasicConsumer consumer)
{
try
{
await consumer.HandleBasicCancel(_consumerTag).ConfigureAwait(false);
}
catch (Exception e)
{
if (!(model is ModelBase modelBase))
{
return;
}

var details = new Dictionary<string, object>
{
{"consumer", consumer},
{"context", "HandleBasicCancel"}
};
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
return consumer.HandleBasicCancel(_consumerTag);
}
}
}
33 changes: 7 additions & 26 deletions projects/RabbitMQ.Client/client/impl/BasicCancelOk.cs
Original file line number Diff line number Diff line change
@@ -1,40 +1,21 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

using RabbitMQ.Client.Events;
using System.Threading.Tasks;

namespace RabbitMQ.Client.Impl
{
sealed class BasicCancelOk : Work
internal sealed class BasicCancelOk : Work
{
readonly string _consumerTag;
private readonly string _consumerTag;

public override string Context => "HandleBasicCancelOk";

public BasicCancelOk(IBasicConsumer consumer, string consumerTag) : base(consumer)
{
_consumerTag = consumerTag;
}

protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
protected override Task Execute(IAsyncBasicConsumer consumer)
{
try
{
await consumer.HandleBasicCancelOk(_consumerTag).ConfigureAwait(false);
}
catch (Exception e)
{
if (!(model is ModelBase modelBase))
{
return;
}

var details = new Dictionary<string, object>()
{
{"consumer", consumer},
{"context", "HandleBasicCancelOk"}
};
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
return consumer.HandleBasicCancelOk(_consumerTag);
}
}
}
33 changes: 7 additions & 26 deletions projects/RabbitMQ.Client/client/impl/BasicConsumeOk.cs
Original file line number Diff line number Diff line change
@@ -1,40 +1,21 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

using RabbitMQ.Client.Events;
using System.Threading.Tasks;

namespace RabbitMQ.Client.Impl
{
sealed class BasicConsumeOk : Work
internal sealed class BasicConsumeOk : Work
{
readonly string _consumerTag;
private readonly string _consumerTag;

public override string Context => "HandleBasicConsumeOk";

public BasicConsumeOk(IBasicConsumer consumer, string consumerTag) : base(consumer)
{
_consumerTag = consumerTag;
}

protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
protected override Task Execute(IAsyncBasicConsumer consumer)
{
try
{
await consumer.HandleBasicConsumeOk(_consumerTag).ConfigureAwait(false);
}
catch (Exception e)
{
if (!(model is ModelBase modelBase))
{
return;
}

var details = new Dictionary<string, object>()
{
{"consumer", consumer},
{"context", "HandleBasicConsumeOk"}
};
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
return consumer.HandleBasicConsumeOk(_consumerTag);
}
}
}
63 changes: 23 additions & 40 deletions projects/RabbitMQ.Client/client/impl/BasicDeliver.cs
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Threading.Tasks;

using RabbitMQ.Client.Events;

namespace RabbitMQ.Client.Impl
{
sealed class BasicDeliver : Work
internal sealed class BasicDeliver : Work
{
readonly string _consumerTag;
readonly ulong _deliveryTag;
readonly bool _redelivered;
readonly string _exchange;
readonly string _routingKey;
readonly IBasicProperties _basicProperties;
readonly ReadOnlyMemory<byte> _body;
private readonly string _consumerTag;
private readonly ulong _deliveryTag;
private readonly bool _redelivered;
private readonly string _exchange;
private readonly string _routingKey;
private readonly IBasicProperties _basicProperties;
private readonly ReadOnlyMemory<byte> _body;

public override string Context => "HandleBasicDeliver";

public BasicDeliver(IBasicConsumer consumer,
string consumerTag,
Expand All @@ -36,38 +35,22 @@ public BasicDeliver(IBasicConsumer consumer,
_body = body;
}

protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
protected override Task Execute(IAsyncBasicConsumer consumer)
{
try
{
await consumer.HandleBasicDeliver(_consumerTag,
_deliveryTag,
_redelivered,
_exchange,
_routingKey,
_basicProperties,
_body).ConfigureAwait(false);
}
catch (Exception e)
{
if (!(model is ModelBase modelBase))
{
return;
}
return consumer.HandleBasicDeliver(_consumerTag,
_deliveryTag,
_redelivered,
_exchange,
_routingKey,
_basicProperties,
_body);
}

var details = new Dictionary<string, object>()
{
{"consumer", consumer},
{"context", "HandleBasicDeliver"}
};
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
finally
public override void PostExecute()
{
if (MemoryMarshal.TryGetArray(_body, out ArraySegment<byte> segment))
{
if (MemoryMarshal.TryGetArray(_body, out ArraySegment<byte> segment))
{
ArrayPool<byte>.Shared.Return(segment.Array);
}
ArrayPool<byte>.Shared.Return(segment.Array);
}
}
}
Expand Down
35 changes: 9 additions & 26 deletions projects/RabbitMQ.Client/client/impl/ModelShutdown.cs
Original file line number Diff line number Diff line change
@@ -1,40 +1,23 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

using RabbitMQ.Client.Events;

namespace RabbitMQ.Client.Impl
{
sealed class ModelShutdown : Work
internal sealed class ModelShutdown : Work
{
readonly ShutdownEventArgs _reason;
private readonly ShutdownEventArgs _reason;
private readonly IModel _model;

public override string Context => "HandleModelShutdown";

public ModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reason) : base(consumer)
public ModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reason, IModel model) : base(consumer)
{
_reason = reason;
_model = model;
}

protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
protected override Task Execute(IAsyncBasicConsumer consumer)
{
try
{
await consumer.HandleModelShutdown(model, _reason).ConfigureAwait(false);
}
catch (Exception e)
{
if (!(model is ModelBase modelBase))
{
return;
}

var details = new Dictionary<string, object>()
{
{ "consumer", consumer },
{ "context", "HandleModelShutdown" }
};
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
return consumer.HandleModelShutdown(_model, _reason);
}
}
}
Loading