Skip to content

Commit

Permalink
Merge pull request #1017 from stebet/codecleanups
Browse files Browse the repository at this point in the history
Code cleanups and performance optimizations.
  • Loading branch information
michaelklishin authored Feb 24, 2021
2 parents 9437b15 + 01f1f0d commit 0810d13
Show file tree
Hide file tree
Showing 11 changed files with 101 additions and 132 deletions.
7 changes: 6 additions & 1 deletion projects/RabbitMQ.Client/client/framing/Protocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
//---------------------------------------------------------------------------

using System;
using System.Runtime.CompilerServices;

using RabbitMQ.Client.client.framing;
using RabbitMQ.Client.Framing.Impl;

Expand All @@ -54,10 +56,13 @@ internal sealed class Protocol : ProtocolBase

internal override Client.Impl.MethodBase DecodeMethodFrom(ReadOnlySpan<byte> span)
{
var commandId = (ProtocolCommandId)Util.NetworkOrderDeserializer.ReadUInt32(span);
ProtocolCommandId commandId = ReadCommandId(span);
return DecodeMethodFrom(commandId, span.Slice(4));
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ProtocolCommandId ReadCommandId(ReadOnlySpan<byte> span) => (ProtocolCommandId)Util.NetworkOrderDeserializer.ReadUInt32(span);

private static Client.Impl.MethodBase DecodeMethodFrom(ProtocolCommandId commandId, ReadOnlySpan<byte> span)
{
switch (commandId)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using System;
using System.Buffers;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

namespace RabbitMQ.Client.Impl
Expand Down Expand Up @@ -67,6 +67,7 @@ public void HandleModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reaso
Schedule(new ModelShutdown(consumer, reason, _model));
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void ScheduleUnlessShuttingDown(Work work)
{
if (!IsShutdown)
Expand All @@ -75,6 +76,7 @@ private void ScheduleUnlessShuttingDown(Work work)
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void Schedule(Work work)
{
_workService.Schedule(_model, work);
Expand Down
3 changes: 2 additions & 1 deletion projects/RabbitMQ.Client/client/impl/CommandAssembler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

using System;
using System.Buffers;

using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing.Impl;
using RabbitMQ.Util;
Expand Down Expand Up @@ -126,7 +127,7 @@ private void ParseHeaderFrame(in InboundFrame frame)
_bodyBytes = Array.Empty<byte>();
}

_remainingBodyBytes = (int) totalBodyBytes;
_remainingBodyBytes = (int)totalBodyBytes;
UpdateContentBodyState();
}

Expand Down
37 changes: 14 additions & 23 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;

using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Impl;
Expand Down Expand Up @@ -137,7 +138,7 @@ public event EventHandler<ShutdownEventArgs> ConnectionShutdown
add
{
ThrowIfDisposed();
if (CloseReason is null)
if (IsOpen)
{
_connectionShutdownWrapper.AddHandler(value);
}
Expand All @@ -157,31 +158,35 @@ public event EventHandler<ShutdownEventArgs> ConnectionShutdown
/// <summary>
/// This event is never fired by non-recovering connections but it is a part of the <see cref="IConnection"/> interface.
/// </summary>
public event EventHandler<EventArgs> RecoverySucceeded {
public event EventHandler<EventArgs> RecoverySucceeded
{
add { }
remove { }
}

/// <summary>
/// This event is never fired by non-recovering connections but it is a part of the <see cref="IConnection"/> interface.
/// </summary>
public event EventHandler<ConnectionRecoveryErrorEventArgs> ConnectionRecoveryError {
public event EventHandler<ConnectionRecoveryErrorEventArgs> ConnectionRecoveryError
{
add { }
remove { }
}

/// <summary>
/// This event is never fired by non-recovering connections but it is a part of the <see cref="IConnection"/> interface.
/// </summary>
public event EventHandler<ConsumerTagChangedAfterRecoveryEventArgs> ConsumerTagChangeAfterRecovery {
public event EventHandler<ConsumerTagChangedAfterRecoveryEventArgs> ConsumerTagChangeAfterRecovery
{
add { }
remove { }
}

/// <summary>
/// This event is never fired by non-recovering connections but it is a part of the <see cref="IConnection"/> interface.
/// </summary>
public event EventHandler<QueueNameChangedAfterRecoveryEventArgs> QueueNameChangeAfterRecovery {
public event EventHandler<QueueNameChangedAfterRecoveryEventArgs> QueueNameChangeAfterRecovery
{
add { }
remove { }
}
Expand Down Expand Up @@ -273,7 +278,7 @@ internal void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout)
{
// Try to send connection.close
// Wait for CloseOk in the MainLoop
_session0.Transmit(ConnectionCloseWrapper(reason.ReplyCode, reason.ReplyText));
_session0.Transmit(new OutgoingCommand(new Impl.ConnectionClose(reason.ReplyCode, reason.ReplyText, 0, 0)));
}
catch (AlreadyClosedException)
{
Expand Down Expand Up @@ -357,12 +362,6 @@ private void ClosingLoop()
}
}

private OutgoingCommand ConnectionCloseWrapper(ushort reasonCode, string reasonText)
{
Protocol.CreateConnectionClose(reasonCode, reasonText, out OutgoingCommand request, out _);
return request;
}

internal ISession CreateSession()
{
return _sessionManager.Create();
Expand Down Expand Up @@ -407,9 +406,7 @@ private bool HardProtocolExceptionHandler(HardProtocolException hpe)
_session0.SetSessionClosing(false);
try
{
_session0.Transmit(ConnectionCloseWrapper(
hpe.ShutdownReason.ReplyCode,
hpe.ShutdownReason.ReplyText));
_session0.Transmit(new OutgoingCommand(new ConnectionClose(hpe.ShutdownReason.ReplyCode, hpe.ShutdownReason.ReplyText, 0, 0)));
return true;
}
catch (IOException ioe)
Expand Down Expand Up @@ -646,12 +643,12 @@ private void QuiesceChannel(SoftProtocolException pe)
// our peer. The peer will respond through the lower
// layers - specifically, through the QuiescingSession we
// installed above.
newSession.Transmit(ChannelCloseWrapper(pe.ReplyCode, pe.Message));
newSession.Transmit(new OutgoingCommand(new Impl.ChannelClose(pe.ReplyCode, pe.Message, 0, 0)));
}

private bool SetCloseReason(ShutdownEventArgs reason)
{
return System.Threading.Interlocked.CompareExchange(ref _closeReason, reason, null) is null;
return Interlocked.CompareExchange(ref _closeReason, reason, null) is null;
}

private void MaybeStartHeartbeatTimers()
Expand Down Expand Up @@ -838,12 +835,6 @@ public void Dispose()
}
}

private OutgoingCommand ChannelCloseWrapper(ushort reasonCode, string reasonText)
{
Protocol.CreateChannelClose(reasonCode, reasonText, out OutgoingCommand request);
return request;
}

private void StartAndTune()
{
var connectionStartCell = new BlockingCell<ConnectionStartDetails>();
Expand Down
52 changes: 25 additions & 27 deletions projects/RabbitMQ.Client/client/impl/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,23 +165,22 @@ private InboundFrame(FrameType type, int channel, ReadOnlyMemory<byte> payload,
_rentedArray = rentedArray;
}

private static void ProcessProtocolHeader(Stream reader)
private static void ProcessProtocolHeader(Stream reader, ReadOnlySpan<byte> frameHeader)
{
try
{
byte b1 = (byte)reader.ReadByte();
byte b2 = (byte)reader.ReadByte();
byte b3 = (byte)reader.ReadByte();
if (b1 != 'M' || b2 != 'Q' || b3 != 'P')
if (frameHeader[0] != 'M' || frameHeader[1] != 'Q' || frameHeader[2] != 'P')
{
throw new MalformedFrameException("Invalid AMQP protocol header from server");
}

int transportHigh = reader.ReadByte();
int transportLow = reader.ReadByte();
int serverMajor = reader.ReadByte();
int serverMinor = reader.ReadByte();
throw new PacketNotRecognizedException(transportHigh, transportLow, serverMajor, serverMinor);
if (serverMinor == -1)
{
throw new EndOfStreamException();
}

throw new PacketNotRecognizedException(frameHeader[3], frameHeader[4], frameHeader[5], serverMinor);
}
catch (EndOfStreamException)
{
Expand All @@ -198,39 +197,38 @@ private static void ProcessProtocolHeader(Stream reader)

internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer)
{
int type = default;
try
{
type = reader.ReadByte();
if (reader.Read(frameHeaderBuffer, 0, frameHeaderBuffer.Length) == 0)
{
throw new EndOfStreamException("Reached the end of the stream. Possible authentication failure.");
}
}
catch (IOException ioe)
{
// If it's a WSAETIMEDOUT SocketException, unwrap it.
// This might happen when the limit of half-open connections is
// reached.
if (ioe.InnerException is null ||
!(ioe.InnerException is SocketException exception) ||
exception.SocketErrorCode != SocketError.TimedOut)
if (ioe?.InnerException is SocketException exception && exception.SocketErrorCode == SocketError.TimedOut)
{
ExceptionDispatchInfo.Capture(exception).Throw();
}
else
{
throw;
}

ExceptionDispatchInfo.Capture(ioe.InnerException).Throw();
}

switch (type)
byte firstByte = frameHeaderBuffer[0];
if (firstByte == 'A')
{
case -1:
throw new EndOfStreamException("Reached the end of the stream. Possible authentication failure.");
case 'A':
// Probably an AMQP protocol header, otherwise meaningless
ProcessProtocolHeader(reader);
break;
// Probably an AMQP protocol header, otherwise meaningless
ProcessProtocolHeader(reader, frameHeaderBuffer.AsSpan(1, 6));
}

reader.Read(frameHeaderBuffer, 0, frameHeaderBuffer.Length);
int channel = NetworkOrderDeserializer.ReadUInt16(new ReadOnlySpan<byte>(frameHeaderBuffer));
int payloadSize = NetworkOrderDeserializer.ReadInt32(new ReadOnlySpan<byte>(frameHeaderBuffer, 2, 4)); // FIXME - throw exn on unreasonable value
FrameType type = (FrameType)firstByte;
int channel = NetworkOrderDeserializer.ReadUInt16(new ReadOnlySpan<byte>(frameHeaderBuffer, 1, 2));
int payloadSize = NetworkOrderDeserializer.ReadInt32(new ReadOnlySpan<byte>(frameHeaderBuffer, 3, 4)); // FIXME - throw exn on unreasonable value

const int EndMarkerLength = 1;
// Is returned by InboundFrame.ReturnPayload in Connection.MainLoopIteration
Expand All @@ -257,7 +255,7 @@ internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer)
throw new MalformedFrameException($"Bad frame end marker: {payloadBytes[payloadSize]}");
}

return new InboundFrame((FrameType)type, channel, new Memory<byte>(payloadBytes, 0, payloadSize), payloadBytes);
return new InboundFrame(type, channel, new Memory<byte>(payloadBytes, 0, payloadSize), payloadBytes);
}

public byte[] TakeoverPayload()
Expand Down
Loading

0 comments on commit 0810d13

Please sign in to comment.