Skip to content

Commit

Permalink
Adding benchmarks for framing and optimizing frame generation.
Browse files Browse the repository at this point in the history
  • Loading branch information
Stefán J. Sigurðarson committed Feb 12, 2021
1 parent 0876795 commit e02e86f
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 101 deletions.
45 changes: 45 additions & 0 deletions projects/Benchmarks/WireFormatting/MethodFraming.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using System;
using System.Text;

using BenchmarkDotNet.Attributes;

using RabbitMQ.Client.Framing.Impl;
using RabbitMQ.Client.Impl;

namespace RabbitMQ.Benchmarks
{
[Config(typeof(Config))]
[BenchmarkCategory("Framing")]
public class MethodFramingBasicAck
{
private readonly OutgoingCommand _basicAck = new OutgoingCommand(new BasicAck(ulong.MaxValue, true));

[Benchmark]
public ReadOnlyMemory<byte> BasicAckWrite() => _basicAck.SerializeToFrames(0, 1024 * 1024);
}

[Config(typeof(Config))]
[BenchmarkCategory("Framing")]
public class MethodFramingBasicPublish
{
private const string StringValue = "Exchange_OR_RoutingKey";
private readonly OutgoingContentCommand _basicPublish = new OutgoingContentCommand(new BasicPublish(StringValue, StringValue, false, false), new Client.Framing.BasicProperties(), ReadOnlyMemory<byte>.Empty);
private readonly OutgoingContentCommand _basicPublishMemory = new OutgoingContentCommand(new BasicPublishMemory(Encoding.UTF8.GetBytes(StringValue), Encoding.UTF8.GetBytes(StringValue), false, false), new Client.Framing.BasicProperties(), ReadOnlyMemory<byte>.Empty);

[Benchmark]
public ReadOnlyMemory<byte> BasicPublishWrite() => _basicPublish.SerializeToFrames(0, 1024 * 1024);

[Benchmark]
public ReadOnlyMemory<byte> BasicPublishMemoryWrite() => _basicPublishMemory.SerializeToFrames(0, 1024 * 1024);
}

[Config(typeof(Config))]
[BenchmarkCategory("Framing")]
public class MethodFramingChannelClose
{
private readonly OutgoingCommand _channelClose = new OutgoingCommand(new ChannelClose(333, string.Empty, 0099, 2999));

[Benchmark]
public ReadOnlyMemory<byte> ChannelCloseWrite() => _channelClose.SerializeToFrames(0, 1024 * 1024);
}
}
15 changes: 8 additions & 7 deletions projects/RabbitMQ.Client/client/impl/BasicPublishBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,26 @@

using System;
using System.Collections.Generic;

using RabbitMQ.Client.Framing.Impl;

