Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inline dispatcher #953

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions projects/Benchmarks/Benchmarks.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.12.1" />
<PackageReference Include="Ductus.FluentDocker" Version="2.7.3" />
</ItemGroup>

<ItemGroup>
Expand Down
77 changes: 77 additions & 0 deletions projects/Benchmarks/Networking/Networking_BasicDeliver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;

namespace Benchmarks.Networking
{
[MemoryDiagnoser]
public class Networking_BasicDeliver
{
private const int messageCount = 10000;

private IDisposable _container;
private static byte[] _body = Encoding.UTF8.GetBytes("hello world");

[GlobalSetup]
public void GlobalSetup()
{
_container = RabbitMQBroker.Start();
}

[GlobalCleanup]
public void GlobalCleanup()
{
_container.Dispose();
}

[Benchmark(Baseline = true)]
public async Task Publish_Hello_World()
{
var cf = new ConnectionFactory { ConsumerDispatchConcurrency = 2 };
using (var connection = cf.CreateConnection())
{
await Publish_Hello_World(connection);
}
}

[Benchmark()]
public async Task Publish_Hello_World_Inline()
{
var cf = new ConnectionFactory { DispatchConsumerInline = true };
using (var connection = cf.CreateConnection())
{
await Publish_Hello_World(connection);
}
}

public static async Task Publish_Hello_World(IConnection connection)
{
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
using (var model = connection.CreateModel())
{
var queue = model.QueueDeclare();
var consumed = 0;
var consumer = new EventingBasicConsumer(model);
consumer.Received += (s, args) =>
{
if (Interlocked.Increment(ref consumed) == messageCount)
{
tcs.SetResult(true);
}
};
model.BasicConsume(queue.QueueName, true, consumer);

for (int i = 0; i < messageCount; i++)
{
model.BasicPublish("", queue.QueueName, null, _body);
}

await tcs.Task;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
using RabbitMQ.Client;
using System;

namespace Benchmarks.Networking
{
[MemoryDiagnoser]
public class Networking_BasicDeliver_ExistingConnection
{
private IDisposable _container;
private IConnection _defaultconnection;
private IConnection _inlineconnection;

[GlobalSetup]
public void GlobalSetup()
{
_container = RabbitMQBroker.Start();

var cf = new ConnectionFactory { ConsumerDispatchConcurrency = 2 };
_defaultconnection = cf.CreateConnection();
cf.DispatchConsumerInline = true;
_inlineconnection = cf.CreateConnection();
}

[GlobalCleanup]
public void GlobalCleanup()
{
_inlineconnection.Dispose();
_defaultconnection.Dispose();
_container.Dispose();
}

[Benchmark(Baseline = true)]
public async Task Publish_Hello_World()
{
await Networking_BasicDeliver.Publish_Hello_World(_defaultconnection);
}

[Benchmark()]
public async Task Publish_Hello_World_Inline()
{
await Networking_BasicDeliver.Publish_Hello_World(_inlineconnection);
}
}
}
26 changes: 26 additions & 0 deletions projects/Benchmarks/Networking/RabbitMqBroker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System;
using System.Threading;
using Ductus.FluentDocker.Builders;

namespace Benchmarks.Networking
{
public class RabbitMQBroker
{
public static IDisposable Start()
{
var broker = new Builder().UseContainer()
.UseImage("rabbitmq")
.ExposePort(5672, 5672)
.WaitForPort("5672/tcp", 40000 /*40s*/)
.ReuseIfExists()
.WithName("rabbitmq")
.WithEnvironment("NODENAME=rabbit1")
.ExecuteOnRunning("rabbitmqctl --node rabbit1 await_startup ")
.KeepContainer()
.Build()
.Start();

return broker;
}
}
}
3 changes: 3 additions & 0 deletions projects/RabbitMQ.Client/client/api/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,9 @@ public Uri Uri
/// </summary>
public string ClientProvidedName { get; set; }

/// <inheritdoc/>
public bool DispatchConsumerInline { get; set; }

/// <summary>
/// Given a list of mechanism names supported by the server, select a preferred mechanism,
/// or null if we have none in common.
Expand Down
6 changes: 6 additions & 0 deletions projects/RabbitMQ.Client/client/api/IConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ public interface IConnectionFactory
/// <value><see langword="true" /> if an asynchronous consumer dispatcher which is compatible with <see cref="IAsyncBasicConsumer"/> is used; otherwise, <see langword="false" />.</value>
bool DispatchConsumersAsync { get; set; }

/// <summary>
/// Gets or sets a value indicating whether an inline consumer dispatcher is used.
/// </summary>
/// <value><see langword="true" /> if an inline consumer dispatcher is used; otherwise, <see langword="false" />.</value>
bool DispatchConsumerInline { get; set; }

/// <summary>
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IBasicConsumer"/>
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;

namespace RabbitMQ.Client.Events
{
Expand Down Expand Up @@ -83,6 +84,9 @@ public IDictionary<string, object> UpdateDetails(IDictionary<string, object> oth
///</remarks>
public class CallbackExceptionEventArgs : BaseExceptionEventArgs
{
internal const string Context = "context";
internal const string Consumer = "consumer";

public CallbackExceptionEventArgs(Exception e) : base(e)
{
}
Expand All @@ -92,7 +96,7 @@ public static CallbackExceptionEventArgs Build(Exception e,
{
var details = new Dictionary<string, object>
{
{"context", context}
{Context, context}
};
return Build(e, details);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ private async Task Loop()

var details = new Dictionary<string, object>
{
{ "consumer", work.Consumer },
{ "context", work.Consumer }
{ CallbackExceptionEventArgs.Consumer, work.Consumer },
{ CallbackExceptionEventArgs.Context, work.Context }
};
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
Expand Down Expand Up @@ -170,8 +170,8 @@ private static async Task HandleConcurrent(Work work, IModel model, SemaphoreSli

var details = new Dictionary<string, object>
{
{ "consumer", work.Consumer },
{ "context", work.Consumer }
{ CallbackExceptionEventArgs.Consumer, work.Consumer },
{ CallbackExceptionEventArgs.Context, work.Context }
};
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,7 @@ private bool TryRecoverConnectionDelegate()
catch (Exception ex)
{
var a = new CallbackExceptionEventArgs(ex);
a.Detail["context"] = "OnConnectionRecoveryError";
a.Detail[CallbackExceptionEventArgs.Context] = "OnConnectionRecoveryError";
_delegate.OnCallbackException(a);
}
}
Expand Down Expand Up @@ -741,7 +741,7 @@ private void RecoverConsumers()
catch (Exception e)
{
var args = new CallbackExceptionEventArgs(e);
args.Detail["context"] = "OnConsumerRecovery";
args.Detail[CallbackExceptionEventArgs.Context] = "OnConsumerRecovery";
_delegate.OnCallbackException(args);
}
}
Expand Down Expand Up @@ -846,7 +846,7 @@ private void RecoverQueues()
catch (Exception e)
{
var args = new CallbackExceptionEventArgs(e);
args.Detail["context"] = "OnQueueRecovery";
args.Detail[CallbackExceptionEventArgs.Context] = "OnQueueRecovery";
_delegate.OnCallbackException(args);
}
}
Expand All @@ -873,7 +873,7 @@ private void RunRecoveryEventHandlers()
catch (Exception e)
{
var args = new CallbackExceptionEventArgs(e);
args.Detail["context"] = "OnConnectionRecovery";
args.Detail[CallbackExceptionEventArgs.Context] = "OnConnectionRecovery";
_delegate.OnCallbackException(args);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1106,7 +1106,7 @@ private void RunRecoveryEventHandlers()
catch (Exception e)
{
var args = new CallbackExceptionEventArgs(e);
args.Detail["context"] = "OnModelRecovery";
args.Detail[CallbackExceptionEventArgs.Context] = "OnModelRecovery";
_delegate.OnCallbackException(args);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public void HandleBasicConsumeOk(IBasicConsumer consumer,
{
var details = new Dictionary<string, object>
{
{"consumer", consumer},
{"context", "HandleBasicConsumeOk"}
{CallbackExceptionEventArgs.Consumer, consumer},
{CallbackExceptionEventArgs.Context, nameof(HandleBasicConsumeOk)}
};
_model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
Expand Down Expand Up @@ -81,8 +81,8 @@ public void HandleBasicDeliver(IBasicConsumer consumer,
{
var details = new Dictionary<string, object>
{
{"consumer", consumer},
{"context", "HandleBasicDeliver"}
{CallbackExceptionEventArgs.Consumer, consumer},
{CallbackExceptionEventArgs.Context, nameof(HandleBasicDeliver)}
};
_model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
Expand All @@ -105,8 +105,8 @@ public void HandleBasicCancelOk(IBasicConsumer consumer, string consumerTag)
{
var details = new Dictionary<string, object>
{
{"consumer", consumer},
{"context", "HandleBasicCancelOk"}
{CallbackExceptionEventArgs.Consumer, consumer},
{CallbackExceptionEventArgs.Context, nameof(HandleBasicCancelOk)}
};
_model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
Expand All @@ -125,8 +125,8 @@ public void HandleBasicCancel(IBasicConsumer consumer, string consumerTag)
{
var details = new Dictionary<string, object>
{
{"consumer", consumer},
{"context", "HandleBasicCancel"}
{CallbackExceptionEventArgs.Consumer, consumer},
{CallbackExceptionEventArgs.Context, nameof(HandleBasicCancel)}
};
_model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
Expand All @@ -144,8 +144,8 @@ public void HandleModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reaso
{
var details = new Dictionary<string, object>
{
{"consumer", consumer},
{"context", "HandleModelShutdown"}
{CallbackExceptionEventArgs.Consumer, consumer},
{CallbackExceptionEventArgs.Context, nameof(HandleModelShutdown)}
};
_model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
};
Expand Down
12 changes: 7 additions & 5 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;

using RabbitMQ.Client.client.impl;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Impl;
Expand Down Expand Up @@ -104,7 +104,9 @@ public Connection(IConnectionFactory factory, bool insist, IFrameHandler frameHa

ConsumerWorkService = factory.DispatchConsumersAsync
? new AsyncConsumerWorkService(factory.ConsumerDispatchConcurrency)
: new ConsumerWorkService(factory.ConsumerDispatchConcurrency);
: factory.DispatchConsumerInline
? new InlineWorkService(factory.ConsumerDispatchConcurrency)
: new ConsumerWorkService(factory.ConsumerDispatchConcurrency);

_sessionManager = new SessionManager(this, 0);
_session0 = new MainSession(this) { Handler = NotifyReceivedCloseOk };
Expand Down Expand Up @@ -626,7 +628,7 @@ public void OnConnectionBlocked(ConnectionBlockedEventArgs args)
}
catch (Exception e)
{
OnCallbackException(CallbackExceptionEventArgs.Build(e, new Dictionary<string, object> { { "context", "OnConnectionBlocked" } }));
OnCallbackException(CallbackExceptionEventArgs.Build(e, new Dictionary<string, object> { { CallbackExceptionEventArgs.Context, nameof(OnConnectionBlocked) } }));
}
}
}
Expand All @@ -641,7 +643,7 @@ public void OnConnectionUnblocked()
}
catch (Exception e)
{
OnCallbackException(CallbackExceptionEventArgs.Build(e, new Dictionary<string, object> { { "context", "OnConnectionUnblocked" } }));
OnCallbackException(CallbackExceptionEventArgs.Build(e, new Dictionary<string, object> { { CallbackExceptionEventArgs.Context, nameof(OnConnectionUnblocked) } }));
}
}
}
Expand Down Expand Up @@ -671,7 +673,7 @@ public void OnShutdown()
OnCallbackException(CallbackExceptionEventArgs.Build(e,
new Dictionary<string, object>
{
{"context", "OnShutdown"}
{CallbackExceptionEventArgs.Context, nameof(OnShutdown)}
}));
}
}
Expand Down
Loading