Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Framing benchmarks and optimizations #1016

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions projects/Benchmarks/WireFormatting/MethodFraming.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using System;
using System.Text;

using BenchmarkDotNet.Attributes;

using RabbitMQ.Client.Framing.Impl;
using RabbitMQ.Client.Impl;

namespace RabbitMQ.Benchmarks
{
[Config(typeof(Config))]
[BenchmarkCategory("Framing")]
public class MethodFramingBasicAck
{
private readonly OutgoingCommand _basicAck = new OutgoingCommand(new BasicAck(ulong.MaxValue, true));

[Benchmark]
public ReadOnlyMemory<byte> BasicAckWrite() => _basicAck.SerializeToFrames(0, 1024 * 1024);
}

[Config(typeof(Config))]
[BenchmarkCategory("Framing")]
public class MethodFramingBasicPublish
{
private const string StringValue = "Exchange_OR_RoutingKey";
private readonly OutgoingContentCommand _basicPublish = new OutgoingContentCommand(new BasicPublish(StringValue, StringValue, false, false), new Client.Framing.BasicProperties(), ReadOnlyMemory<byte>.Empty);
private readonly OutgoingContentCommand _basicPublishMemory = new OutgoingContentCommand(new BasicPublishMemory(Encoding.UTF8.GetBytes(StringValue), Encoding.UTF8.GetBytes(StringValue), false, false), new Client.Framing.BasicProperties(), ReadOnlyMemory<byte>.Empty);

[Benchmark]
public ReadOnlyMemory<byte> BasicPublishWrite() => _basicPublish.SerializeToFrames(0, 1024 * 1024);

[Benchmark]
public ReadOnlyMemory<byte> BasicPublishMemoryWrite() => _basicPublishMemory.SerializeToFrames(0, 1024 * 1024);
}

[Config(typeof(Config))]
[BenchmarkCategory("Framing")]
public class MethodFramingChannelClose
{
private readonly OutgoingCommand _channelClose = new OutgoingCommand(new ChannelClose(333, string.Empty, 0099, 2999));

[Benchmark]
public ReadOnlyMemory<byte> ChannelCloseWrite() => _channelClose.SerializeToFrames(0, 1024 * 1024);
}
}
15 changes: 8 additions & 7 deletions projects/RabbitMQ.Client/client/impl/BasicPublishBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,26 @@

using System;
using System.Collections.Generic;

using RabbitMQ.Client.Framing.Impl;

