Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Code cleanups and performance optimizations. #1017

Merged
merged 4 commits into from
Feb 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion projects/RabbitMQ.Client/client/framing/Protocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
//---------------------------------------------------------------------------

using System;
using System.Runtime.CompilerServices;

using RabbitMQ.Client.client.framing;
using RabbitMQ.Client.Framing.Impl;

Expand All @@ -54,10 +56,13 @@ internal sealed class Protocol : ProtocolBase

internal override Client.Impl.MethodBase DecodeMethodFrom(ReadOnlySpan<byte> span)
{
var commandId = (ProtocolCommandId)Util.NetworkOrderDeserializer.ReadUInt32(span);
ProtocolCommandId commandId = ReadCommandId(span);
return DecodeMethodFrom(commandId, span.Slice(4));
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ProtocolCommandId ReadCommandId(ReadOnlySpan<byte> span) => (ProtocolCommandId)Util.NetworkOrderDeserializer.ReadUInt32(span);

private static Client.Impl.MethodBase DecodeMethodFrom(ProtocolCommandId commandId, ReadOnlySpan<byte> span)
{
switch (commandId)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using System;
using System.Buffers;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

namespace RabbitMQ.Client.Impl
Expand Down Expand Up @@ -67,6 +67,7 @@ public void HandleModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reaso
Schedule(new ModelShutdown(consumer, reason, _model));
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void ScheduleUnlessShuttingDown(Work work)
{
if (!IsShutdown)
Expand All @@ -75,6 +76,7 @@ private void ScheduleUnlessShuttingDown(Work work)
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void Schedule(Work work)
{
_workService.Schedule(_model, work);
Expand Down
3 changes: 2 additions & 1 deletion projects/RabbitMQ.Client/client/impl/CommandAssembler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

using System;
using System.Buffers;

using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing.Impl;
using RabbitMQ.Util;
Expand Down Expand Up @@ -126,7 +127,7 @@ private void ParseHeaderFrame(in InboundFrame frame)
_bodyBytes = Array.Empty<byte>();
}

_remainingBodyBytes = (int) totalBodyBytes;
_remainingBodyBytes = (int)totalBodyBytes;
UpdateContentBodyState();
}

Expand Down
37 changes: 14 additions & 23 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;

using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Impl;
Expand Down Expand Up @@ -137,7 +138,7 @@ public event EventHandler<ShutdownEventArgs> ConnectionShutdown
add
{
ThrowIfDisposed();
if (CloseReason is null)
if (IsOpen)
{
_connectionShutdownWrapper.AddHandler(value);
}
Expand All @@ -157,31 +158,35 @@ public event EventHandler<ShutdownEventArgs> ConnectionShutdown
/// <summary>
/// This event is never fired by non-recovering connections but it is a part of the <see cref="IConnection"/> interface.
/// </summary>
public event EventHandler<EventArgs> RecoverySucceeded {
public event EventHandler<EventArgs> RecoverySucceeded
{
add { }
remove { }
}

/// <summary>
/// This event is never fired by non-recovering connections but it is a part of the <see cref="IConnection"/> interface.
/// </summary>
public event EventHandler<ConnectionRecoveryErrorEventArgs> ConnectionRecoveryError {
public event EventHandler<ConnectionRecoveryErrorEventArgs> ConnectionRecoveryError
{
add { }
remove { }
}

/// <summary>
/// This event is never fired by non-recovering connections but it is a part of the <see cref="IConnection"/> interface.
/// </summary>
public event EventHandler<ConsumerTagChangedAfterRecoveryEventArgs> ConsumerTagChangeAfterRecovery {
public event EventHandler<ConsumerTagChangedAfterRecoveryEventArgs> ConsumerTagChangeAfterRecovery
{
add { }
remove { }
}

/// <summary>
/// This event is never fired by non-recovering connections but it is a part of the <see cref="IConnection"/> interface.
/// </summary>
public event EventHandler<QueueNameChangedAfterRecoveryEventArgs> QueueNameChangeAfterRecovery {
public event EventHandler<QueueNameChangedAfterRecoveryEventArgs> QueueNameChangeAfterRecovery
{
add { }
remove { }
}
Expand Down Expand Up @@ -273,7 +278,7 @@ internal void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout)
{
// Try to send connection.close
// Wait for CloseOk in the MainLoop
_session0.Transmit(ConnectionCloseWrapper(reason.ReplyCode, reason.ReplyText));
_session0.Transmit(new OutgoingCommand(new Impl.ConnectionClose(reason.ReplyCode, reason.ReplyText, 0, 0)));
}
catch (AlreadyClosedException)
{
Expand Down Expand Up @@ -357,12 +362,6 @@ private void ClosingLoop()
}
}

private OutgoingCommand ConnectionCloseWrapper(ushort reasonCode, string reasonText)
{
Protocol.CreateConnectionClose(reasonCode, reasonText, out OutgoingCommand request, out _);
return request;
}

internal ISession CreateSession()
{
return _sessionManager.Create();
Expand Down Expand Up @@ -407,9 +406,7 @@ private bool HardProtocolExceptionHandler(HardProtocolException hpe)
_session0.SetSessionClosing(false);
try
{
_session0.Transmit(ConnectionCloseWrapper(
hpe.ShutdownReason.ReplyCode,
hpe.ShutdownReason.ReplyText));
_session0.Transmit(new OutgoingCommand(new ConnectionClose(hpe.ShutdownReason.ReplyCode, hpe.ShutdownReason.ReplyText, 0, 0)));
return true;
}
catch (IOException ioe)
Expand Down Expand Up @@ -646,12 +643,12 @@ private void QuiesceChannel(SoftProtocolException pe)
// our peer. The peer will respond through the lower
// layers - specifically, through the QuiescingSession we
// installed above.
newSession.Transmit(ChannelCloseWrapper(pe.ReplyCode, pe.Message));
newSession.Transmit(new OutgoingCommand(new Impl.ChannelClose(pe.ReplyCode, pe.Message, 0, 0)));
}

private bool SetCloseReason(ShutdownEventArgs reason)
{
return System.Threading.Interlocked.CompareExchange(ref _closeReason, reason, null) is null;
return Interlocked.CompareExchange(ref _closeReason, reason, null) is null;
}

private void MaybeStartHeartbeatTimers()
Expand Down Expand Up @@ -838,12 +835,6 @@ public void Dispose()
}
}

private OutgoingCommand ChannelCloseWrapper(ushort reasonCode, string reasonText)
{
Protocol.CreateChannelClose(reasonCode, reasonText, out OutgoingCommand request);
return request;
}

private void StartAndTune()
{
var connectionStartCell = new BlockingCell<ConnectionStartDetails>();
Expand Down
52 changes: 25 additions & 27 deletions projects/RabbitMQ.Client/client/impl/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,23 +165,22 @@ private InboundFrame(FrameType type, int channel, ReadOnlyMemory<byte> payload,
_rentedArray = rentedArray;
}

private static void ProcessProtocolHeader(Stream reader)
private static void ProcessProtocolHeader(Stream reader, ReadOnlySpan<byte> frameHeader)
{
try
{
byte b1 = (byte)reader.ReadByte();
byte b2 = (byte)reader.ReadByte();
byte b3 = (byte)reader.ReadByte();
if (b1 != 'M' || b2 != 'Q' || b3 != 'P')
if (frameHeader[0] != 'M' || frameHeader[1] != 'Q' || frameHeader[2] != 'P')
{
throw new MalformedFrameException("Invalid AMQP protocol header from server");
}

int transportHigh = reader.ReadByte();
int transportLow = reader.ReadByte();
int serverMajor = reader.ReadByte();
int serverMinor = reader.ReadByte();
throw new PacketNotRecognizedException(transportHigh, transportLow, serverMajor, serverMinor);
if (serverMinor == -1)
{
throw new EndOfStreamException();
}

throw new PacketNotRecognizedException(frameHeader[3], frameHeader[4], frameHeader[5], serverMinor);
}
catch (EndOfStreamException)
{
Expand All @@ -198,39 +197,38 @@ private static void ProcessProtocolHeader(Stream reader)

internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer)
{
int type = default;
try
{
type = reader.ReadByte();
if (reader.Read(frameHeaderBuffer, 0, frameHeaderBuffer.Length) == 0)
{
throw new EndOfStreamException("Reached the end of the stream. Possible authentication failure.");
}
}
catch (IOException ioe)
{
// If it's a WSAETIMEDOUT SocketException, unwrap it.
// This might happen when the limit of half-open connections is
// reached.
if (ioe.InnerException is null ||
!(ioe.InnerException is SocketException exception) ||
exception.SocketErrorCode != SocketError.TimedOut)
if (ioe?.InnerException is SocketException exception && exception.SocketErrorCode == SocketError.TimedOut)
{
ExceptionDispatchInfo.Capture(exception).Throw();
}
else
{
throw;
}

ExceptionDispatchInfo.Capture(ioe.InnerException).Throw();
}

switch (type)
byte firstByte = frameHeaderBuffer[0];
if (firstByte == 'A')
{
case -1:
throw new EndOfStreamException("Reached the end of the stream. Possible authentication failure.");
case 'A':
// Probably an AMQP protocol header, otherwise meaningless
ProcessProtocolHeader(reader);
break;
// Probably an AMQP protocol header, otherwise meaningless
ProcessProtocolHeader(reader, frameHeaderBuffer.AsSpan(1, 6));
}

reader.Read(frameHeaderBuffer, 0, frameHeaderBuffer.Length);
int channel = NetworkOrderDeserializer.ReadUInt16(new ReadOnlySpan<byte>(frameHeaderBuffer));
int payloadSize = NetworkOrderDeserializer.ReadInt32(new ReadOnlySpan<byte>(frameHeaderBuffer, 2, 4)); // FIXME - throw exn on unreasonable value
FrameType type = (FrameType)firstByte;
int channel = NetworkOrderDeserializer.ReadUInt16(new ReadOnlySpan<byte>(frameHeaderBuffer, 1, 2));
int payloadSize = NetworkOrderDeserializer.ReadInt32(new ReadOnlySpan<byte>(frameHeaderBuffer, 3, 4)); // FIXME - throw exn on unreasonable value

const int EndMarkerLength = 1;
// Is returned by InboundFrame.ReturnPayload in Connection.MainLoopIteration
Expand All @@ -257,7 +255,7 @@ internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer)
throw new MalformedFrameException($"Bad frame end marker: {payloadBytes[payloadSize]}");
}

return new InboundFrame((FrameType)type, channel, new Memory<byte>(payloadBytes, 0, payloadSize), payloadBytes);
return new InboundFrame(type, channel, new Memory<byte>(payloadBytes, 0, payloadSize), payloadBytes);
}

public byte[] TakeoverPayload()
Expand Down
Loading