namespace RabbitMQ.Client.Impl
{
internal sealed class BasicPublishBatch : IBasicPublishBatch
{
private readonly List<OutgoingCommand> _commands;
private readonly List<OutgoingContentCommand> _commands;
private readonly ModelBase _model;

internal BasicPublishBatch (ModelBase model)
internal BasicPublishBatch(ModelBase model)
{
_model = model;
_commands = new List<OutgoingCommand>();
_commands = new List<OutgoingContentCommand>();
}

internal BasicPublishBatch (ModelBase model, int sizeHint)
internal BasicPublishBatch(ModelBase model, int sizeHint)
{
_model = model;
_commands = new List<OutgoingCommand>(sizeHint);
_commands = new List<OutgoingContentCommand>(sizeHint);
}

public void Add(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)
Expand All @@ -60,13 +61,13 @@ public void Add(string exchange, string routingKey, bool mandatory, IBasicProper
_routingKey = routingKey,
_mandatory = mandatory
};
_commands.Add(new OutgoingCommand(method, (ContentHeaderBase)(basicProperties ?? _model._emptyBasicProperties), body));
_commands.Add(new OutgoingContentCommand(method, (ContentHeaderBase)(basicProperties ?? _model._emptyBasicProperties), body));
}

public void Add(CachedString exchange, CachedString routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)
{
var method = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
_commands.Add(new OutgoingCommand(method, (ContentHeaderBase)(basicProperties ?? _model._emptyBasicProperties), body));
_commands.Add(new OutgoingContentCommand(method, (ContentHeaderBase)(basicProperties ?? _model._emptyBasicProperties), body));
}

public void Publish()
Expand Down
50 changes: 23 additions & 27 deletions projects/RabbitMQ.Client/client/impl/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;

using RabbitMQ.Client.Exceptions;
using RabbitMQ.Util;

Expand All @@ -47,23 +48,12 @@ internal static class Framing
* +------------+---------+----------------+---------+------------------+
* | 1 byte | 2 bytes | 4 bytes | x bytes | 1 byte |
* +------------+---------+----------------+---------+------------------+ */
private const int BaseFrameSize = 1 + 2 + 4 + 1;
internal const int BaseFrameSize = 1 + 2 + 4 + 1;
internal const int StartFrameType = 0;
internal const int StartChannel = 1;
internal const int StartPayloadSize = 3;
private const int StartPayload = 7;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static int WriteBaseFrame(Span<byte> span, FrameType type, ushort channel, int payloadLength)
{
const int StartFrameType = 0;
const int StartChannel = 1;
const int StartPayloadSize = 3;

span[StartFrameType] = (byte)type;
NetworkOrderSerializer.WriteUInt16(span.Slice(StartChannel), channel);
NetworkOrderSerializer.WriteUInt32(span.Slice(StartPayloadSize), (uint)payloadLength);
span[StartPayload + payloadLength] = Constants.FrameEnd;
return StartPayload + 1 + payloadLength;
}

internal static class Method
{
/* +----------+-----------+-----------+
Expand All @@ -75,14 +65,17 @@ internal static class Method
* +----------+-----------+-----------+ */
public const int FrameSize = BaseFrameSize + 2 + 2;

public static int WriteTo(Span<byte> span, ushort channel, MethodBase method)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static int WriteTo<T>(Span<byte> span, ushort channel, T method) where T : MethodBase
{
const int StartClassId = StartPayload;
const int StartMethodArguments = StartPayload + 4;
const int StartMethodArguments = StartClassId + 4;

int payloadLength = method.WriteArgumentsTo(span.Slice(StartMethodArguments)) + 4;
NetworkOrderSerializer.WriteUInt64(span, ((ulong)Constants.FrameMethod << 56) | ((ulong)channel << 40) | ((ulong)payloadLength << 8));
NetworkOrderSerializer.WriteUInt32(span.Slice(StartClassId), (uint)method.ProtocolCommandId);
int offset = method.WriteArgumentsTo(span.Slice(StartMethodArguments));
return WriteBaseFrame(span, FrameType.FrameMethod, channel, StartMethodArguments - StartPayload + offset);
span[payloadLength + StartPayload] = Constants.FrameEnd;
return payloadLength + BaseFrameSize;
}
}

Expand All @@ -95,18 +88,19 @@ internal static class Header
* +----------+----------+-------------------+-----------+ */
public const int FrameSize = BaseFrameSize + 2 + 2 + 8;

public static int WriteTo(Span<byte> span, ushort channel, ContentHeaderBase header, int bodyLength)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static int WriteTo<T>(Span<byte> span, ushort channel, T header, int bodyLength) where T : ContentHeaderBase
{
const int StartClassId = StartPayload;
const int StartWeight = StartPayload + 2;
const int StartBodyLength = StartPayload + 4;
const int StartHeaderArguments = StartPayload + 12;

NetworkOrderSerializer.WriteUInt16(span.Slice(StartClassId), header.ProtocolClassId);
NetworkOrderSerializer.WriteUInt16(span.Slice(StartWeight), 0); // Weight - not used
int payloadLength = 12 + header.WritePropertiesTo(span.Slice(StartHeaderArguments));
NetworkOrderSerializer.WriteUInt64(span, ((ulong)Constants.FrameHeader << 56) | ((ulong)channel << 40) | ((ulong)payloadLength << 8));
NetworkOrderSerializer.WriteUInt32(span.Slice(StartClassId), (uint)header.ProtocolClassId << 16); // The last 16 bytes (Weight) aren't used
NetworkOrderSerializer.WriteUInt64(span.Slice(StartBodyLength), (ulong)bodyLength);
int offset = header.WritePropertiesTo(span.Slice(StartHeaderArguments));
return WriteBaseFrame(span, FrameType.FrameHeader, channel, StartHeaderArguments - StartPayload + offset);
span[payloadLength + StartPayload] = Constants.FrameEnd;
return payloadLength + BaseFrameSize;
}
}