namespace RabbitMQ.Client.Impl
{
internal sealed class BasicPublishBatch : IBasicPublishBatch
{
private readonly List<OutgoingCommand> _commands;
private readonly List<OutgoingContentCommand> _commands;
private readonly ModelBase _model;

internal BasicPublishBatch (ModelBase model)
internal BasicPublishBatch(ModelBase model)
{
_model = model;
_commands = new List<OutgoingCommand>();
_commands = new List<OutgoingContentCommand>();
}

internal BasicPublishBatch (ModelBase model, int sizeHint)
internal BasicPublishBatch(ModelBase model, int sizeHint)
{
_model = model;
_commands = new List<OutgoingCommand>(sizeHint);
_commands = new List<OutgoingContentCommand>(sizeHint);
}

public void Add(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)
Expand All @@ -60,13 +61,13 @@ public void Add(string exchange, string routingKey, bool mandatory, IBasicProper
_routingKey = routingKey,
_mandatory = mandatory
};
_commands.Add(new OutgoingCommand(method, (ContentHeaderBase)(basicProperties ?? _model._emptyBasicProperties), body));
_commands.Add(new OutgoingContentCommand(method, (ContentHeaderBase)(basicProperties ?? _model._emptyBasicProperties), body));
}

public void Add(CachedString exchange, CachedString routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)
{
var method = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
_commands.Add(new OutgoingCommand(method, (ContentHeaderBase)(basicProperties ?? _model._emptyBasicProperties), body));
_commands.Add(new OutgoingContentCommand(method, (ContentHeaderBase)(basicProperties ?? _model._emptyBasicProperties), body));
}

public void Publish()
Expand Down
50 changes: 23 additions & 27 deletions projects/RabbitMQ.Client/client/impl/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;

using RabbitMQ.Client.Exceptions;
using RabbitMQ.Util;

Expand All @@ -47,23 +48,12 @@ internal static class Framing
* +------------+---------+----------------+---------+------------------+
* | 1 byte | 2 bytes | 4 bytes | x bytes | 1 byte |
* +------------+---------+----------------+---------+------------------+ */
private const int BaseFrameSize = 1 + 2 + 4 + 1;
internal const int BaseFrameSize = 1 + 2 + 4 + 1;
internal const int StartFrameType = 0;
internal const int StartChannel = 1;
internal const int StartPayloadSize = 3;
private const int StartPayload = 7;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static int WriteBaseFrame(Span<byte> span, FrameType type, ushort channel, int payloadLength)
{
const int StartFrameType = 0;
const int StartChannel = 1;
const int StartPayloadSize = 3;

span[StartFrameType] = (byte)type;
NetworkOrderSerializer.WriteUInt16(span.Slice(StartChannel), channel);
NetworkOrderSerializer.WriteUInt32(span.Slice(StartPayloadSize), (uint)payloadLength);
span[StartPayload + payloadLength] = Constants.FrameEnd;
return StartPayload + 1 + payloadLength;
}

internal static class Method
{
/* +----------+-----------+-----------+
Expand All @@ -75,14 +65,17 @@ internal static class Method
* +----------+-----------+-----------+ */
public const int FrameSize = BaseFrameSize + 2 + 2;

public static int WriteTo(Span<byte> span, ushort channel, MethodBase method)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static int WriteTo<T>(Span<byte> span, ushort channel, T method) where T : MethodBase
{
const int StartClassId = StartPayload;
const int StartMethodArguments = StartPayload + 4;
const int StartMethodArguments = StartClassId + 4;

int payloadLength = method.WriteArgumentsTo(span.Slice(StartMethodArguments)) + 4;
NetworkOrderSerializer.WriteUInt64(span, ((ulong)Constants.FrameMethod << 56) | ((ulong)channel << 40) | ((ulong)payloadLength << 8));
stebet marked this conversation as resolved.
Show resolved Hide resolved
NetworkOrderSerializer.WriteUInt32(span.Slice(StartClassId), (uint)method.ProtocolCommandId);
int offset = method.WriteArgumentsTo(span.Slice(StartMethodArguments));
return WriteBaseFrame(span, FrameType.FrameMethod, channel, StartMethodArguments - StartPayload + offset);
span[payloadLength + StartPayload] = Constants.FrameEnd;
return payloadLength + BaseFrameSize;
}
}

Expand All @@ -95,18 +88,19 @@ internal static class Header
* +----------+----------+-------------------+-----------+ */
public const int FrameSize = BaseFrameSize + 2 + 2 + 8;

public static int WriteTo(Span<byte> span, ushort channel, ContentHeaderBase header, int bodyLength)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static int WriteTo<T>(Span<byte> span, ushort channel, T header, int bodyLength) where T : ContentHeaderBase
{
const int StartClassId = StartPayload;
const int StartWeight = StartPayload + 2;
const int StartBodyLength = StartPayload + 4;
const int StartHeaderArguments = StartPayload + 12;

NetworkOrderSerializer.WriteUInt16(span.Slice(StartClassId), header.ProtocolClassId);
NetworkOrderSerializer.WriteUInt16(span.Slice(StartWeight), 0); // Weight - not used
int payloadLength = 12 + header.WritePropertiesTo(span.Slice(StartHeaderArguments));
NetworkOrderSerializer.WriteUInt64(span, ((ulong)Constants.FrameHeader << 56) | ((ulong)channel << 40) | ((ulong)payloadLength << 8));
NetworkOrderSerializer.WriteUInt32(span.Slice(StartClassId), (uint)header.ProtocolClassId << 16); // The last 16 bytes (Weight) aren't used
NetworkOrderSerializer.WriteUInt64(span.Slice(StartBodyLength), (ulong)bodyLength);
int offset = header.WritePropertiesTo(span.Slice(StartHeaderArguments));
return WriteBaseFrame(span, FrameType.FrameHeader, channel, StartHeaderArguments - StartPayload + offset);
span[payloadLength + StartPayload] = Constants.FrameEnd;
return payloadLength + BaseFrameSize;
}
}

