Skip to content

Commit

Permalink
Added a synchronous write loop for connections.
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Christiansen authored and lukebakken committed Sep 25, 2023
1 parent 6677851 commit 3a6cbf1
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 10 deletions.
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,9 @@ projects/Unit*/TestResult.xml
# Vim
.sw?
.*.sw?


#################
## JetBrains Rider
#################
.idea/
9 changes: 8 additions & 1 deletion projects/RabbitMQ.Client/client/api/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,13 @@ public TimeSpan ContinuationTimeout
/// </summary>
public bool TopologyRecoveryEnabled { get; set; } = true;

/// <summary>
/// Force writes to the socket to run on a dedicated thread instead of the thread pool. This may prevent
/// timeouts if a large number of blocking requests are going out simultaneously. Will become obsolete
/// once requests become asynchronous. Defaults to false.
/// </summary>
public bool EnableSynchronousWriteLoop { get; set; } = false;

/// <summary>
/// Filter to include/exclude entities from topology recovery.
/// Default filter includes all entities in topology recovery.
Expand Down Expand Up @@ -640,7 +647,7 @@ public IConnection CreateConnection(IEndpointResolver endpointResolver, string c
internal IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint)
{
IFrameHandler fh = Protocols.DefaultProtocol.CreateFrameHandler(endpoint, _memoryPool, SocketFactory,
RequestedConnectionTimeout, SocketReadTimeout, SocketWriteTimeout);
RequestedConnectionTimeout, SocketReadTimeout, SocketWriteTimeout, EnableSynchronousWriteLoop);
return ConfigureFrameHandler(fh);
}

Expand Down
5 changes: 3 additions & 2 deletions projects/RabbitMQ.Client/client/impl/IProtocolExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ public static IFrameHandler CreateFrameHandler(
Func<AddressFamily, ITcpClient> socketFactory,
TimeSpan connectionTimeout,
TimeSpan readTimeout,
TimeSpan writeTimeout)
TimeSpan writeTimeout,
bool enableSynchronousWriteLoop)
{
return new SocketFrameHandler(endpoint, socketFactory, connectionTimeout, readTimeout, writeTimeout)
return new SocketFrameHandler(endpoint, socketFactory, connectionTimeout, readTimeout, writeTimeout, enableSynchronousWriteLoop)
{
MemoryPool = pool
};
Expand Down
48 changes: 41 additions & 7 deletions projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,14 @@ class SocketFrameHandler : IFrameHandler
private readonly byte[] _frameHeaderBuffer;
private bool _closed;
private ArrayPool<byte> _pool = ArrayPool<byte>.Shared;
private readonly bool _enableSynchronousWriteLoop;

public SocketFrameHandler(AmqpTcpEndpoint endpoint,
Func<AddressFamily, ITcpClient> socketFactory,
TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout)
TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout, bool enableSynchronousWriteLoop)
{
_endpoint = endpoint;
_enableSynchronousWriteLoop = enableSynchronousWriteLoop;
_frameHeaderBuffer = new byte[6];
var channel = Channel.CreateUnbounded<ReadOnlyMemory<byte>>(
new UnboundedChannelOptions
Expand Down Expand Up @@ -134,7 +136,15 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint,
_writer = new BufferedStream(netstream, _socket.Client.SendBufferSize);

WriteTimeout = writeTimeout;
_writerTask = Task.Run(WriteLoop, CancellationToken.None);
if (_enableSynchronousWriteLoop)
{
TaskCreationOptions tco = TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach;
_writerTask = Task.Factory.StartNew(SynchronousWriteLoop, CancellationToken.None, tco, TaskScheduler.Default);
}
else
{
_writerTask = Task.Run(WriteLoop, CancellationToken.None);
}
}

public AmqpTcpEndpoint Endpoint
Expand Down Expand Up @@ -270,17 +280,41 @@ private async Task WriteLoop()
while (await _channelReader.WaitToReadAsync().ConfigureAwait(false))
{
_socket.Client.Poll(_writeableStateTimeoutMicroSeconds, SelectMode.SelectWrite);
while (_channelReader.TryRead(out var memory))
while (_channelReader.TryRead(out ReadOnlyMemory<byte> memory))
{
MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment);
await _writer.WriteAsync(segment.Array, segment.Offset, segment.Count).ConfigureAwait(false);
MemoryPool.Return(segment.Array);
if (MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment))
{
if (segment.Array != null)
{
await _writer.WriteAsync(segment.Array, segment.Offset, segment.Count).ConfigureAwait(false);
MemoryPool.Return(segment.Array);
}
}
}

await _writer.FlushAsync().ConfigureAwait(false);
}
}

private void SynchronousWriteLoop()
{
while (_channelReader.WaitToReadAsync().AsTask().Result)
{
_socket.Client.Poll(_writeableStateTimeoutMicroSeconds, SelectMode.SelectWrite);
while (_channelReader.TryRead(out ReadOnlyMemory<byte> memory))
{
if (MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment))
{
if (segment.Array != null)
{
_writer.Write(segment.Array, segment.Offset, segment.Count);
MemoryPool.Return(segment.Array);
}
}
}
_writer.Flush();
}
}

private static bool ShouldTryIPv6(AmqpTcpEndpoint endpoint)
{
return Socket.OSSupportsIPv6 && endpoint.AddressFamily != AddressFamily.InterNetwork;
Expand Down
1 change: 1 addition & 0 deletions projects/Unit/APIApproval.Approve.verified.txt
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ namespace RabbitMQ.Client
public RabbitMQ.Client.ICredentialsProvider CredentialsProvider { get; set; }
public RabbitMQ.Client.ICredentialsRefresher CredentialsRefresher { get; set; }
public bool DispatchConsumersAsync { get; set; }
public bool EnableSynchronousWriteLoop { get; set; }
public RabbitMQ.Client.AmqpTcpEndpoint Endpoint { get; set; }
public System.Func<System.Collections.Generic.IEnumerable<RabbitMQ.Client.AmqpTcpEndpoint>, RabbitMQ.Client.IEndpointResolver> EndpointResolverFactory { get; set; }
public System.TimeSpan HandshakeContinuationTimeout { get; set; }
Expand Down
14 changes: 14 additions & 0 deletions projects/Unit/TestConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,20 @@ public void TestCreateConnectionAmqpTcpEndpointListAndClientProvidedName()
}
}

[Test]
public void TestCreateConnectionWithSynchronousWriteLoop()
{
var cf = new ConnectionFactory
{
AutomaticRecoveryEnabled = true,
HostName = "localhost",
EnableSynchronousWriteLoop = true
};
using (IConnection conn = cf.CreateConnection()){
Assert.AreEqual(5672, conn.Endpoint.Port);
}
}

[Test]
public void TestCreateConnectionUsesDefaultPort()
{
Expand Down

0 comments on commit 3a6cbf1

Please sign in to comment.