From e02e86fd55f8ee1a006bee338ff4158edd72aa83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stef=C3=A1n=20J=2E=20Sigur=C3=B0arson?= Date: Thu, 11 Feb 2021 09:02:44 +0000 Subject: [PATCH] Adding benchmarks for framing and optimizing frame generation. --- .../WireFormatting/MethodFraming.cs | 45 +++++++++ .../client/impl/BasicPublishBatch.cs | 15 +-- projects/RabbitMQ.Client/client/impl/Frame.cs | 50 +++++----- .../RabbitMQ.Client/client/impl/ISession.cs | 6 +- .../client/impl/MainSession.cs | 3 +- .../RabbitMQ.Client/client/impl/ModelBase.cs | 17 +--- .../client/impl/OutgoingCommand.cs | 96 ++++++++++--------- .../client/impl/SessionBase.cs | 10 +- 8 files changed, 141 insertions(+), 101 deletions(-) create mode 100644 projects/Benchmarks/WireFormatting/MethodFraming.cs diff --git a/projects/Benchmarks/WireFormatting/MethodFraming.cs b/projects/Benchmarks/WireFormatting/MethodFraming.cs new file mode 100644 index 0000000000..9fa1142d63 --- /dev/null +++ b/projects/Benchmarks/WireFormatting/MethodFraming.cs @@ -0,0 +1,45 @@ +using System; +using System.Text; + +using BenchmarkDotNet.Attributes; + +using RabbitMQ.Client.Framing.Impl; +using RabbitMQ.Client.Impl; + +namespace RabbitMQ.Benchmarks +{ + [Config(typeof(Config))] + [BenchmarkCategory("Framing")] + public class MethodFramingBasicAck + { + private readonly OutgoingCommand _basicAck = new OutgoingCommand(new BasicAck(ulong.MaxValue, true)); + + [Benchmark] + public ReadOnlyMemory BasicAckWrite() => _basicAck.SerializeToFrames(0, 1024 * 1024); + } + + [Config(typeof(Config))] + [BenchmarkCategory("Framing")] + public class MethodFramingBasicPublish + { + private const string StringValue = "Exchange_OR_RoutingKey"; + private readonly OutgoingContentCommand _basicPublish = new OutgoingContentCommand(new BasicPublish(StringValue, StringValue, false, false), new Client.Framing.BasicProperties(), ReadOnlyMemory.Empty); + private readonly OutgoingContentCommand _basicPublishMemory = new OutgoingContentCommand(new BasicPublishMemory(Encoding.UTF8.GetBytes(StringValue), Encoding.UTF8.GetBytes(StringValue), false, false), new Client.Framing.BasicProperties(), ReadOnlyMemory.Empty); + + [Benchmark] + public ReadOnlyMemory BasicPublishWrite() => _basicPublish.SerializeToFrames(0, 1024 * 1024); + + [Benchmark] + public ReadOnlyMemory BasicPublishMemoryWrite() => _basicPublishMemory.SerializeToFrames(0, 1024 * 1024); + } + + [Config(typeof(Config))] + [BenchmarkCategory("Framing")] + public class MethodFramingChannelClose + { + private readonly OutgoingCommand _channelClose = new OutgoingCommand(new ChannelClose(333, string.Empty, 0099, 2999)); + + [Benchmark] + public ReadOnlyMemory ChannelCloseWrite() => _channelClose.SerializeToFrames(0, 1024 * 1024); + } +} diff --git a/projects/RabbitMQ.Client/client/impl/BasicPublishBatch.cs b/projects/RabbitMQ.Client/client/impl/BasicPublishBatch.cs index 635c13be89..889ff409df 100644 --- a/projects/RabbitMQ.Client/client/impl/BasicPublishBatch.cs +++ b/projects/RabbitMQ.Client/client/impl/BasicPublishBatch.cs @@ -31,25 +31,26 @@ using System; using System.Collections.Generic; + using RabbitMQ.Client.Framing.Impl; namespace RabbitMQ.Client.Impl { internal sealed class BasicPublishBatch : IBasicPublishBatch { - private readonly List _commands; + private readonly List _commands; private readonly ModelBase _model; - internal BasicPublishBatch (ModelBase model) + internal BasicPublishBatch(ModelBase model) { _model = model; - _commands = new List(); + _commands = new List(); } - internal BasicPublishBatch (ModelBase model, int sizeHint) + internal BasicPublishBatch(ModelBase model, int sizeHint) { _model = model; - _commands = new List(sizeHint); + _commands = new List(sizeHint); } public void Add(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory body) @@ -60,13 +61,13 @@ public void Add(string exchange, string routingKey, bool mandatory, IBasicProper _routingKey = routingKey, _mandatory = mandatory }; - _commands.Add(new OutgoingCommand(method, (ContentHeaderBase)(basicProperties ?? _model._emptyBasicProperties), body)); + _commands.Add(new OutgoingContentCommand(method, (ContentHeaderBase)(basicProperties ?? _model._emptyBasicProperties), body)); } public void Add(CachedString exchange, CachedString routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory body) { var method = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default); - _commands.Add(new OutgoingCommand(method, (ContentHeaderBase)(basicProperties ?? _model._emptyBasicProperties), body)); + _commands.Add(new OutgoingContentCommand(method, (ContentHeaderBase)(basicProperties ?? _model._emptyBasicProperties), body)); } public void Publish() diff --git a/projects/RabbitMQ.Client/client/impl/Frame.cs b/projects/RabbitMQ.Client/client/impl/Frame.cs index 1db6c3d4b5..4d9e60eed4 100644 --- a/projects/RabbitMQ.Client/client/impl/Frame.cs +++ b/projects/RabbitMQ.Client/client/impl/Frame.cs @@ -35,6 +35,7 @@ using System.Net.Sockets; using System.Runtime.CompilerServices; using System.Runtime.ExceptionServices; + using RabbitMQ.Client.Exceptions; using RabbitMQ.Util; @@ -47,23 +48,12 @@ internal static class Framing * +------------+---------+----------------+---------+------------------+ * | 1 byte | 2 bytes | 4 bytes | x bytes | 1 byte | * +------------+---------+----------------+---------+------------------+ */ - private const int BaseFrameSize = 1 + 2 + 4 + 1; + internal const int BaseFrameSize = 1 + 2 + 4 + 1; + internal const int StartFrameType = 0; + internal const int StartChannel = 1; + internal const int StartPayloadSize = 3; private const int StartPayload = 7; - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static int WriteBaseFrame(Span span, FrameType type, ushort channel, int payloadLength) - { - const int StartFrameType = 0; - const int StartChannel = 1; - const int StartPayloadSize = 3; - - span[StartFrameType] = (byte)type; - NetworkOrderSerializer.WriteUInt16(span.Slice(StartChannel), channel); - NetworkOrderSerializer.WriteUInt32(span.Slice(StartPayloadSize), (uint)payloadLength); - span[StartPayload + payloadLength] = Constants.FrameEnd; - return StartPayload + 1 + payloadLength; - } - internal static class Method { /* +----------+-----------+-----------+ @@ -75,14 +65,17 @@ internal static class Method * +----------+-----------+-----------+ */ public const int FrameSize = BaseFrameSize + 2 + 2; - public static int WriteTo(Span span, ushort channel, MethodBase method) + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static int WriteTo(Span span, ushort channel, T method) where T : MethodBase { const int StartClassId = StartPayload; - const int StartMethodArguments = StartPayload + 4; + const int StartMethodArguments = StartClassId + 4; + int payloadLength = method.WriteArgumentsTo(span.Slice(StartMethodArguments)) + 4; + NetworkOrderSerializer.WriteUInt64(span, ((ulong)Constants.FrameMethod << 56) | ((ulong)channel << 40) | ((ulong)payloadLength << 8)); NetworkOrderSerializer.WriteUInt32(span.Slice(StartClassId), (uint)method.ProtocolCommandId); - int offset = method.WriteArgumentsTo(span.Slice(StartMethodArguments)); - return WriteBaseFrame(span, FrameType.FrameMethod, channel, StartMethodArguments - StartPayload + offset); + span[payloadLength + StartPayload] = Constants.FrameEnd; + return payloadLength + BaseFrameSize; } } @@ -95,18 +88,19 @@ internal static class Header * +----------+----------+-------------------+-----------+ */ public const int FrameSize = BaseFrameSize + 2 + 2 + 8; - public static int WriteTo(Span span, ushort channel, ContentHeaderBase header, int bodyLength) + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static int WriteTo(Span span, ushort channel, T header, int bodyLength) where T : ContentHeaderBase { const int StartClassId = StartPayload; - const int StartWeight = StartPayload + 2; const int StartBodyLength = StartPayload + 4; const int StartHeaderArguments = StartPayload + 12; - NetworkOrderSerializer.WriteUInt16(span.Slice(StartClassId), header.ProtocolClassId); - NetworkOrderSerializer.WriteUInt16(span.Slice(StartWeight), 0); // Weight - not used + int payloadLength = 12 + header.WritePropertiesTo(span.Slice(StartHeaderArguments)); + NetworkOrderSerializer.WriteUInt64(span, ((ulong)Constants.FrameHeader << 56) | ((ulong)channel << 40) | ((ulong)payloadLength << 8)); + NetworkOrderSerializer.WriteUInt32(span.Slice(StartClassId), (uint)header.ProtocolClassId << 16); // The last 16 bytes (Weight) aren't used NetworkOrderSerializer.WriteUInt64(span.Slice(StartBodyLength), (ulong)bodyLength); - int offset = header.WritePropertiesTo(span.Slice(StartHeaderArguments)); - return WriteBaseFrame(span, FrameType.FrameHeader, channel, StartHeaderArguments - StartPayload + offset); + span[payloadLength + StartPayload] = Constants.FrameEnd; + return payloadLength + BaseFrameSize; } } @@ -119,12 +113,14 @@ internal static class BodySegment * +--------------+ */ public const int FrameSize = BaseFrameSize; + [MethodImpl(MethodImplOptions.AggressiveInlining)] public static int WriteTo(Span span, ushort channel, ReadOnlySpan body) { const int StartBodyArgument = StartPayload; - + NetworkOrderSerializer.WriteUInt64(span, ((ulong)Constants.FrameBody << 56) | ((ulong)channel << 40) | ((ulong)body.Length << 8)); body.CopyTo(span.Slice(StartBodyArgument)); - return WriteBaseFrame(span, FrameType.FrameBody, channel, StartBodyArgument - StartPayload + body.Length); + span[StartPayload + body.Length] = Constants.FrameEnd; + return body.Length + BaseFrameSize; } } diff --git a/projects/RabbitMQ.Client/client/impl/ISession.cs b/projects/RabbitMQ.Client/client/impl/ISession.cs index 53453439e8..ac12511044 100644 --- a/projects/RabbitMQ.Client/client/impl/ISession.cs +++ b/projects/RabbitMQ.Client/client/impl/ISession.cs @@ -31,6 +31,7 @@ using System; using System.Collections.Generic; + using RabbitMQ.Client.Framing.Impl; namespace RabbitMQ.Client.Impl @@ -73,8 +74,7 @@ internal interface ISession void Close(ShutdownEventArgs reason, bool notify); bool HandleFrame(in InboundFrame frame); void Notify(); - void Transmit(in OutgoingCommand cmd); - void Transmit(IList cmds); - + void Transmit(in T cmd) where T : struct, IOutgoingCommand; + void Transmit(List cmds) where T : struct, IOutgoingCommand; } } diff --git a/projects/RabbitMQ.Client/client/impl/MainSession.cs b/projects/RabbitMQ.Client/client/impl/MainSession.cs index f6c24de2c8..445d5ad95c 100644 --- a/projects/RabbitMQ.Client/client/impl/MainSession.cs +++ b/projects/RabbitMQ.Client/client/impl/MainSession.cs @@ -35,6 +35,7 @@ // that ever changes. using System; + using RabbitMQ.Client.client.framing; using RabbitMQ.Client.Framing.Impl; @@ -108,7 +109,7 @@ public void SetSessionClosing(bool closeServerInitiated) } } - public override void Transmit(in OutgoingCommand cmd) + public override void Transmit(in T cmd) { lock (_closingLock) { diff --git a/projects/RabbitMQ.Client/client/impl/ModelBase.cs b/projects/RabbitMQ.Client/client/impl/ModelBase.cs index a91bd7874f..9ad4af5c95 100644 --- a/projects/RabbitMQ.Client/client/impl/ModelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ModelBase.cs @@ -316,7 +316,7 @@ protected void Enqueue(IRpcContinuation k) { if (CloseReason is null) { - _continuationQueue.Enqueue(k); + _continuationQueue.Enqueue(k); } else { @@ -364,20 +364,13 @@ protected T ModelRpc(MethodBase method) where T : MethodBase protected void ModelSend(MethodBase method) { - ModelSend(method, null, ReadOnlyMemory.Empty); + Session.Transmit(new OutgoingCommand(method)); } protected void ModelSend(MethodBase method, ContentHeaderBase header, ReadOnlyMemory body) { - if (method.HasContent) - { - _flowControlBlock.Wait(); - Session.Transmit(new OutgoingCommand(method, header, body)); - } - else - { - Session.Transmit(new OutgoingCommand(method, header, body)); - } + _flowControlBlock.Wait(); + Session.Transmit(new OutgoingContentCommand(method, header, body)); } internal void OnCallbackException(CallbackExceptionEventArgs args) @@ -1234,7 +1227,7 @@ await CloseAsync(new ShutdownEventArgs(ShutdownInitiator.Application, } } - internal void SendCommands(IList commands) + internal void SendCommands(List commands) { _flowControlBlock.Wait(); if (NextPublishSeqNo > 0) diff --git a/projects/RabbitMQ.Client/client/impl/OutgoingCommand.cs b/projects/RabbitMQ.Client/client/impl/OutgoingCommand.cs index 5a98fb1fd0..b8515a9a0f 100644 --- a/projects/RabbitMQ.Client/client/impl/OutgoingCommand.cs +++ b/projects/RabbitMQ.Client/client/impl/OutgoingCommand.cs @@ -31,49 +31,74 @@ using System; using System.Buffers; -using RabbitMQ.Client.Framing.Impl; +using System.Runtime.CompilerServices; namespace RabbitMQ.Client.Impl { - internal readonly struct OutgoingCommand + internal interface IOutgoingCommand { - // EmptyFrameSize, 8 = 1 + 2 + 4 + 1 - // - 1 byte of frame type - // - 2 bytes of channel number - // - 4 bytes of frame payload length - // - 1 byte of payload trailer FrameEnd byte - private const int EmptyFrameSize = 8; + MethodBase Method { get; } + ReadOnlyMemory SerializeToFrames(ushort channelNumber, uint frameMax); + } - public readonly MethodBase Method; - private readonly ContentHeaderBase _header; - private readonly ReadOnlyMemory _body; + internal readonly struct OutgoingCommand : IOutgoingCommand + { + private readonly MethodBase _method; + public MethodBase Method => _method; public OutgoingCommand(MethodBase method) - : this(method, null, ReadOnlyMemory.Empty) { + _method = method; } - public OutgoingCommand(MethodBase method, ContentHeaderBase header, ReadOnlyMemory body) + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public readonly ReadOnlyMemory SerializeToFrames(ushort channelNumber, uint frameMax) + { + int size = Method.GetRequiredBufferSize() + Framing.Method.FrameSize; + + // Will be returned by SocketFrameWriter.WriteLoop + byte[] rentedArray = ArrayPool.Shared.Rent(size); + int offset = Framing.Method.WriteTo(rentedArray, channelNumber, Method); +#if DEBUG + System.Diagnostics.Debug.Assert(offset == size, $"Serialized to wrong size, expect {size}, offset {offset}"); +#endif + + return new ReadOnlyMemory(rentedArray, 0, size); + } + } + + internal readonly struct OutgoingContentCommand : IOutgoingCommand + { + public MethodBase Method => _method; + private readonly MethodBase _method; + private readonly ContentHeaderBase _header; + private readonly ReadOnlyMemory _body; + + public OutgoingContentCommand(MethodBase method, ContentHeaderBase header, ReadOnlyMemory body) { - Method = method; + _method = method; _header = header; _body = body; } - internal void Transmit(ushort channelNumber, Connection connection) + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public readonly ReadOnlyMemory SerializeToFrames(ushort channelNumber, uint frameMax) { - int maxBodyPayloadBytes = (int)(connection.FrameMax == 0 ? int.MaxValue : connection.FrameMax - EmptyFrameSize); - int size = GetMaxSize(maxBodyPayloadBytes); + int maxBodyPayloadBytes = (int)(frameMax == 0 ? int.MaxValue : frameMax - Framing.BaseFrameSize); + int size = Method.GetRequiredBufferSize() + + _header.GetRequiredPayloadBufferSize() + + Framing.BodySegment.FrameSize * (maxBodyPayloadBytes == int.MaxValue ? 1 : (_body.Length + maxBodyPayloadBytes - 1) / maxBodyPayloadBytes) + _body.Length + + Framing.Method.FrameSize + Framing.Header.FrameSize; // Will be returned by SocketFrameWriter.WriteLoop byte[] rentedArray = ArrayPool.Shared.Rent(size); Span span = rentedArray.AsSpan(0, size); int offset = Framing.Method.WriteTo(span, channelNumber, Method); - if (Method.HasContent) + int remainingBodyBytes = _body.Length; + offset += Framing.Header.WriteTo(span.Slice(offset), channelNumber, _header, remainingBodyBytes); + if (remainingBodyBytes > 0) { - int remainingBodyBytes = _body.Length; - offset += Framing.Header.WriteTo(span.Slice(offset), channelNumber, _header, remainingBodyBytes); ReadOnlySpan bodySpan = _body.Span; while (remainingBodyBytes > 0) { @@ -83,34 +108,11 @@ internal void Transmit(ushort channelNumber, Connection connection) } } - if (offset != size) - { - throw new InvalidOperationException($"Serialized to wrong size, expect {size}, offset {offset}"); - } - - connection.Write(new ReadOnlyMemory(rentedArray, 0, size)); - } - - private int GetMaxSize(int maxPayloadBytes) - { - if (!Method.HasContent) - { - return Framing.Method.FrameSize + Method.GetRequiredBufferSize(); - } - - return Framing.Method.FrameSize + Method.GetRequiredBufferSize() + - Framing.Header.FrameSize + _header.GetRequiredPayloadBufferSize() + - Framing.BodySegment.FrameSize * GetBodyFrameCount(maxPayloadBytes) + _body.Length; - } - - private int GetBodyFrameCount(int maxPayloadBytes) - { - if (maxPayloadBytes == int.MaxValue) - { - return 1; - } +#if DEBUG + System.Diagnostics.Debug.Assert(offset == size, $"Serialized to wrong size, expect {size}, offset {offset}"); +#endif - return (_body.Length + maxPayloadBytes - 1) / maxPayloadBytes; + return new ReadOnlyMemory(rentedArray, 0, size); } } } diff --git a/projects/RabbitMQ.Client/client/impl/SessionBase.cs b/projects/RabbitMQ.Client/client/impl/SessionBase.cs index df8b831574..67a6a970cb 100644 --- a/projects/RabbitMQ.Client/client/impl/SessionBase.cs +++ b/projects/RabbitMQ.Client/client/impl/SessionBase.cs @@ -32,6 +32,7 @@ using System; using System.Collections.Generic; using System.Threading; + using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing.Impl; @@ -124,7 +125,7 @@ public void Notify() throw new Exception("Internal Error in Session.Close"); } - public virtual void Transmit(in OutgoingCommand cmd) + public virtual void Transmit(in T cmd) where T : struct, IOutgoingCommand { if (CloseReason != null) { @@ -135,14 +136,15 @@ public virtual void Transmit(in OutgoingCommand cmd) } // We used to transmit *inside* the lock to avoid interleaving // of frames within a channel. But that is fixed in socket frame handler instead, so no need to lock. - cmd.Transmit(ChannelNumber, Connection); + Connection.Write(cmd.SerializeToFrames(ChannelNumber, Connection.FrameMax)); } - public virtual void Transmit(IList cmds) + public virtual void Transmit(List cmds) where T : struct, IOutgoingCommand { + uint frameMax = Connection.FrameMax; for (int i = 0; i < cmds.Count; i++) { - cmds[i].Transmit(ChannelNumber, Connection); + Connection.Write(cmds[i].SerializeToFrames(ChannelNumber, frameMax)); } } }