Expand All @@ -119,12 +113,14 @@ internal static class BodySegment
* +--------------+ */
public const int FrameSize = BaseFrameSize;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static int WriteTo(Span<byte> span, ushort channel, ReadOnlySpan<byte> body)
{
const int StartBodyArgument = StartPayload;

NetworkOrderSerializer.WriteUInt64(span, ((ulong)Constants.FrameBody << 56) | ((ulong)channel << 40) | ((ulong)body.Length << 8));
body.CopyTo(span.Slice(StartBodyArgument));
return WriteBaseFrame(span, FrameType.FrameBody, channel, StartBodyArgument - StartPayload + body.Length);
span[StartPayload + body.Length] = Constants.FrameEnd;
return body.Length + BaseFrameSize;
}
}

Expand Down
6 changes: 3 additions & 3 deletions projects/RabbitMQ.Client/client/impl/ISession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

using System;
using System.Collections.Generic;

using RabbitMQ.Client.Framing.Impl;

namespace RabbitMQ.Client.Impl
Expand Down Expand Up @@ -73,8 +74,7 @@ internal interface ISession
void Close(ShutdownEventArgs reason, bool notify);
bool HandleFrame(in InboundFrame frame);
void Notify();
void Transmit(in OutgoingCommand cmd);
void Transmit(IList<OutgoingCommand> cmds);

void Transmit<T>(in T cmd) where T : struct, IOutgoingCommand;
void Transmit<T>(List<T> cmds) where T : struct, IOutgoingCommand;
}
}
3 changes: 2 additions & 1 deletion projects/RabbitMQ.Client/client/impl/MainSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
// that ever changes.

using System;

using RabbitMQ.Client.client.framing;
using RabbitMQ.Client.Framing.Impl;

Expand Down Expand Up @@ -108,7 +109,7 @@ public void SetSessionClosing(bool closeServerInitiated)
}
}

public override void Transmit(in OutgoingCommand cmd)
public override void Transmit<T>(in T cmd)
{
lock (_closingLock)
{
Expand Down
17 changes: 5 additions & 12 deletions projects/RabbitMQ.Client/client/impl/ModelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ protected void Enqueue(IRpcContinuation k)
{
if (CloseReason is null)
{
_continuationQueue.Enqueue(k);
_continuationQueue.Enqueue(k);
}
else
{
Expand Down Expand Up @@ -364,20 +364,13 @@ protected T ModelRpc<T>(MethodBase method) where T : MethodBase

protected void ModelSend(MethodBase method)
{
ModelSend(method, null, ReadOnlyMemory<byte>.Empty);
Session.Transmit(new OutgoingCommand(method));
}

protected void ModelSend(MethodBase method, ContentHeaderBase header, ReadOnlyMemory<byte> body)
{
if (method.HasContent)
{
_flowControlBlock.Wait();
Session.Transmit(new OutgoingCommand(method, header, body));
}
else
{
Session.Transmit(new OutgoingCommand(method, header, body));
}
_flowControlBlock.Wait();
stebet marked this conversation as resolved.
Show resolved Hide resolved
Session.Transmit(new OutgoingContentCommand(method, header, body));
}

internal void OnCallbackException(CallbackExceptionEventArgs args)
Expand Down Expand Up @@ -1234,7 +1227,7 @@ await CloseAsync(new ShutdownEventArgs(ShutdownInitiator.Application,
}
}

internal void SendCommands(IList<OutgoingCommand> commands)
internal void SendCommands(List<OutgoingContentCommand> commands)
{
_flowControlBlock.Wait();
if (NextPublishSeqNo > 0)
Expand Down
Loading