Skip to content

Commit

Permalink
Minor refactor after reviewing some TODOs
Browse files Browse the repository at this point in the history
  • Loading branch information
lukebakken committed Dec 11, 2023
1 parent f62dcda commit 714be7a
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 19 deletions.
1 change: 1 addition & 0 deletions projects/RabbitMQ.Client/client/api/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ public interface IConnection : INetworkConnection, IDisposable
/// <summary>
/// Asynchronously create and return a fresh channel, session, and channel.
/// </summary>
// TODO cancellation token
ValueTask<IChannel> CreateChannelAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ public RecoveryChannelFactory(IConnection connection)
_connection = connection;
}

// TODO cancellation token
public async ValueTask<IChannel> CreateRecoveryChannelAsync()
{
if (_recoveryChannel == null)
Expand Down
24 changes: 13 additions & 11 deletions projects/RabbitMQ.Client/client/impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -968,18 +968,20 @@ protected void HandleConnectionStart(in IncomingCommand cmd)
var reason = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.CommandInvalid, "Unexpected Connection.Start");
Session.Connection.Close(reason, false, InternalConstants.DefaultConnectionCloseTimeout);
}

var method = new ConnectionStart(cmd.MethodSpan);
var details = new ConnectionStartDetails
else
{
m_versionMajor = method._versionMajor,
m_versionMinor = method._versionMinor,
m_serverProperties = method._serverProperties,
m_mechanisms = method._mechanisms,
m_locales = method._locales
};
m_connectionStartCell?.SetResult(details);
m_connectionStartCell = null;
var method = new ConnectionStart(cmd.MethodSpan);
var details = new ConnectionStartDetails
{
m_versionMajor = method._versionMajor,
m_versionMinor = method._versionMinor,
m_serverProperties = method._serverProperties,
m_mechanisms = method._mechanisms,
m_locales = method._locales
};
m_connectionStartCell.SetResult(details);
m_connectionStartCell = null;
}
}
finally
{
Expand Down
14 changes: 6 additions & 8 deletions projects/RabbitMQ.Client/client/impl/Connection.Commands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,7 @@ private async ValueTask StartAndTuneAsync(CancellationToken cancellationToken)

using CancellationTokenRegistration ctr = cancellationToken.Register(() =>
{
if (connectionStartCell.TrySetCanceled())
{
// TODO anything to do here?
// connectionStartCell.TrySetResult(null);
}
connectionStartCell.TrySetCanceled(cancellationToken);
}, useSynchronizationContext: false);

_channel0.m_connectionStartCell = connectionStartCell;
Expand All @@ -90,10 +86,10 @@ private async ValueTask StartAndTuneAsync(CancellationToken cancellationToken)
await _frameHandler.SendProtocolHeaderAsync(cancellationToken)
.ConfigureAwait(false);

ConnectionStartDetails connectionStart = await connectionStartCell.Task
.ConfigureAwait(false);
Task<ConnectionStartDetails> csct = connectionStartCell.Task;
ConnectionStartDetails connectionStart = await csct.ConfigureAwait(false);

if (connectionStart is null)
if (connectionStart is null || csct.IsCanceled)
{
const string msg = "connection.start was never received, likely due to a network timeout";
throw new IOException(msg, _channel0.ConnectionStartException);
Expand Down Expand Up @@ -125,6 +121,7 @@ await _frameHandler.SendProtocolHeaderAsync(cancellationToken)
if (challenge is null)
{
// TODO cancellationToken
// Note: when token is passed, OperationCanceledException could be raised
res = await _channel0.ConnectionStartOkAsync(ClientProperties,
mechanismFactory.Name,
response,
Expand All @@ -133,6 +130,7 @@ await _frameHandler.SendProtocolHeaderAsync(cancellationToken)
else
{
// TODO cancellationToken
// Note: when token is passed, OperationCanceledException could be raised
res = await _channel0.ConnectionSecureOkAsync(response)
.ConfigureAwait(false);
}
Expand Down

0 comments on commit 714be7a

Please sign in to comment.