Expand All @@ -119,12 +113,14 @@ internal static class BodySegment
* +--------------+ */
public const int FrameSize = BaseFrameSize;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static int WriteTo(Span<byte> span, ushort channel, ReadOnlySpan<byte> body)
{
const int StartBodyArgument = StartPayload;

NetworkOrderSerializer.WriteUInt64(span, ((ulong)Constants.FrameBody << 56) | ((ulong)channel << 40) | ((ulong)body.Length << 8));
body.CopyTo(span.Slice(StartBodyArgument));
return WriteBaseFrame(span, FrameType.FrameBody, channel, StartBodyArgument - StartPayload + body.Length);
span[StartPayload + body.Length] = Constants.FrameEnd;
return body.Length + BaseFrameSize;
}
}

Expand Down
6 changes: 3 additions & 3 deletions projects/RabbitMQ.Client/client/impl/ISession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

using System;
using System.Collections.Generic;

using RabbitMQ.Client.Framing.Impl;

namespace RabbitMQ.Client.Impl
Expand Down Expand Up @@ -73,8 +74,7 @@ internal interface ISession
void Close(ShutdownEventArgs reason, bool notify);
bool HandleFrame(in InboundFrame frame);
void Notify();
void Transmit(in OutgoingCommand cmd);
void Transmit(IList<OutgoingCommand> cmds);

void Transmit<T>(in T cmd) where T : struct, IOutgoingCommand;
void Transmit<T>(List<T> cmds) where T : struct, IOutgoingCommand;
}
}
3 changes: 2 additions & 1 deletion projects/RabbitMQ.Client/client/impl/MainSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
// that ever changes.

using System;

using RabbitMQ.Client.client.framing;
using RabbitMQ.Client.Framing.Impl;

Expand Down Expand Up @@ -108,7 +109,7 @@ public void SetSessionClosing(bool closeServerInitiated)
}
}

public override void Transmit(in OutgoingCommand cmd)
public override void Transmit<T>(in T cmd)
{
lock (_closingLock)
{
Expand Down
17 changes: 5 additions & 12 deletions projects/RabbitMQ.Client/client/impl/ModelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ protected void Enqueue(IRpcContinuation k)
{
if (CloseReason is null)
{
_continuationQueue.Enqueue(k);
_continuationQueue.Enqueue(k);
}
else
{
Expand Down Expand Up @@ -364,20 +364,13 @@ protected T ModelRpc<T>(MethodBase method) where T : MethodBase

protected void ModelSend(MethodBase method)
{
ModelSend(method, null, ReadOnlyMemory<byte>.Empty);
Session.Transmit(new OutgoingCommand(method));
}

protected void ModelSend(MethodBase method, ContentHeaderBase header, ReadOnlyMemory<byte> body)
{
if (method.HasContent)
{
_flowControlBlock.Wait();
Session.Transmit(new OutgoingCommand(method, header, body));
}
else
{
Session.Transmit(new OutgoingCommand(method, header, body));
}
_flowControlBlock.Wait();
Session.Transmit(new OutgoingContentCommand(method, header, body));
}

internal void OnCallbackException(CallbackExceptionEventArgs args)
Expand Down Expand Up @@ -1234,7 +1227,7 @@ await CloseAsync(new ShutdownEventArgs(ShutdownInitiator.Application,
}
}

internal void SendCommands(IList<OutgoingCommand> commands)
internal void SendCommands(List<OutgoingContentCommand> commands)
{
_flowControlBlock.Wait();
if (NextPublishSeqNo > 0)
Expand Down
Loading

0 comments on commit e02e86f

Please sign in to comment.