diff --git a/projects/RabbitMQ.Client/client/api/AmqpTimestamp.cs b/projects/RabbitMQ.Client/client/api/AmqpTimestamp.cs index 34a0ce4367..78cb0f5381 100644 --- a/projects/RabbitMQ.Client/client/api/AmqpTimestamp.cs +++ b/projects/RabbitMQ.Client/client/api/AmqpTimestamp.cs @@ -51,7 +51,7 @@ namespace RabbitMQ.Client /// timestamps are signed or unsigned. /// /// - public struct AmqpTimestamp : IEquatable + public readonly struct AmqpTimestamp : IEquatable { /// /// Construct an . diff --git a/projects/RabbitMQ.Client/client/api/IConnection.cs b/projects/RabbitMQ.Client/client/api/IConnection.cs index ea36e9bab6..cbab785463 100644 --- a/projects/RabbitMQ.Client/client/api/IConnection.cs +++ b/projects/RabbitMQ.Client/client/api/IConnection.cs @@ -107,13 +107,6 @@ public interface IConnection : INetworkConnection, IDisposable /// bool IsOpen { get; } - /// - /// Returns the known hosts that came back from the - /// broker in the connection.open-ok method at connection - /// startup time. Null until the connection is completely open and ready for use. - /// - AmqpTcpEndpoint[] KnownHosts { get; } - /// /// The this connection is using to communicate with its peer. /// @@ -224,15 +217,5 @@ public interface IConnection : INetworkConnection, IDisposable /// Create and return a fresh channel, session, and model. /// IModel CreateModel(); - - /// - /// Handle incoming Connection.Blocked methods. - /// - void HandleConnectionBlocked(string reason); - - /// - /// Handle incoming Connection.Unblocked methods. - /// - void HandleConnectionUnblocked(); } } diff --git a/projects/RabbitMQ.Client/client/exceptions/AlreadyClosedException.cs b/projects/RabbitMQ.Client/client/exceptions/AlreadyClosedException.cs index e36032b037..43776cb61d 100644 --- a/projects/RabbitMQ.Client/client/exceptions/AlreadyClosedException.cs +++ b/projects/RabbitMQ.Client/client/exceptions/AlreadyClosedException.cs @@ -36,9 +36,7 @@ namespace RabbitMQ.Client.Exceptions /// Thrown when the application tries to make use of a /// session or connection that has already been shut /// down. -#if !NETSTANDARD1_5 [Serializable] -#endif public class AlreadyClosedException : OperationInterruptedException { ///Construct an instance containing the given diff --git a/projects/RabbitMQ.Client/client/exceptions/AuthenticationFailureException.cs b/projects/RabbitMQ.Client/client/exceptions/AuthenticationFailureException.cs index 556d5e6843..7b04d7d42a 100644 --- a/projects/RabbitMQ.Client/client/exceptions/AuthenticationFailureException.cs +++ b/projects/RabbitMQ.Client/client/exceptions/AuthenticationFailureException.cs @@ -35,9 +35,7 @@ namespace RabbitMQ.Client.Exceptions { /// Thrown when the cause is an /// authentication failure. -#if !NETSTANDARD1_5 [Serializable] -#endif public class AuthenticationFailureException : PossibleAuthenticationFailureException { public AuthenticationFailureException(string msg) : base(msg) diff --git a/projects/RabbitMQ.Client/client/exceptions/BrokerUnreachableException.cs b/projects/RabbitMQ.Client/client/exceptions/BrokerUnreachableException.cs index f19149d8cd..ab80e25702 100644 --- a/projects/RabbitMQ.Client/client/exceptions/BrokerUnreachableException.cs +++ b/projects/RabbitMQ.Client/client/exceptions/BrokerUnreachableException.cs @@ -36,9 +36,7 @@ namespace RabbitMQ.Client.Exceptions { ///Thrown when no connection could be opened during a ///ConnectionFactory.CreateConnection attempt. -#if !NETSTANDARD1_5 [Serializable] -#endif public class BrokerUnreachableException : IOException { ///Construct a BrokerUnreachableException. The inner exception is diff --git a/projects/RabbitMQ.Client/client/exceptions/ChannelAllocationException.cs b/projects/RabbitMQ.Client/client/exceptions/ChannelAllocationException.cs index 3d46fc6cd3..a28acbe59e 100644 --- a/projects/RabbitMQ.Client/client/exceptions/ChannelAllocationException.cs +++ b/projects/RabbitMQ.Client/client/exceptions/ChannelAllocationException.cs @@ -36,9 +36,7 @@ namespace RabbitMQ.Client.Exceptions /// Thrown when a SessionManager cannot allocate a new /// channel number, or the requested channel number is already in /// use. -#if !NETSTANDARD1_5 [Serializable] -#endif public class ChannelAllocationException : ProtocolViolationException { /// diff --git a/projects/RabbitMQ.Client/client/exceptions/ConnectFailureException.cs b/projects/RabbitMQ.Client/client/exceptions/ConnectFailureException.cs index baa840dc46..5d0599617a 100644 --- a/projects/RabbitMQ.Client/client/exceptions/ConnectFailureException.cs +++ b/projects/RabbitMQ.Client/client/exceptions/ConnectFailureException.cs @@ -34,9 +34,7 @@ namespace RabbitMQ.Client.Exceptions { /// Thrown when a connection to the broker fails -#if !NETSTANDARD1_5 [Serializable] -#endif public class ConnectFailureException : ProtocolViolationException { public ConnectFailureException(string msg, Exception inner) diff --git a/projects/RabbitMQ.Client/client/exceptions/OperationInterruptedException.cs b/projects/RabbitMQ.Client/client/exceptions/OperationInterruptedException.cs index 36b22a5713..ad739931be 100644 --- a/projects/RabbitMQ.Client/client/exceptions/OperationInterruptedException.cs +++ b/projects/RabbitMQ.Client/client/exceptions/OperationInterruptedException.cs @@ -40,9 +40,7 @@ namespace RabbitMQ.Client.Exceptions /// operation, an OperationInterruptedException will be thrown to /// the caller of IModel.QueueDeclare. /// -#if !NETSTANDARD1_5 [Serializable] -#endif public class OperationInterruptedException // TODO: inherit from OperationCanceledException : RabbitMQClientException diff --git a/projects/RabbitMQ.Client/client/exceptions/PacketNotRecognizedException.cs b/projects/RabbitMQ.Client/client/exceptions/PacketNotRecognizedException.cs index a89797659c..487792ee60 100644 --- a/projects/RabbitMQ.Client/client/exceptions/PacketNotRecognizedException.cs +++ b/projects/RabbitMQ.Client/client/exceptions/PacketNotRecognizedException.cs @@ -41,9 +41,7 @@ namespace RabbitMQ.Client.Exceptions ///The peer's {'A','M','Q','P',txHi,txLo,major,minor} packet is ///decoded into instances of this class. /// -#if !NETSTANDARD1_5 [Serializable] -#endif public class PacketNotRecognizedException : RabbitMQClientException { ///Fills the new instance's properties with the values passed in. diff --git a/projects/RabbitMQ.Client/client/exceptions/PossibleAuthenticationFailureException.cs b/projects/RabbitMQ.Client/client/exceptions/PossibleAuthenticationFailureException.cs index 13e6030168..e6b2ec481b 100644 --- a/projects/RabbitMQ.Client/client/exceptions/PossibleAuthenticationFailureException.cs +++ b/projects/RabbitMQ.Client/client/exceptions/PossibleAuthenticationFailureException.cs @@ -35,9 +35,7 @@ namespace RabbitMQ.Client.Exceptions { /// Thrown when the likely cause is an /// authentication failure. -#if !NETSTANDARD1_5 [Serializable] -#endif public class PossibleAuthenticationFailureException : RabbitMQClientException { public PossibleAuthenticationFailureException(string msg, Exception inner) : base(msg, inner) diff --git a/projects/RabbitMQ.Client/client/exceptions/ProtocolVersionMismatchException.cs b/projects/RabbitMQ.Client/client/exceptions/ProtocolVersionMismatchException.cs index f22a6fef13..dc430a0660 100644 --- a/projects/RabbitMQ.Client/client/exceptions/ProtocolVersionMismatchException.cs +++ b/projects/RabbitMQ.Client/client/exceptions/ProtocolVersionMismatchException.cs @@ -36,9 +36,7 @@ namespace RabbitMQ.Client.Exceptions ///Thrown to indicate that the peer does not support the ///wire protocol version we requested immediately after opening ///the TCP socket. -#if !NETSTANDARD1_5 [Serializable] -#endif public class ProtocolVersionMismatchException : ProtocolViolationException { ///Fills the new instance's properties with the values passed in. diff --git a/projects/RabbitMQ.Client/client/exceptions/ProtocolViolationException.cs b/projects/RabbitMQ.Client/client/exceptions/ProtocolViolationException.cs index 5799f31d4a..b5bc9826ec 100644 --- a/projects/RabbitMQ.Client/client/exceptions/ProtocolViolationException.cs +++ b/projects/RabbitMQ.Client/client/exceptions/ProtocolViolationException.cs @@ -33,9 +33,7 @@ namespace RabbitMQ.Client.Exceptions { -#if !NETSTANDARD1_5 [Serializable] -#endif public class ProtocolViolationException : RabbitMQClientException { public ProtocolViolationException(string message) : base(message) diff --git a/projects/RabbitMQ.Client/client/exceptions/RabbitMQClientException.cs b/projects/RabbitMQ.Client/client/exceptions/RabbitMQClientException.cs index 449c475403..99f1a3a7b3 100644 --- a/projects/RabbitMQ.Client/client/exceptions/RabbitMQClientException.cs +++ b/projects/RabbitMQ.Client/client/exceptions/RabbitMQClientException.cs @@ -33,9 +33,7 @@ namespace RabbitMQ.Client.Exceptions { -#if !NETSTANDARD1_5 [Serializable] -#endif public abstract class RabbitMQClientException : Exception { /// Initializes a new instance of the class. diff --git a/projects/RabbitMQ.Client/client/exceptions/WireFormattingException.cs b/projects/RabbitMQ.Client/client/exceptions/WireFormattingException.cs index ae1de53009..3d879f19ec 100644 --- a/projects/RabbitMQ.Client/client/exceptions/WireFormattingException.cs +++ b/projects/RabbitMQ.Client/client/exceptions/WireFormattingException.cs @@ -35,9 +35,7 @@ namespace RabbitMQ.Client.Exceptions { /// Thrown when the wire-formatting code cannot encode a /// particular .NET value to AMQP protocol format. -#if !NETSTANDARD1_5 [Serializable] -#endif public class WireFormattingException : ProtocolViolationException { ///Construct a WireFormattingException with no diff --git a/projects/RabbitMQ.Client/client/framing/Model.cs b/projects/RabbitMQ.Client/client/framing/Model.cs index 9db34fd49d..d995bda47a 100644 --- a/projects/RabbitMQ.Client/client/framing/Model.cs +++ b/projects/RabbitMQ.Client/client/framing/Model.cs @@ -116,11 +116,6 @@ public override void _Private_ConfirmSelect(bool nowait) } } - public override void _Private_ConnectionClose(ushort replyCode, string replyText, ushort classId, ushort methodId) - { - ModelRpc(new ConnectionClose(replyCode, replyText, classId, methodId)); - } - public override void _Private_ConnectionCloseOk() { ModelSend(new ConnectionCloseOk()); @@ -298,7 +293,7 @@ public override void TxSelect() ModelRpc(new TxSelect()); } - public override bool DispatchAsynchronous(in IncomingCommand cmd) + protected override bool DispatchAsynchronous(in IncomingCommand cmd) { switch (cmd.Method.ProtocolCommandId) { diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs index a3d176a7a3..554c45ae99 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs @@ -73,7 +73,7 @@ internal void DeleteRecordedExchange(string exchangeName) } } - public void DeleteAutoDeleteExchange(string exchangeName) + internal void DeleteAutoDeleteExchange(string exchangeName) { lock (_recordedEntitiesLock) { diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs index ff4d1f8343..97d3a2fd2d 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs @@ -159,10 +159,10 @@ private bool TryRecoverConnectionDelegate() { try { - var defunctConnection = _delegate; + var defunctConnection = _innerConnection; IFrameHandler fh = _endpoints.SelectOne(_factory.CreateFrameHandler); - _delegate = new Connection(_factory, fh, ClientProvidedName); - _delegate.TakeOver(defunctConnection); + _innerConnection = new Connection(_factory, fh, ClientProvidedName); + _innerConnection.TakeOver(defunctConnection); return true; } catch (Exception e) diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs index e391a3f81b..4bcabdf6f0 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs @@ -42,28 +42,35 @@ internal sealed partial class AutorecoveringConnection : IConnection { private readonly ConnectionFactory _factory; + private Connection _innerConnection; private bool _disposed; - private Connection _delegate; // list of endpoints provided on initial connection. // on re-connection, the next host in the line is chosen using // IHostnameSelector private IEndpointResolver _endpoints; + private Connection InnerConnection + { + get + { + ThrowIfDisposed(); + return _innerConnection; + } + } + public AutorecoveringConnection(ConnectionFactory factory, string clientProvidedName = null) { _factory = factory; ClientProvidedName = clientProvidedName; - Action onException = (exception, context) => _delegate.OnCallbackException(CallbackExceptionEventArgs.Build(exception, context)); + Action onException = (exception, context) => _innerConnection.OnCallbackException(CallbackExceptionEventArgs.Build(exception, context)); _recoverySucceededWrapper = new EventingWrapper("OnConnectionRecovery", onException); _connectionRecoveryErrorWrapper = new EventingWrapper("OnConnectionRecoveryError", onException); _consumerTagChangeAfterRecoveryWrapper = new EventingWrapper("OnConsumerRecovery", onException); _queueNameChangeAfterRecoveryWrapper = new EventingWrapper("OnQueueRecovery", onException); } - private Connection Delegate => !_disposed ? _delegate : throw new ObjectDisposedException(GetType().FullName); - public event EventHandler RecoverySucceeded { add => _recoverySucceededWrapper.AddHandler(value); @@ -80,26 +87,26 @@ public event EventHandler ConnectionRecoveryEr public event EventHandler CallbackException { - add => Delegate.CallbackException += value; - remove => Delegate.CallbackException -= value; + add => InnerConnection.CallbackException += value; + remove => InnerConnection.CallbackException -= value; } public event EventHandler ConnectionBlocked { - add => Delegate.ConnectionBlocked += value; - remove => Delegate.ConnectionBlocked -= value; + add => InnerConnection.ConnectionBlocked += value; + remove => InnerConnection.ConnectionBlocked -= value; } public event EventHandler ConnectionShutdown { - add => Delegate.ConnectionShutdown += value; - remove => Delegate.ConnectionShutdown -= value; + add => InnerConnection.ConnectionShutdown += value; + remove => InnerConnection.ConnectionShutdown -= value; } public event EventHandler ConnectionUnblocked { - add => Delegate.ConnectionUnblocked += value; - remove => Delegate.ConnectionUnblocked -= value; + add => InnerConnection.ConnectionUnblocked += value; + remove => InnerConnection.ConnectionUnblocked -= value; } public event EventHandler ConsumerTagChangeAfterRecovery @@ -118,56 +125,52 @@ public event EventHandler QueueNameChang public string ClientProvidedName { get; } - public ushort ChannelMax => Delegate.ChannelMax; - - public IDictionary ClientProperties => Delegate.ClientProperties; - - public ShutdownEventArgs CloseReason => Delegate.CloseReason; + public ushort ChannelMax => InnerConnection.ChannelMax; - public AmqpTcpEndpoint Endpoint => Delegate.Endpoint; + public IDictionary ClientProperties => InnerConnection.ClientProperties; - public uint FrameMax => Delegate.FrameMax; + public ShutdownEventArgs CloseReason => InnerConnection.CloseReason; - public TimeSpan Heartbeat => Delegate.Heartbeat; + public AmqpTcpEndpoint Endpoint => InnerConnection.Endpoint; - public bool IsOpen => _delegate?.IsOpen ?? false; + public uint FrameMax => InnerConnection.FrameMax; - public AmqpTcpEndpoint[] KnownHosts - { - get => Delegate.KnownHosts; - set => Delegate.KnownHosts = value; - } + public TimeSpan Heartbeat => InnerConnection.Heartbeat; - public int LocalPort => Delegate.LocalPort; + public bool IsOpen => _innerConnection?.IsOpen ?? false; - public ProtocolBase Protocol => Delegate.Protocol; + public int LocalPort => InnerConnection.LocalPort; - public int RemotePort => Delegate.RemotePort; + public int RemotePort => InnerConnection.RemotePort; - public IDictionary ServerProperties => Delegate.ServerProperties; + public IDictionary ServerProperties => InnerConnection.ServerProperties; - public IList ShutdownReport => Delegate.ShutdownReport; + public IList ShutdownReport => InnerConnection.ShutdownReport; - IProtocol IConnection.Protocol => Endpoint.Protocol; + public IProtocol Protocol => Endpoint.Protocol; public RecoveryAwareModel CreateNonRecoveringModel() { - ISession session = Delegate.CreateSession(); + ISession session = InnerConnection.CreateSession(); var result = new RecoveryAwareModel(session) { ContinuationTimeout = _factory.ContinuationTimeout }; result._Private_ChannelOpen(); return result; } - public override string ToString() => $"AutorecoveringConnection({Delegate.Id},{Endpoint},{GetHashCode()})"; + public override string ToString() + => $"AutorecoveringConnection({InnerConnection.Id},{Endpoint},{GetHashCode()})"; - public void Init() => Init(_factory.EndpointResolverFactory(new List { _factory.Endpoint })); + internal void Init() + { + Init(_factory.EndpointResolverFactory(new List { _factory.Endpoint })); + } - public void Init(IEndpointResolver endpoints) + internal void Init(IEndpointResolver endpoints) { ThrowIfDisposed(); _endpoints = endpoints; IFrameHandler fh = endpoints.SelectOne(_factory.CreateFrameHandler); - _delegate = new Connection(_factory, fh, ClientProvidedName); + _innerConnection = new Connection(_factory, fh, ClientProvidedName); ConnectionShutdown += HandleConnectionShutdown; } @@ -176,7 +179,7 @@ public void UpdateSecret(string newSecret, string reason) { ThrowIfDisposed(); EnsureIsOpen(); - _delegate.UpdateSecret(newSecret, reason); + _innerConnection.UpdateSecret(newSecret, reason); _factory.Password = newSecret; } @@ -185,9 +188,9 @@ public void Close(ushort reasonCode, string reasonText, TimeSpan timeout, bool a { ThrowIfDisposed(); StopRecoveryLoop(); - if (_delegate.IsOpen) + if (_innerConnection.IsOpen) { - _delegate.Close(reasonCode, reasonText, timeout, abort); + _innerConnection.Close(reasonCode, reasonText, timeout, abort); } } @@ -199,47 +202,41 @@ public IModel CreateModel() return m; } - void IDisposable.Dispose() => Dispose(true); - - public void HandleConnectionBlocked(string reason) => Delegate.HandleConnectionBlocked(reason); - - public void HandleConnectionUnblocked() => Delegate.HandleConnectionUnblocked(); - - private void Dispose(bool disposing) + public void Dispose() { if (_disposed) { return; } - if (disposing) + try { - try - { - this.Abort(); - } - catch (Exception) - { - // TODO: log - } - finally - { - _models.Clear(); - _delegate = null; - _disposed = true; - } + this.Abort(); + } + catch (Exception) + { + // TODO: log + } + finally + { + _models.Clear(); + _innerConnection = null; + _disposed = true; } } - private void EnsureIsOpen() => Delegate.EnsureIsOpen(); + private void EnsureIsOpen() + => InnerConnection.EnsureIsOpen(); [MethodImpl(MethodImplOptions.AggressiveInlining)] private void ThrowIfDisposed() { if (_disposed) { - throw new ObjectDisposedException(GetType().FullName); + ThrowDisposed(); } + + static void ThrowDisposed() => throw new ObjectDisposedException(typeof(AutorecoveringConnection).FullName); } } } diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs index 5bac12a238..c2047f90f2 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs @@ -39,121 +39,127 @@ namespace RabbitMQ.Client.Impl { - internal sealed class AutorecoveringModel : IFullModel, IRecoverable + internal sealed class AutorecoveringModel : IModel, IRecoverable { - private bool _disposed; private AutorecoveringConnection _connection; - private RecoveryAwareModel _delegate; + private RecoveryAwareModel _innerChannel; + private bool _disposed; private ushort _prefetchCountConsumer; private ushort _prefetchCountGlobal; private bool _usesPublisherConfirms; private bool _usesTransactions; - public IConsumerDispatcher ConsumerDispatcher => !_disposed ? _delegate.ConsumerDispatcher : throw new ObjectDisposedException(GetType().FullName); + internal IConsumerDispatcher ConsumerDispatcher => InnerChannel.ConsumerDispatcher; + + internal RecoveryAwareModel InnerChannel + { + get + { + ThrowIfDisposed(); + return _innerChannel; + } + } public TimeSpan ContinuationTimeout { - get => Delegate.ContinuationTimeout; - set => Delegate.ContinuationTimeout = value; + get => InnerChannel.ContinuationTimeout; + set => InnerChannel.ContinuationTimeout = value; } - public AutorecoveringModel(AutorecoveringConnection conn, RecoveryAwareModel _delegate) + public AutorecoveringModel(AutorecoveringConnection conn, RecoveryAwareModel innerChannel) { _connection = conn; - this._delegate = _delegate; + _innerChannel = innerChannel; } public event EventHandler BasicAcks { - add => Delegate.BasicAcks += value; - remove => Delegate.BasicAcks -= value; + add => InnerChannel.BasicAcks += value; + remove => InnerChannel.BasicAcks -= value; } public event EventHandler BasicNacks { - add => Delegate.BasicNacks += value; - remove => Delegate.BasicNacks -= value; + add => InnerChannel.BasicNacks += value; + remove => InnerChannel.BasicNacks -= value; } public event EventHandler BasicRecoverOk { - add => Delegate.BasicRecoverOk += value; - remove => Delegate.BasicRecoverOk -= value; + add => InnerChannel.BasicRecoverOk += value; + remove => InnerChannel.BasicRecoverOk -= value; } public event EventHandler BasicReturn { - add => Delegate.BasicReturn += value; - remove => Delegate.BasicReturn -= value; + add => InnerChannel.BasicReturn += value; + remove => InnerChannel.BasicReturn -= value; } public event EventHandler CallbackException { - add => Delegate.CallbackException += value; - remove => Delegate.CallbackException -= value; + add => InnerChannel.CallbackException += value; + remove => InnerChannel.CallbackException -= value; } public event EventHandler FlowControl { - add { Delegate.FlowControl += value; } - remove { Delegate.FlowControl -= value; } + add { InnerChannel.FlowControl += value; } + remove { InnerChannel.FlowControl -= value; } } public event EventHandler ModelShutdown { - add => Delegate.ModelShutdown += value; - remove => Delegate.ModelShutdown -= value; + add => InnerChannel.ModelShutdown += value; + remove => InnerChannel.ModelShutdown -= value; } public event EventHandler Recovery { - add { RecoveryAwareDelegate.Recovery += value; } - remove { RecoveryAwareDelegate.Recovery -= value; } + add { InnerChannel.Recovery += value; } + remove { InnerChannel.Recovery -= value; } } - public int ChannelNumber => Delegate.ChannelNumber; + public int ChannelNumber => InnerChannel.ChannelNumber; - public ShutdownEventArgs CloseReason => Delegate.CloseReason; + public ShutdownEventArgs CloseReason => InnerChannel.CloseReason; public IBasicConsumer DefaultConsumer { - get => Delegate.DefaultConsumer; - set => Delegate.DefaultConsumer = value; + get => InnerChannel.DefaultConsumer; + set => InnerChannel.DefaultConsumer = value; } - public IModel Delegate => RecoveryAwareDelegate; - private RecoveryAwareModel RecoveryAwareDelegate => !_disposed ? _delegate : throw new ObjectDisposedException(GetType().FullName); + public bool IsClosed => _innerChannel != null && _innerChannel.IsClosed; - public bool IsClosed => _delegate != null && _delegate.IsClosed; + public bool IsOpen => _innerChannel != null && _innerChannel.IsOpen; - public bool IsOpen => _delegate != null && _delegate.IsOpen; + public ulong NextPublishSeqNo => InnerChannel.NextPublishSeqNo; - public ulong NextPublishSeqNo => Delegate.NextPublishSeqNo; - - public void AutomaticallyRecover(AutorecoveringConnection conn) + internal void AutomaticallyRecover(AutorecoveringConnection conn) { ThrowIfDisposed(); _connection = conn; - RecoveryAwareModel defunctModel = _delegate; + RecoveryAwareModel defunctModel = _innerChannel; - _delegate = conn.CreateNonRecoveringModel(); - _delegate.TakeOver(defunctModel); + _innerChannel = conn.CreateNonRecoveringModel(); + _innerChannel.TakeOver(defunctModel); RecoverState(); - RecoveryAwareDelegate.RunRecoveryEventHandlers(this); + InnerChannel.RunRecoveryEventHandlers(this); } - public void BasicQos(ushort prefetchCount, - bool global) => Delegate.BasicQos(0, prefetchCount, global); + public void BasicQos(ushort prefetchCount, bool global) + => InnerChannel.BasicQos(0, prefetchCount, global); public void Close(ushort replyCode, string replyText, bool abort) { ThrowIfDisposed(); try { - _delegate.Close(replyCode, replyText, abort); + _innerChannel.Close(replyCode, replyText, abort); } finally { @@ -161,406 +167,38 @@ public void Close(ushort replyCode, string replyText, bool abort) } } - public override string ToString() => Delegate.ToString(); - - void IDisposable.Dispose() => Dispose(true); + public override string ToString() + => InnerChannel.ToString(); - private void Dispose(bool disposing) + public void Dispose() { if (_disposed) { return; } - if (disposing) - { - this.Abort(); - - _connection = null; - _delegate = null; - _disposed = true; - } - } - - public void ConnectionTuneOk(ushort channelMax, - uint frameMax, - ushort heartbeat) - { - ThrowIfDisposed(); - _delegate.ConnectionTuneOk(channelMax, frameMax, heartbeat); - } - - public void HandleBasicAck(ulong deliveryTag, bool multiple) - { - ThrowIfDisposed(); - _delegate.HandleBasicAck(deliveryTag, multiple); - } - - public void HandleBasicCancel(string consumerTag, bool nowait) - { - ThrowIfDisposed(); - _delegate.HandleBasicCancel(consumerTag, nowait); - } - - public void HandleBasicCancelOk(string consumerTag) - { - ThrowIfDisposed(); - _delegate.HandleBasicCancelOk(consumerTag); - } - - public void HandleBasicConsumeOk(string consumerTag) - { - ThrowIfDisposed(); - _delegate.HandleBasicConsumeOk(consumerTag); - } - - public void HandleBasicDeliver(string consumerTag, - ulong deliveryTag, - bool redelivered, - string exchange, - string routingKey, - IBasicProperties basicProperties, - ReadOnlyMemory body, - byte[] rentedArray) - { - ThrowIfDisposed(); - _delegate.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, rentedArray); - } - - public void HandleBasicGetEmpty() - { - ThrowIfDisposed(); - _delegate.HandleBasicGetEmpty(); - } - - public void HandleBasicGetOk(ulong deliveryTag, - bool redelivered, - string exchange, - string routingKey, - uint messageCount, - IBasicProperties basicProperties, - ReadOnlyMemory body, - byte[] rentedArray) - { - ThrowIfDisposed(); - _delegate.HandleBasicGetOk(deliveryTag, redelivered, exchange, routingKey, messageCount, basicProperties, body, rentedArray); - } - - public void HandleBasicNack(ulong deliveryTag, - bool multiple, - bool requeue) - { - ThrowIfDisposed(); - _delegate.HandleBasicNack(deliveryTag, multiple, requeue); - } - - public void HandleBasicRecoverOk() - { - ThrowIfDisposed(); - _delegate.HandleBasicRecoverOk(); - } - - public void HandleBasicReturn(ushort replyCode, - string replyText, - string exchange, - string routingKey, - IBasicProperties basicProperties, - ReadOnlyMemory body, - byte[] rentedArray) - { - ThrowIfDisposed(); - _delegate.HandleBasicReturn(replyCode, replyText, exchange, routingKey, basicProperties, body, rentedArray); - } - - public void HandleChannelClose(ushort replyCode, - string replyText, - ushort classId, - ushort methodId) - { - ThrowIfDisposed(); - _delegate.HandleChannelClose(replyCode, replyText, classId, methodId); - } - - public void HandleChannelCloseOk() - { - ThrowIfDisposed(); - _delegate.HandleChannelCloseOk(); - } - - public void HandleChannelFlow(bool active) - { - ThrowIfDisposed(); - _delegate.HandleChannelFlow(active); - } - - public void HandleConnectionBlocked(string reason) - { - ThrowIfDisposed(); - _delegate.HandleConnectionBlocked(reason); - } - - public void HandleConnectionClose(ushort replyCode, - string replyText, - ushort classId, - ushort methodId) - { - ThrowIfDisposed(); - _delegate.HandleConnectionClose(replyCode, replyText, classId, methodId); - } - - public void HandleConnectionSecure(byte[] challenge) - { - ThrowIfDisposed(); - _delegate.HandleConnectionSecure(challenge); - } - - public void HandleConnectionStart(byte versionMajor, - byte versionMinor, - IDictionary serverProperties, - byte[] mechanisms, - byte[] locales) - { - ThrowIfDisposed(); - _delegate.HandleConnectionStart(versionMajor, versionMinor, serverProperties, - mechanisms, locales); - } - - public void HandleConnectionTune(ushort channelMax, - uint frameMax, - ushort heartbeat) - { - ThrowIfDisposed(); - _delegate.HandleConnectionTune(channelMax, frameMax, heartbeat); - } - - public void HandleConnectionUnblocked() - { - ThrowIfDisposed(); - _delegate.HandleConnectionUnblocked(); - } - - public void HandleQueueDeclareOk(string queue, - uint messageCount, - uint consumerCount) - { - ThrowIfDisposed(); - _delegate.HandleQueueDeclareOk(queue, messageCount, consumerCount); - } - - public void _Private_BasicCancel(string consumerTag, - bool nowait) - { - ThrowIfDisposed(); - _delegate._Private_BasicCancel(consumerTag, nowait); - } - - public void _Private_BasicConsume(string queue, - string consumerTag, - bool noLocal, - bool autoAck, - bool exclusive, - bool nowait, - IDictionary arguments) - { - ThrowIfDisposed(); - _delegate._Private_BasicConsume(queue, - consumerTag, - noLocal, - autoAck, - exclusive, - nowait, - arguments); - } - - public void _Private_BasicGet(string queue, bool autoAck) - { - ThrowIfDisposed(); - _delegate._Private_BasicGet(queue, autoAck); - } - - public void _Private_BasicPublish(string exchange, - string routingKey, - bool mandatory, - IBasicProperties basicProperties, - ReadOnlyMemory body) - { - if (routingKey is null) - { - throw new ArgumentNullException(nameof(routingKey)); - } - - ThrowIfDisposed(); - _delegate._Private_BasicPublish(exchange, routingKey, mandatory, - basicProperties, body); - } - - public void _Private_BasicRecover(bool requeue) - { - ThrowIfDisposed(); - _delegate._Private_BasicRecover(requeue); - } - - public void _Private_ChannelClose(ushort replyCode, - string replyText, - ushort classId, - ushort methodId) - { - ThrowIfDisposed(); - _delegate._Private_ChannelClose(replyCode, replyText, - classId, methodId); - } - - public void _Private_ChannelCloseOk() - { - ThrowIfDisposed(); - _delegate._Private_ChannelCloseOk(); - } + this.Abort(); - public void _Private_ChannelFlowOk(bool active) - { - ThrowIfDisposed(); - _delegate._Private_ChannelFlowOk(active); + _connection = null; + _innerChannel = null; + _disposed = true; } - public void _Private_ChannelOpen() - { - ThrowIfDisposed(); - _delegate._Private_ChannelOpen(); - } - - public void _Private_ConfirmSelect(bool nowait) - { - ThrowIfDisposed(); - _delegate._Private_ConfirmSelect(nowait); - } - - public void _Private_ConnectionClose(ushort replyCode, - string replyText, - ushort classId, - ushort methodId) - { - ThrowIfDisposed(); - _delegate._Private_ConnectionClose(replyCode, replyText, - classId, methodId); - } - - public void _Private_ConnectionCloseOk() - { - ThrowIfDisposed(); - _delegate._Private_ConnectionCloseOk(); - } - - public void _Private_ConnectionOpen(string virtualHost) - { - ThrowIfDisposed(); - _delegate._Private_ConnectionOpen(virtualHost); - } - - public void _Private_ConnectionSecureOk(byte[] response) - { - ThrowIfDisposed(); - _delegate._Private_ConnectionSecureOk(response); - } - - public void _Private_ConnectionStartOk(IDictionary clientProperties, - string mechanism, byte[] response, string locale) - { - ThrowIfDisposed(); - _delegate._Private_ConnectionStartOk(clientProperties, mechanism, - response, locale); - } - - public void _Private_UpdateSecret(byte[] newSecret, string reason) - { - ThrowIfDisposed(); - _delegate._Private_UpdateSecret(newSecret, reason); - } - - public void _Private_ExchangeBind(string destination, - string source, - string routingKey, - bool nowait, - IDictionary arguments) - { - ThrowIfDisposed(); - _delegate._Private_ExchangeBind(destination, source, routingKey, - nowait, arguments); - } - - public void _Private_ExchangeDeclare(string exchange, - string type, - bool passive, - bool durable, - bool autoDelete, - bool @internal, - bool nowait, - IDictionary arguments) - { - ThrowIfDisposed(); - _delegate._Private_ExchangeDeclare(exchange, type, passive, - durable, autoDelete, @internal, - nowait, arguments); - } - - public void _Private_ExchangeDelete(string exchange, - bool ifUnused, - bool nowait) - { - ThrowIfDisposed(); - _delegate._Private_ExchangeDelete(exchange, ifUnused, nowait); - } - - public void _Private_ExchangeUnbind(string destination, - string source, - string routingKey, - bool nowait, - IDictionary arguments) - { - ThrowIfDisposed(); - _delegate._Private_ExchangeUnbind(destination, source, routingKey, - nowait, arguments); - } - - public void _Private_QueueBind(string queue, - string exchange, - string routingKey, - bool nowait, - IDictionary arguments) - { - ThrowIfDisposed(); - _delegate._Private_QueueBind(queue, exchange, routingKey, - nowait, arguments); - } - - public void _Private_QueueDeclare(string queue, - bool passive, - bool durable, - bool exclusive, - bool autoDelete, - bool nowait, - IDictionary arguments) => RecoveryAwareDelegate._Private_QueueDeclare(queue, passive, - durable, exclusive, autoDelete, - nowait, arguments); - - public uint _Private_QueueDelete(string queue, bool ifUnused, bool ifEmpty, bool nowait) => RecoveryAwareDelegate._Private_QueueDelete(queue, ifUnused, ifEmpty, nowait); - - public uint _Private_QueuePurge(string queue, bool nowait) => RecoveryAwareDelegate._Private_QueuePurge(queue, nowait); - - public void BasicAck(ulong deliveryTag, - bool multiple) => Delegate.BasicAck(deliveryTag, multiple); + public void BasicAck(ulong deliveryTag, bool multiple) + => InnerChannel.BasicAck(deliveryTag, multiple); public void BasicCancel(string consumerTag) { ThrowIfDisposed(); _connection.DeleteRecordedConsumer(consumerTag); - _delegate.BasicCancel(consumerTag); + _innerChannel.BasicCancel(consumerTag); } public void BasicCancelNoWait(string consumerTag) { ThrowIfDisposed(); _connection.DeleteRecordedConsumer(consumerTag); - _delegate.BasicCancelNoWait(consumerTag); + _innerChannel.BasicCancelNoWait(consumerTag); } public string BasicConsume( @@ -572,42 +210,25 @@ public string BasicConsume( IDictionary arguments, IBasicConsumer consumer) { - string result = Delegate.BasicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumer); + string result = InnerChannel.BasicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumer); RecordedConsumer rc = new RecordedConsumer(this, consumer, queue, autoAck, result, exclusive, arguments); _connection.RecordConsumer(result, rc); return result; } public BasicGetResult BasicGet(string queue, bool autoAck) - => Delegate.BasicGet(queue, autoAck); + => InnerChannel.BasicGet(queue, autoAck); public void BasicNack(ulong deliveryTag, bool multiple, bool requeue) - => Delegate.BasicNack(deliveryTag, multiple, requeue); - - public void BasicPublish(string exchange, - string routingKey, - bool mandatory, - IBasicProperties basicProperties, - ReadOnlyMemory body) - { - if (routingKey is null) - { - throw new ArgumentNullException(nameof(routingKey)); - } + => InnerChannel.BasicNack(deliveryTag, multiple, requeue); - Delegate.BasicPublish(exchange, - routingKey, - mandatory, - basicProperties, - body); - } + public void BasicPublish(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory body) + => InnerChannel.BasicPublish(exchange, routingKey, mandatory, basicProperties, body); public void BasicPublish(CachedString exchange, CachedString routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory body) - => Delegate.BasicPublish(exchange, routingKey, mandatory, basicProperties, body); + => InnerChannel.BasicPublish(exchange, routingKey, mandatory, basicProperties, body); - public void BasicQos(uint prefetchSize, - ushort prefetchCount, - bool global) + public void BasicQos(uint prefetchSize, ushort prefetchCount, bool global) { ThrowIfDisposed(); if (global) @@ -618,66 +239,42 @@ public void BasicQos(uint prefetchSize, { _prefetchCountConsumer = prefetchCount; } - _delegate.BasicQos(prefetchSize, prefetchCount, global); + _innerChannel.BasicQos(prefetchSize, prefetchCount, global); } - public void BasicRecover(bool requeue) => Delegate.BasicRecover(requeue); + public void BasicRecover(bool requeue) + => InnerChannel.BasicRecover(requeue); - public void BasicRecoverAsync(bool requeue) => Delegate.BasicRecoverAsync(requeue); + public void BasicRecoverAsync(bool requeue) + => InnerChannel.BasicRecoverAsync(requeue); - public void BasicReject(ulong deliveryTag, - bool requeue) => Delegate.BasicReject(deliveryTag, requeue); - - public void Close() - { - ThrowIfDisposed(); - try - { - _delegate.Close(); - } - finally - { - _connection.DeleteRecordedChannel(this); - } - } - - public void Close(ushort replyCode, string replyText) - { - ThrowIfDisposed(); - try - { - _delegate.Close(replyCode, replyText); - } - finally - { - _connection.DeleteRecordedChannel(this); - } - } + public void BasicReject(ulong deliveryTag, bool requeue) + => InnerChannel.BasicReject(deliveryTag, requeue); public void ConfirmSelect() { - Delegate.ConfirmSelect(); + InnerChannel.ConfirmSelect(); _usesPublisherConfirms = true; } public IBasicProperties CreateBasicProperties() - => Delegate.CreateBasicProperties(); + => InnerChannel.CreateBasicProperties(); public void ExchangeBind(string destination, string source, string routingKey, IDictionary arguments) { ThrowIfDisposed(); RecordedBinding eb = new RecordedExchangeBinding(this, destination, source, routingKey, arguments); _connection.RecordBinding(eb); - _delegate.ExchangeBind(destination, source, routingKey, arguments); + _innerChannel.ExchangeBind(destination, source, routingKey, arguments); } public void ExchangeBindNoWait(string destination, string source, string routingKey, IDictionary arguments) - => Delegate.ExchangeBindNoWait(destination, source, routingKey, arguments); + => InnerChannel.ExchangeBindNoWait(destination, source, routingKey, arguments); public void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary arguments) { ThrowIfDisposed(); - _delegate.ExchangeDeclare(exchange, type, durable, autoDelete, arguments); + _innerChannel.ExchangeDeclare(exchange, type, durable, autoDelete, arguments); RecordedExchange rx = new RecordedExchange(this, exchange, type, durable, autoDelete, arguments); _connection.RecordExchange(rx); } @@ -685,23 +282,23 @@ public void ExchangeDeclare(string exchange, string type, bool durable, bool aut public void ExchangeDeclareNoWait(string exchange, string type, bool durable, bool autoDelete, IDictionary arguments) { ThrowIfDisposed(); - _delegate.ExchangeDeclareNoWait(exchange, type, durable, autoDelete, arguments); + _innerChannel.ExchangeDeclareNoWait(exchange, type, durable, autoDelete, arguments); RecordedExchange rx = new RecordedExchange(this, exchange, type, durable, autoDelete, arguments); _connection.RecordExchange(rx); } public void ExchangeDeclarePassive(string exchange) - => Delegate.ExchangeDeclarePassive(exchange); + => InnerChannel.ExchangeDeclarePassive(exchange); public void ExchangeDelete(string exchange, bool ifUnused) { - Delegate.ExchangeDelete(exchange, ifUnused); + InnerChannel.ExchangeDelete(exchange, ifUnused); _connection.DeleteRecordedExchange(exchange); } public void ExchangeDeleteNoWait(string exchange, bool ifUnused) { - Delegate.ExchangeDeleteNoWait(exchange, ifUnused); + InnerChannel.ExchangeDeleteNoWait(exchange, ifUnused); _connection.DeleteRecordedExchange(exchange); } @@ -710,28 +307,28 @@ public void ExchangeUnbind(string destination, string source, string routingKey, ThrowIfDisposed(); RecordedBinding eb = new RecordedExchangeBinding(this, destination, source, routingKey, arguments); _connection.DeleteRecordedBinding(eb); - _delegate.ExchangeUnbind(destination, source, routingKey, arguments); + _innerChannel.ExchangeUnbind(destination, source, routingKey, arguments); _connection.DeleteAutoDeleteExchange(source); } public void ExchangeUnbindNoWait(string destination, string source, string routingKey, IDictionary arguments) - => Delegate.ExchangeUnbind(destination, source, routingKey, arguments); + => InnerChannel.ExchangeUnbind(destination, source, routingKey, arguments); public void QueueBind(string queue, string exchange, string routingKey, IDictionary arguments) { ThrowIfDisposed(); RecordedBinding qb = new RecordedQueueBinding(this, queue, exchange, routingKey, arguments); _connection.RecordBinding(qb); - _delegate.QueueBind(queue, exchange, routingKey, arguments); + _innerChannel.QueueBind(queue, exchange, routingKey, arguments); } public void QueueBindNoWait(string queue, string exchange, string routingKey, IDictionary arguments) - => Delegate.QueueBind(queue, exchange, routingKey, arguments); + => InnerChannel.QueueBind(queue, exchange, routingKey, arguments); public QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary arguments) { ThrowIfDisposed(); - QueueDeclareOk result = _delegate.QueueDeclare(queue, durable, exclusive, autoDelete, arguments); + QueueDeclareOk result = _innerChannel.QueueDeclare(queue, durable, exclusive, autoDelete, arguments); RecordedQueue rq = new RecordedQueue(this, result.QueueName, queue.Length == 0, durable, exclusive, autoDelete, arguments); _connection.RecordQueue(rq); return result; @@ -740,60 +337,64 @@ public QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, b public void QueueDeclareNoWait(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary arguments) { ThrowIfDisposed(); - _delegate.QueueDeclareNoWait(queue, durable, exclusive, + _innerChannel.QueueDeclareNoWait(queue, durable, exclusive, autoDelete, arguments); RecordedQueue rq = new RecordedQueue(this, queue, queue.Length == 0, durable, exclusive, autoDelete, arguments); _connection.RecordQueue(rq); } public QueueDeclareOk QueueDeclarePassive(string queue) - => Delegate.QueueDeclarePassive(queue); + => InnerChannel.QueueDeclarePassive(queue); public uint MessageCount(string queue) - => Delegate.MessageCount(queue); + => InnerChannel.MessageCount(queue); public uint ConsumerCount(string queue) - => Delegate.ConsumerCount(queue); + => InnerChannel.ConsumerCount(queue); public uint QueueDelete(string queue, bool ifUnused, bool ifEmpty) { ThrowIfDisposed(); - uint result = _delegate.QueueDelete(queue, ifUnused, ifEmpty); + uint result = _innerChannel.QueueDelete(queue, ifUnused, ifEmpty); _connection.DeleteRecordedQueue(queue); return result; } public void QueueDeleteNoWait(string queue, bool ifUnused, bool ifEmpty) { - Delegate.QueueDeleteNoWait(queue, ifUnused, ifEmpty); + InnerChannel.QueueDeleteNoWait(queue, ifUnused, ifEmpty); _connection.DeleteRecordedQueue(queue); } public uint QueuePurge(string queue) - => Delegate.QueuePurge(queue); + => InnerChannel.QueuePurge(queue); public void QueueUnbind(string queue, string exchange, string routingKey, IDictionary arguments) { ThrowIfDisposed(); RecordedBinding qb = new RecordedQueueBinding(this, queue, exchange, routingKey, arguments); _connection.DeleteRecordedBinding(qb); - _delegate.QueueUnbind(queue, exchange, routingKey, arguments); + _innerChannel.QueueUnbind(queue, exchange, routingKey, arguments); _connection.DeleteAutoDeleteExchange(exchange); } - public void TxCommit() => Delegate.TxCommit(); + public void TxCommit() + => InnerChannel.TxCommit(); - public void TxRollback() => Delegate.TxRollback(); + public void TxRollback() + => InnerChannel.TxRollback(); public void TxSelect() { - Delegate.TxSelect(); + InnerChannel.TxSelect(); _usesTransactions = true; } - public Task WaitForConfirmsAsync(CancellationToken token = default) => Delegate.WaitForConfirmsAsync(token); + public Task WaitForConfirmsAsync(CancellationToken token = default) + => InnerChannel.WaitForConfirmsAsync(token); - public Task WaitForConfirmsOrDieAsync(CancellationToken token = default) => Delegate.WaitForConfirmsOrDieAsync(token); + public Task WaitForConfirmsOrDieAsync(CancellationToken token = default) + => InnerChannel.WaitForConfirmsOrDieAsync(token); private void RecoverState() { @@ -818,17 +419,21 @@ private void RecoverState() } } - public IBasicPublishBatch CreateBasicPublishBatch() => Delegate.CreateBasicPublishBatch(); + public IBasicPublishBatch CreateBasicPublishBatch() + => InnerChannel.CreateBasicPublishBatch(); - public IBasicPublishBatch CreateBasicPublishBatch(int sizeHint) => Delegate.CreateBasicPublishBatch(sizeHint); + public IBasicPublishBatch CreateBasicPublishBatch(int sizeHint) + => InnerChannel.CreateBasicPublishBatch(sizeHint); [MethodImpl(MethodImplOptions.AggressiveInlining)] private void ThrowIfDisposed() { if (_disposed) { - throw new ObjectDisposedException(GetType().FullName); + ThrowDisposed(); } + + static void ThrowDisposed() => throw new ObjectDisposedException(typeof(AutorecoveringModel).FullName); } } } diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index 675063e994..c7dae9d21f 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -32,7 +32,6 @@ using System; using System.Collections.Generic; using System.IO; -using System.Net; using System.Net.Sockets; using System.Reflection; using System.Runtime.CompilerServices; @@ -191,11 +190,11 @@ public event EventHandler QueueNameChang public ushort ChannelMax => _sessionManager.ChannelMax; - public IDictionary ClientProperties { get; set; } + public IDictionary ClientProperties { get; private set; } public AmqpTcpEndpoint Endpoint => _frameHandler.Endpoint; - public uint FrameMax { get; set; } + public uint FrameMax { get; private set; } public TimeSpan Heartbeat { @@ -212,21 +211,15 @@ public TimeSpan Heartbeat public bool IsOpen => CloseReason is null; - public AmqpTcpEndpoint[] KnownHosts { get; set; } - - public EndPoint LocalEndPoint => _frameHandler.LocalEndPoint; - public int LocalPort => _frameHandler.LocalPort; ///Another overload of a Protocol property, useful ///for exposing a tighter type. - public ProtocolBase Protocol => (ProtocolBase)Endpoint.Protocol; - - public EndPoint RemoteEndPoint => _frameHandler.RemoteEndPoint; + internal ProtocolBase Protocol => (ProtocolBase)Endpoint.Protocol; public int RemotePort => _frameHandler.RemotePort; - public IDictionary ServerProperties { get; set; } + public IDictionary ServerProperties { get; private set; } public IList ShutdownReport => _shutdownReport; private ShutdownReportEntry[] _shutdownReport = Array.Empty(); @@ -234,7 +227,7 @@ public TimeSpan Heartbeat ///Explicit implementation of IConnection.Protocol. IProtocol IConnection.Protocol => Endpoint.Protocol; - public static IDictionary DefaultClientProperties() + internal static IDictionary DefaultClientProperties() { var table = new Dictionary(5) { @@ -329,10 +322,7 @@ internal void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout) } } - /// - /// Loop only used while quiescing. Use only to cleanly close connection - /// - public void ClosingLoop() + private void ClosingLoop() { try { @@ -367,23 +357,18 @@ public void ClosingLoop() } } - public OutgoingCommand ConnectionCloseWrapper(ushort reasonCode, string reasonText) + private OutgoingCommand ConnectionCloseWrapper(ushort reasonCode, string reasonText) { Protocol.CreateConnectionClose(reasonCode, reasonText, out OutgoingCommand request, out _); return request; } - public ISession CreateSession() + internal ISession CreateSession() { return _sessionManager.Create(); } - public ISession CreateSession(int channelNumber) - { - return _sessionManager.Create(channelNumber); - } - - public void EnsureIsOpen() + internal void EnsureIsOpen() { if (!IsOpen) { @@ -392,7 +377,7 @@ public void EnsureIsOpen() } // Only call at the end of the Mainloop or HeartbeatLoop - public void FinishClose() + private void FinishClose() { _closed = true; MaybeStopHeartbeatTimers(); @@ -402,12 +387,11 @@ public void FinishClose() _model0.FinishClose(); } - public void HandleMainLoopException(ShutdownEventArgs reason) + private void HandleMainLoopException(ShutdownEventArgs reason) { if (!SetCloseReason(reason)) { - LogCloseError("Unexpected Main Loop Exception while closing: " - + reason, new Exception(reason.ToString())); + LogCloseError("Unexpected Main Loop Exception while closing: " + reason, new Exception(reason.ToString())); return; } @@ -415,7 +399,7 @@ public void HandleMainLoopException(ShutdownEventArgs reason) LogCloseError($"Unexpected connection closure: {reason}", new Exception(reason.ToString())); } - public bool HardProtocolExceptionHandler(HardProtocolException hpe) + private bool HardProtocolExceptionHandler(HardProtocolException hpe) { if (SetCloseReason(hpe.ShutdownReason)) { @@ -441,7 +425,7 @@ public bool HardProtocolExceptionHandler(HardProtocolException hpe) return false; } - public void InternalClose(ShutdownEventArgs reason) + internal void InternalClose(ShutdownEventArgs reason) { if (!SetCloseReason(reason)) { @@ -457,7 +441,7 @@ public void InternalClose(ShutdownEventArgs reason) TerminateMainloop(); } - public void LogCloseError(string error, Exception ex) + private void LogCloseError(string error, Exception ex) { ESLog.Error(error, ex); @@ -470,7 +454,7 @@ public void LogCloseError(string error, Exception ex) } } - public void MainLoop() + private void MainLoop() { bool shutdownCleanly = false; try @@ -530,7 +514,7 @@ public void MainLoop() FinishClose(); } - public void MainLoopIteration() + private void MainLoopIteration() { InboundFrame frame = _frameHandler.ReadFrame(); NotifyHeartbeatListener(); @@ -583,25 +567,25 @@ private void NotifyHeartbeatListener() _heartbeatDetected = true; } - public void NotifyReceivedCloseOk() + private void NotifyReceivedCloseOk() { TerminateMainloop(); _closed = true; } - public void OnCallbackException(CallbackExceptionEventArgs args) + internal void OnCallbackException(CallbackExceptionEventArgs args) { _callbackExceptionWrapper.Invoke(this, args); } ///Broadcasts notification of the final shutdown of the connection. - public void OnShutdown() + private void OnShutdown() { ThrowIfDisposed(); _connectionShutdownWrapper.Invoke(this, CloseReason); } - public void Open() + private void Open() { StartAndTune(); _model0.ConnectionOpen(_factory.VirtualHost); @@ -636,7 +620,7 @@ public void Open() /// to do to clean up and shut down the channel. /// /// - public void QuiesceChannel(SoftProtocolException pe) + private void QuiesceChannel(SoftProtocolException pe) { // Construct the QuiescingSession that we'll use during // the quiesce process. @@ -670,7 +654,7 @@ private bool SetCloseReason(ShutdownEventArgs reason) return System.Threading.Interlocked.CompareExchange(ref _closeReason, reason, null) is null; } - public void MaybeStartHeartbeatTimers() + private void MaybeStartHeartbeatTimers() { if (Heartbeat != TimeSpan.Zero) { @@ -679,12 +663,12 @@ public void MaybeStartHeartbeatTimers() } } - public void StartMainLoop() + private void StartMainLoop() { _mainLoopTask = Task.Factory.StartNew(MainLoop, TaskCreationOptions.LongRunning); } - public void HeartbeatReadTimerCallback(object state) + private void HeartbeatReadTimerCallback(object state) { if (_heartbeatReadTimer is null) { @@ -725,9 +709,9 @@ public void HeartbeatReadTimerCallback(object state) TerminateMainloop(); FinishClose(); } - else if (_heartbeatReadTimer != null) + else { - _heartbeatReadTimer.Change((int)Heartbeat.TotalMilliseconds, Timeout.Infinite); + _heartbeatReadTimer?.Change((int)Heartbeat.TotalMilliseconds, Timeout.Infinite); } } catch (ObjectDisposedException) @@ -742,7 +726,7 @@ public void HeartbeatReadTimerCallback(object state) } } - public void HeartbeatWriteTimerCallback(object state) + private void HeartbeatWriteTimerCallback(object state) { if (_heartbeatWriteTimer is null) { @@ -779,7 +763,7 @@ private void MaybeStopHeartbeatTimers() /// /// May be called more than once. Should therefore be idempotent. /// - public void TerminateMainloop() + private void TerminateMainloop() { MaybeStopHeartbeatTimers(); _running = false; @@ -790,7 +774,7 @@ public override string ToString() return $"Connection({_id},{Endpoint})"; } - public void Write(ReadOnlyMemory memory) + internal void Write(ReadOnlyMemory memory) { _frameHandler.Write(memory); } @@ -810,13 +794,13 @@ public IModel CreateModel() { EnsureIsOpen(); ISession session = CreateSession(); - var model = (IFullModel)Protocol.CreateModel(session, ConsumerWorkService); + var model = (ModelBase)Protocol.CreateModel(session, ConsumerWorkService); model.ContinuationTimeout = _factory.ContinuationTimeout; model._Private_ChannelOpen(); return model; } - public void HandleConnectionBlocked(string reason) + internal void HandleConnectionBlocked(string reason) { if (!_connectionBlockedWrapper.IsEmpty) { @@ -824,7 +808,7 @@ public void HandleConnectionBlocked(string reason) } } - public void HandleConnectionUnblocked() + internal void HandleConnectionUnblocked() { if (!_connectionUnblockedWrapper.IsEmpty) { @@ -832,40 +816,29 @@ public void HandleConnectionUnblocked() } } - void IDisposable.Dispose() - { - Dispose(true); - } - - private void Dispose(bool disposing) + public void Dispose() { if (_disposed) { return; } - if (disposing) + try { - // dispose managed resources - try - { - this.Abort(); - _mainLoopTask.Wait(); - } - catch (OperationInterruptedException) - { - // ignored, see rabbitmq/rabbitmq-dotnet-client#133 - } - finally - { - _disposed = true; - } + this.Abort(); + _mainLoopTask.Wait(); + } + catch (OperationInterruptedException) + { + // ignored, see rabbitmq/rabbitmq-dotnet-client#133 + } + finally + { + _disposed = true; } - - // dispose unmanaged resources } - internal OutgoingCommand ChannelCloseWrapper(ushort reasonCode, string reasonText) + private OutgoingCommand ChannelCloseWrapper(ushort reasonCode, string reasonText) { Protocol.CreateChannelClose(reasonCode, reasonText, out OutgoingCommand request); return request; @@ -954,17 +927,14 @@ private void StartAndTune() "Possibly caused by authentication failure", e); } - ushort channelMax = (ushort)NegotiatedMaxValue(_factory.RequestedChannelMax, - connectionTune.m_channelMax); + ushort channelMax = (ushort)NegotiatedMaxValue(_factory.RequestedChannelMax, connectionTune.m_channelMax); _sessionManager = new SessionManager(this, channelMax); - uint frameMax = NegotiatedMaxValue(_factory.RequestedFrameMax, - connectionTune.m_frameMax); + uint frameMax = NegotiatedMaxValue(_factory.RequestedFrameMax, connectionTune.m_frameMax); FrameMax = frameMax; TimeSpan requestedHeartbeat = _factory.RequestedHeartbeat; - uint heartbeatInSeconds = NegotiatedMaxValue((uint)requestedHeartbeat.TotalSeconds, - (uint)connectionTune.m_heartbeatInSeconds); + uint heartbeatInSeconds = NegotiatedMaxValue((uint)requestedHeartbeat.TotalSeconds, (uint)connectionTune.m_heartbeatInSeconds); Heartbeat = TimeSpan.FromSeconds(heartbeatInSeconds); _model0.ConnectionTuneOk(channelMax, frameMax, (ushort)Heartbeat.TotalSeconds); @@ -989,7 +959,7 @@ private static uint NegotiatedMaxValue(uint clientValue, uint serverValue) Math.Min(clientValue, serverValue); } - public void TakeOver(Connection other) + internal void TakeOver(Connection other) { _callbackExceptionWrapper.Takeover(other._callbackExceptionWrapper); _connectionBlockedWrapper.Takeover(other._connectionBlockedWrapper); diff --git a/projects/RabbitMQ.Client/client/events/RecoveryExceptionEventArgs.cs b/projects/RabbitMQ.Client/client/impl/ConnectionSecureOrTune.cs similarity index 68% rename from projects/RabbitMQ.Client/client/events/RecoveryExceptionEventArgs.cs rename to projects/RabbitMQ.Client/client/impl/ConnectionSecureOrTune.cs index 7f7bf78beb..addc36e00c 100644 --- a/projects/RabbitMQ.Client/client/events/RecoveryExceptionEventArgs.cs +++ b/projects/RabbitMQ.Client/client/impl/ConnectionSecureOrTune.cs @@ -29,18 +29,25 @@ // Copyright (c) 2007-2020 VMware, Inc. All rights reserved. //--------------------------------------------------------------------------- -using System; - -namespace RabbitMQ.Client.Events +namespace RabbitMQ.Client.Impl { - /// - ///Describes an exception that was thrown during - ///automatic connection recovery performed by the library. - /// - public class RecoveryExceptionEventArgs : BaseExceptionEventArgs + ///Essential information from an incoming Connection.Tune + ///method. + internal struct ConnectionTuneDetails + { + ///The peer's suggested channel-max parameter. + public ushort m_channelMax; + + ///The peer's suggested frame-max parameter. + public uint m_frameMax; + + ///The peer's suggested heartbeat parameter. + public ushort m_heartbeatInSeconds; + } + + internal class ConnectionSecureOrTune { - public RecoveryExceptionEventArgs(Exception e) : base(e) - { - } + public byte[] m_challenge; + public ConnectionTuneDetails m_tuneDetails; } } diff --git a/projects/RabbitMQ.Client/client/impl/IFullModel.cs b/projects/RabbitMQ.Client/client/impl/IFullModel.cs deleted file mode 100644 index e732402d13..0000000000 --- a/projects/RabbitMQ.Client/client/impl/IFullModel.cs +++ /dev/null @@ -1,300 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 2.0. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2020 VMware, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v2.0: -// -//--------------------------------------------------------------------------- -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. -// -// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. -//--------------------------------------------------------------------------- - -using System; -using System.Collections.Generic; - -namespace RabbitMQ.Client.Impl -{ - ///Not part of the public API. Extension of IModel to - ///include utilities and connection-setup routines needed by the - ///implementation side. - /// - ///This interface is used by the API autogeneration - ///process. The AMQP XML specifications are read by the spec - ///compilation tool, and after the basic method interface and - ///implementation classes are generated, this interface is - ///scanned, and a spec-version-specific implementation is - ///autogenerated. Annotations are used on certain methods, return - ///types, and parameters, to customise the details of the - ///autogeneration process. - /// - /// - /// - internal interface IFullModel : IModel - { - ///Sends a Connection.TuneOk. Used during connection - ///initialisation. - void ConnectionTuneOk(ushort channelMax, uint frameMax, ushort heartbeat); - - ///Handle incoming Basic.Deliver methods. Dispatches - ///to waiting consumers. - void HandleBasicDeliver(string consumerTag, - ulong deliveryTag, - bool redelivered, - string exchange, - string routingKey, - IBasicProperties basicProperties, - ReadOnlyMemory body, - byte[] rentedArray); - - ///Handle incoming Basic.Ack methods. Signals a - ///BasicAckEvent. - void HandleBasicAck(ulong deliveryTag, bool multiple); - - void HandleBasicCancel(string consumerTag, bool nowait); - - ///Handle incoming Basic.CancelOk methods. - void HandleBasicCancelOk(string consumerTag); - - ///Handle incoming Basic.ConsumeOk methods. - void HandleBasicConsumeOk(string consumerTag); - - ///Handle incoming Basic.GetEmpty methods. Routes the - ///information to a waiting Basic.Get continuation. - /// - /// Note that the clusterId field is ignored, as in the - /// specification it notes that it is "deprecated pending - /// review". - /// - void HandleBasicGetEmpty(); - - ///Handle incoming Basic.GetOk methods. Routes the - ///information to a waiting Basic.Get continuation. - void HandleBasicGetOk(ulong deliveryTag, - bool redelivered, - string exchange, - string routingKey, - uint messageCount, - IBasicProperties basicProperties, - ReadOnlyMemory body, - byte[] rentedArray); - - ///Handle incoming Basic.Nack methods. Signals a - ///BasicNackEvent. - void HandleBasicNack(ulong deliveryTag, bool multiple, bool requeue); - - ///Handle incoming Basic.RecoverOk methods - ///received in reply to Basic.Recover. - /// - void HandleBasicRecoverOk(); - - ///Handle incoming Basic.Return methods. Signals a - ///BasicReturnEvent. - void HandleBasicReturn(ushort replyCode, - string replyText, - string exchange, - string routingKey, - IBasicProperties basicProperties, - ReadOnlyMemory body, - byte[] takeoverPayload); - - ///Handle an incoming Channel.Close. Shuts down the - ///session and model. - void HandleChannelClose(ushort replyCode, string replyText, ushort classId, ushort methodId); - - ///Handle an incoming Channel.CloseOk. - void HandleChannelCloseOk(); - - ///Handle incoming Channel.Flow methods. Either - ///stops or resumes sending the methods that have content. - void HandleChannelFlow(bool active); - - ///Handle an incoming Connection.Blocked. - void HandleConnectionBlocked(string reason); - - ///Handle an incoming Connection.Close. Shuts down the - ///connection and all sessions and models. - void HandleConnectionClose(ushort replyCode, string replyText, ushort classId, ushort methodId); - - /////////////////////////////////////////////////////////////////////////// - // Connection-related methods, for use in channel 0 during - // connection startup/shutdown. - - ///Handle incoming Connection.Secure - ///methods. - void HandleConnectionSecure(byte[] challenge); - - ///Handle an incoming Connection.Start. Used during - ///connection initialisation. - void HandleConnectionStart(byte versionMajor, byte versionMinor, IDictionary serverProperties, byte[] mechanisms, byte[] locales); - - ///Handle incoming Connection.Tune - ///methods. - void HandleConnectionTune(ushort channelMax, uint frameMax, ushort heartbeat); - - ///Handle an incominga Connection.Unblocked. - void HandleConnectionUnblocked(); - - ///Handle incoming Queue.DeclareOk methods. Routes the - ///information to a waiting Queue.DeclareOk continuation. - void HandleQueueDeclareOk(string queue, uint messageCount, uint consumerCount); - - ///Used to send a Basic.Cancel method. The public - ///consume API calls this while also managing internal - ///datastructures. - void _Private_BasicCancel(string consumerTag, bool nowait); - - ///Used to send a Basic.Consume method. The public - ///consume API calls this while also managing internal - ///datastructures. - void _Private_BasicConsume(string queue, string consumerTag, bool noLocal, bool autoAck, bool exclusive, bool nowait, IDictionary arguments); - - ///Used to send a Basic.Get. Basic.Get is a special - ///case, since it can result in a Basic.GetOk or a - ///Basic.GetEmpty, so this level of manual control is - ///required. - void _Private_BasicGet(string queue, bool autoAck); - - ///Used to send a Basic.Publish method. Called by the - ///public publish method after potential null-reference issues - ///have been rectified. - void _Private_BasicPublish(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory body); - - void _Private_BasicRecover(bool requeue); - - ///Used to send a Channel.Close. Called during - ///session shutdown. - void _Private_ChannelClose(ushort replyCode, string replyText, ushort classId, ushort methodId); - - ///Used to send a Channel.CloseOk. Called during - ///session shutdown. - void _Private_ChannelCloseOk(); - - ///Used to send a Channel.FlowOk. Confirms that - ///Channel.Flow from the broker was processed. - void _Private_ChannelFlowOk(bool active); - - ///Used to send a Channel.Open. Called during session - ///initialisation. - void _Private_ChannelOpen(); - - ///Used to send a Confirm.Select method. The public - ///confirm API calls this while also managing internal - ///datastructures. - void _Private_ConfirmSelect(bool nowait); - - ///Used to send a Connection.Close. Called during - ///connection shutdown. - void _Private_ConnectionClose(ushort replyCode, string replyText, ushort classId, ushort methodId); - - ///Used to send a Connection.CloseOk. Called during - ///connection shutdown. - void _Private_ConnectionCloseOk(); - - ///Used to send a Connection.Open. Called during - ///connection startup. - void _Private_ConnectionOpen(string virtualHost); - - ///Used to send a Connection.SecureOk. Again, this is - ///special, like Basic.Get. - void _Private_ConnectionSecureOk(byte[] response); - - ///Used to send a Connection.StartOk. This is - ///special, like Basic.Get. - void _Private_ConnectionStartOk(IDictionary clientProperties, string mechanism, byte[] response, string locale); - - ///Used to send a Conection.UpdateSecret method. Called by the - ///public UpdateSecret method. - /// - void _Private_UpdateSecret(byte[] newSecret, string reason); - - ///Used to send a Exchange.Bind method. Called by the - ///public bind method. - /// - void _Private_ExchangeBind(string destination, string source, string routingKey, bool nowait, IDictionary arguments); - - ///Used to send a Exchange.Declare method. Called by the - ///public declare method. - /// - void _Private_ExchangeDeclare(string exchange, - string type, - bool passive, - bool durable, - bool autoDelete, - bool @internal, - bool nowait, - IDictionary arguments); - - ///Used to send a Exchange.Delete method. Called by the - ///public delete method. - /// - void _Private_ExchangeDelete(string exchange, bool ifUnused, bool nowait); - - ///Used to send a Exchange.Unbind method. Called by the - ///public unbind method. - /// - void _Private_ExchangeUnbind(string destination, string source, string routingKey, bool nowait, IDictionary arguments); - - ///Used to send a Queue.Bind method. Called by the - ///public bind method. - void _Private_QueueBind(string queue, string exchange, string routingKey, bool nowait, IDictionary arguments); - - ///Used to send a Queue.Declare method. Called by the - ///public declare method. - void _Private_QueueDeclare(string queue, - bool passive, - bool durable, - bool exclusive, - bool autoDelete, - bool nowait, - IDictionary arguments); - - ///Used to send a Queue.Delete method. Called by the - ///public delete method. - uint _Private_QueueDelete(string queue, bool ifUnused, bool ifEmpty, bool nowait); - - ///Used to send a Queue.Purge method. Called by the - ///public purge method. - uint _Private_QueuePurge(string queue, bool nowait); - } - - ///Essential information from an incoming Connection.Tune - ///method. - internal struct ConnectionTuneDetails - { - ///The peer's suggested channel-max parameter. - public ushort m_channelMax; - - ///The peer's suggested frame-max parameter. - public uint m_frameMax; - - ///The peer's suggested heartbeat parameter. - public ushort m_heartbeatInSeconds; - } - - - internal class ConnectionSecureOrTune - { - public byte[] m_challenge; - public ConnectionTuneDetails m_tuneDetails; - } -} diff --git a/projects/RabbitMQ.Client/client/impl/ModelBase.cs b/projects/RabbitMQ.Client/client/impl/ModelBase.cs index c7a61d9b21..a91bd7874f 100644 --- a/projects/RabbitMQ.Client/client/impl/ModelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ModelBase.cs @@ -44,11 +44,11 @@ namespace RabbitMQ.Client.Impl { - internal abstract class ModelBase : IFullModel, IRecoverable + internal abstract class ModelBase : IModel, IRecoverable { ///Only used to kick-start a connection open ///sequence. See - public BlockingCell m_connectionStartCell; + internal BlockingCell m_connectionStartCell; internal readonly IBasicProperties _emptyBasicProperties; private readonly Dictionary _consumers = new Dictionary(); @@ -64,12 +64,12 @@ internal abstract class ModelBase : IFullModel, IRecoverable private ShutdownEventArgs _closeReason; public ShutdownEventArgs CloseReason => Volatile.Read(ref _closeReason); - public IConsumerDispatcher ConsumerDispatcher { get; } + internal IConsumerDispatcher ConsumerDispatcher { get; } - public ModelBase(ISession session) : this(session, session.Connection.ConsumerWorkService) + protected ModelBase(ISession session) : this(session, session.Connection.ConsumerWorkService) { } - public ModelBase(ISession session, ConsumerWorkService workService) + protected ModelBase(ISession session, ConsumerWorkService workService) { if (workService is AsyncConsumerWorkService asyncConsumerWorkService) { @@ -101,7 +101,7 @@ protected void Initialise(ISession session) Session.SessionShutdown += OnSessionShutdown; } - public TimeSpan HandshakeContinuationTimeout { get; set; } = TimeSpan.FromSeconds(10); + internal TimeSpan HandshakeContinuationTimeout { get; set; } = TimeSpan.FromSeconds(10); public TimeSpan ContinuationTimeout { get; set; } = TimeSpan.FromSeconds(20); public event EventHandler BasicAcks @@ -247,7 +247,7 @@ private async Task CloseAsync(ShutdownEventArgs reason, bool abort) } } - public void ConnectionOpen(string virtualHost) + internal void ConnectionOpen(string virtualHost) { var k = new SimpleBlockingRpcContinuation(); lock (_rpcLock) @@ -267,7 +267,7 @@ public void ConnectionOpen(string virtualHost) } } - public ConnectionSecureOrTune ConnectionSecureOk(byte[] response) + internal ConnectionSecureOrTune ConnectionSecureOk(byte[] response) { var k = new ConnectionStartRpcContinuation(); lock (_rpcLock) @@ -288,10 +288,7 @@ public ConnectionSecureOrTune ConnectionSecureOk(byte[] response) return k.m_result; } - public ConnectionSecureOrTune ConnectionStartOk(IDictionary clientProperties, - string mechanism, - byte[] response, - string locale) + internal ConnectionSecureOrTune ConnectionStartOk(IDictionary clientProperties, string mechanism, byte[] response, string locale) { var k = new ConnectionStartRpcContinuation(); lock (_rpcLock) @@ -313,9 +310,9 @@ public ConnectionSecureOrTune ConnectionStartOk(IDictionary clie return k.m_result; } - public abstract bool DispatchAsynchronous(in IncomingCommand cmd); + protected abstract bool DispatchAsynchronous(in IncomingCommand cmd); - public void Enqueue(IRpcContinuation k) + protected void Enqueue(IRpcContinuation k) { if (CloseReason is null) { @@ -327,7 +324,7 @@ public void Enqueue(IRpcContinuation k) } } - public void FinishClose() + internal void FinishClose() { var reason = CloseReason; if (reason != null) @@ -338,7 +335,7 @@ public void FinishClose() m_connectionStartCell?.ContinueWithValue(null); } - public void HandleCommand(in IncomingCommand cmd) + private void HandleCommand(in IncomingCommand cmd) { if (!DispatchAsynchronous(in cmd)) // Was asynchronous. Already processed. No need to process further. { @@ -346,7 +343,7 @@ public void HandleCommand(in IncomingCommand cmd) } } - public T ModelRpc(MethodBase method) where T : MethodBase + protected T ModelRpc(MethodBase method) where T : MethodBase { var k = new SimpleBlockingRpcContinuation(); var outgoingCommand = new OutgoingCommand(method); @@ -365,12 +362,12 @@ public T ModelRpc(MethodBase method) where T : MethodBase throw new UnexpectedMethodException(baseResult.ProtocolClassId, baseResult.ProtocolMethodId, baseResult.ProtocolMethodName); } - public void ModelSend(MethodBase method) + protected void ModelSend(MethodBase method) { ModelSend(method, null, ReadOnlyMemory.Empty); } - public void ModelSend(MethodBase method, ContentHeaderBase header, ReadOnlyMemory body) + protected void ModelSend(MethodBase method, ContentHeaderBase header, ReadOnlyMemory body) { if (method.HasContent) { @@ -383,7 +380,7 @@ public void ModelSend(MethodBase method, ContentHeaderBase header, ReadOnlyMemor } } - public void OnCallbackException(CallbackExceptionEventArgs args) + internal void OnCallbackException(CallbackExceptionEventArgs args) { _callbackExceptionWrapper.Invoke(this, args); } @@ -400,7 +397,7 @@ public void OnCallbackException(CallbackExceptionEventArgs args) ///shutdown event. See the definition of Enqueue() above. /// /// - public virtual void OnModelShutdown(ShutdownEventArgs reason) + private void OnModelShutdown(ShutdownEventArgs reason) { _continuationQueue.HandleModelShutdown(reason); _modelShutdownWrapper.Invoke(this, reason); @@ -419,7 +416,7 @@ public virtual void OnModelShutdown(ShutdownEventArgs reason) _flowControlBlock.Set(); } - public void OnSessionShutdown(object sender, ShutdownEventArgs reason) + private void OnSessionShutdown(object sender, ShutdownEventArgs reason) { ConsumerDispatcher.Quiesce(); SetCloseReason(reason); @@ -442,11 +439,9 @@ internal bool SetCloseReason(ShutdownEventArgs reason) } public override string ToString() - { - return Session.ToString(); - } + => Session.ToString(); - public void TransmitAndEnqueue(in OutgoingCommand cmd, IRpcContinuation k) + private void TransmitAndEnqueue(in OutgoingCommand cmd, IRpcContinuation k) { Enqueue(k); Session.Transmit(cmd); @@ -468,9 +463,7 @@ protected virtual void Dispose(bool disposing) // dispose unmanaged resources } - public abstract void ConnectionTuneOk(ushort channelMax, - uint frameMax, - ushort heartbeat); + public abstract void ConnectionTuneOk(ushort channelMax, uint frameMax, ushort heartbeat); public void HandleBasicAck(ulong deliveryTag, bool multiple) { @@ -745,24 +738,17 @@ public void HandleChannelFlow(bool active) } } - public void HandleConnectionBlocked(string reason) + internal void HandleConnectionBlocked(string reason) { Session.Connection.HandleConnectionBlocked(reason); } - public void HandleConnectionClose(ushort replyCode, - string replyText, - ushort classId, - ushort methodId) + public void HandleConnectionClose(ushort replyCode, string replyText, ushort classId, ushort methodId) { - var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, - replyCode, - replyText, - classId, - methodId); + var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, replyCode, replyText, classId, methodId); try { - ((Connection)Session.Connection).InternalClose(reason); + Session.Connection.InternalClose(reason); _Private_ConnectionCloseOk(); SetCloseReason(Session.Connection.CloseReason); } @@ -788,11 +774,7 @@ public void HandleConnectionSecure(byte[] challenge) k.HandleCommand(IncomingCommand.Empty); // release the continuation. } - public void HandleConnectionStart(byte versionMajor, - byte versionMinor, - IDictionary serverProperties, - byte[] mechanisms, - byte[] locales) + public void HandleConnectionStart(byte versionMajor, byte versionMinor, IDictionary serverProperties, byte[] mechanisms, byte[] locales) { if (m_connectionStartCell is null) { @@ -830,52 +812,29 @@ public void HandleConnectionTune(ushort channelMax, uint frameMax, ushort heartb public void HandleConnectionUnblocked() { - var cb = (Connection)Session.Connection; - - cb.HandleConnectionUnblocked(); + Session.Connection.HandleConnectionUnblocked(); } - public void HandleQueueDeclareOk(string queue, - uint messageCount, - uint consumerCount) + public void HandleQueueDeclareOk(string queue, uint messageCount, uint consumerCount) { var k = (QueueDeclareRpcContinuation)_continuationQueue.Next(); k.m_result = new QueueDeclareOk(queue, messageCount, consumerCount); k.HandleCommand(IncomingCommand.Empty); // release the continuation. } - public abstract void _Private_BasicCancel(string consumerTag, - bool nowait); + public abstract void _Private_BasicCancel(string consumerTag, bool nowait); - public abstract void _Private_BasicConsume(string queue, - string consumerTag, - bool noLocal, - bool autoAck, - bool exclusive, - bool nowait, - IDictionary arguments); + public abstract void _Private_BasicConsume(string queue, string consumerTag, bool noLocal, bool autoAck, bool exclusive, bool nowait, IDictionary arguments); - public abstract void _Private_BasicGet(string queue, - bool autoAck); + public abstract void _Private_BasicGet(string queue, bool autoAck); - public abstract void _Private_BasicPublish(string exchange, - string routingKey, - bool mandatory, - IBasicProperties basicProperties, - ReadOnlyMemory body); + public abstract void _Private_BasicPublish(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory body); - public abstract void _Private_BasicPublishMemory(ReadOnlyMemory exchange, - ReadOnlyMemory routingKey, - bool mandatory, - IBasicProperties basicProperties, - ReadOnlyMemory body); + public abstract void _Private_BasicPublishMemory(ReadOnlyMemory exchange, ReadOnlyMemory routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory body); public abstract void _Private_BasicRecover(bool requeue); - public abstract void _Private_ChannelClose(ushort replyCode, - string replyText, - ushort classId, - ushort methodId); + public abstract void _Private_ChannelClose(ushort replyCode, string replyText, ushort classId, ushort methodId); public abstract void _Private_ChannelCloseOk(); @@ -885,72 +844,31 @@ public abstract void _Private_ChannelClose(ushort replyCode, public abstract void _Private_ConfirmSelect(bool nowait); - public abstract void _Private_ConnectionClose(ushort replyCode, - string replyText, - ushort classId, - ushort methodId); - public abstract void _Private_ConnectionCloseOk(); public abstract void _Private_ConnectionOpen(string virtualHost); public abstract void _Private_ConnectionSecureOk(byte[] response); - public abstract void _Private_ConnectionStartOk(IDictionary clientProperties, - string mechanism, - byte[] response, - string locale); + public abstract void _Private_ConnectionStartOk(IDictionary clientProperties, string mechanism, byte[] response, string locale); - public abstract void _Private_UpdateSecret( - byte[] @newSecret, - string @reason); + public abstract void _Private_UpdateSecret(byte[] @newSecret, string @reason); - public abstract void _Private_ExchangeBind(string destination, - string source, - string routingKey, - bool nowait, - IDictionary arguments); - - public abstract void _Private_ExchangeDeclare(string exchange, - string type, - bool passive, - bool durable, - bool autoDelete, - bool @internal, - bool nowait, - IDictionary arguments); - - public abstract void _Private_ExchangeDelete(string exchange, - bool ifUnused, - bool nowait); - - public abstract void _Private_ExchangeUnbind(string destination, - string source, - string routingKey, - bool nowait, - IDictionary arguments); + public abstract void _Private_ExchangeBind(string destination, string source, string routingKey, bool nowait, IDictionary arguments); - public abstract void _Private_QueueBind(string queue, - string exchange, - string routingKey, - bool nowait, - IDictionary arguments); + public abstract void _Private_ExchangeDeclare(string exchange, string type, bool passive, bool durable, bool autoDelete, bool @internal, bool nowait, IDictionary arguments); + + public abstract void _Private_ExchangeDelete(string exchange, bool ifUnused, bool nowait); + + public abstract void _Private_ExchangeUnbind(string destination, string source, string routingKey, bool nowait, IDictionary arguments); - public abstract void _Private_QueueDeclare(string queue, - bool passive, - bool durable, - bool exclusive, - bool autoDelete, - bool nowait, - IDictionary arguments); + public abstract void _Private_QueueBind(string queue, string exchange, string routingKey, bool nowait, IDictionary arguments); - public abstract uint _Private_QueueDelete(string queue, - bool ifUnused, - bool ifEmpty, - bool nowait); + public abstract void _Private_QueueDeclare(string queue, bool passive, bool durable, bool exclusive, bool autoDelete, bool nowait, IDictionary arguments); - public abstract uint _Private_QueuePurge(string queue, - bool nowait); + public abstract uint _Private_QueueDelete(string queue, bool ifUnused, bool ifEmpty, bool nowait); + + public abstract uint _Private_QueuePurge(string queue, bool nowait); public abstract void BasicAck(ulong deliveryTag, bool multiple); @@ -981,13 +899,7 @@ public void BasicCancelNoWait(string consumerTag) } } - public string BasicConsume(string queue, - bool autoAck, - string consumerTag, - bool noLocal, - bool exclusive, - IDictionary arguments, - IBasicConsumer consumer) + public string BasicConsume(string queue, bool autoAck, string consumerTag, bool noLocal, bool exclusive, IDictionary arguments, IBasicConsumer consumer) { // TODO: Replace with flag if (ConsumerDispatcher is AsyncConsumerDispatcher) @@ -1028,9 +940,7 @@ public BasicGetResult BasicGet(string queue, bool autoAck) return k.m_result; } - public abstract void BasicNack(ulong deliveryTag, - bool multiple, - bool requeue); + public abstract void BasicNack(ulong deliveryTag, bool multiple, bool requeue); private void AllocatePublishSeqNos(int count) { @@ -1043,11 +953,7 @@ private void AllocatePublishSeqNos(int count) } } - public void BasicPublish(string exchange, - string routingKey, - bool mandatory, - IBasicProperties basicProperties, - ReadOnlyMemory body) + public void BasicPublish(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory body) { if (routingKey is null) { @@ -1069,11 +975,7 @@ public void BasicPublish(string exchange, body); } - public void BasicPublish(CachedString exchange, - CachedString routingKey, - bool mandatory, - IBasicProperties basicProperties, - ReadOnlyMemory body) + public void BasicPublish(CachedString exchange, CachedString routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory body) { if (NextPublishSeqNo > 0) { @@ -1101,9 +1003,7 @@ public void UpdateSecret(string newSecret, string reason) _Private_UpdateSecret(Encoding.UTF8.GetBytes(newSecret), reason); } - public abstract void BasicQos(uint prefetchSize, - ushort prefetchCount, - bool global); + public abstract void BasicQos(uint prefetchSize, ushort prefetchCount, bool global); public void BasicRecover(bool requeue) { @@ -1119,8 +1019,7 @@ public void BasicRecover(bool requeue) public abstract void BasicRecoverAsync(bool requeue); - public abstract void BasicReject(ulong deliveryTag, - bool requeue); + public abstract void BasicReject(ulong deliveryTag, bool requeue); public void ConfirmSelect() { @@ -1147,18 +1046,12 @@ public IBasicPublishBatch CreateBasicPublishBatch(int sizeHint) } - public void ExchangeBind(string destination, - string source, - string routingKey, - IDictionary arguments) + public void ExchangeBind(string destination, string source, string routingKey, IDictionary arguments) { _Private_ExchangeBind(destination, source, routingKey, false, arguments); } - public void ExchangeBindNoWait(string destination, - string source, - string routingKey, - IDictionary arguments) + public void ExchangeBindNoWait(string destination, string source, string routingKey, IDictionary arguments) { _Private_ExchangeBind(destination, source, routingKey, true, arguments); } @@ -1168,11 +1061,7 @@ public void ExchangeDeclare(string exchange, string type, bool durable, bool aut _Private_ExchangeDeclare(exchange, type, false, durable, autoDelete, false, false, arguments); } - public void ExchangeDeclareNoWait(string exchange, - string type, - bool durable, - bool autoDelete, - IDictionary arguments) + public void ExchangeDeclareNoWait(string exchange, string type, bool durable, bool autoDelete, IDictionary arguments) { _Private_ExchangeDeclare(exchange, type, false, durable, autoDelete, false, true, arguments); } @@ -1182,59 +1071,42 @@ public void ExchangeDeclarePassive(string exchange) _Private_ExchangeDeclare(exchange, "", true, false, false, false, false, null); } - public void ExchangeDelete(string exchange, - bool ifUnused) + public void ExchangeDelete(string exchange, bool ifUnused) { _Private_ExchangeDelete(exchange, ifUnused, false); } - public void ExchangeDeleteNoWait(string exchange, - bool ifUnused) + public void ExchangeDeleteNoWait(string exchange, bool ifUnused) { _Private_ExchangeDelete(exchange, ifUnused, true); } - public void ExchangeUnbind(string destination, - string source, - string routingKey, - IDictionary arguments) + public void ExchangeUnbind(string destination, string source, string routingKey, IDictionary arguments) { _Private_ExchangeUnbind(destination, source, routingKey, false, arguments); } - public void ExchangeUnbindNoWait(string destination, - string source, - string routingKey, - IDictionary arguments) + public void ExchangeUnbindNoWait(string destination, string source, string routingKey, IDictionary arguments) { _Private_ExchangeUnbind(destination, source, routingKey, true, arguments); } - public void QueueBind(string queue, - string exchange, - string routingKey, - IDictionary arguments) + public void QueueBind(string queue, string exchange, string routingKey, IDictionary arguments) { _Private_QueueBind(queue, exchange, routingKey, false, arguments); } - public void QueueBindNoWait(string queue, - string exchange, - string routingKey, - IDictionary arguments) + public void QueueBindNoWait(string queue, string exchange, string routingKey, IDictionary arguments) { _Private_QueueBind(queue, exchange, routingKey, true, arguments); } - public QueueDeclareOk QueueDeclare(string queue, bool durable, - bool exclusive, bool autoDelete, - IDictionary arguments) + public QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary arguments) { return QueueDeclare(queue, false, durable, exclusive, autoDelete, arguments); } - public void QueueDeclareNoWait(string queue, bool durable, bool exclusive, - bool autoDelete, IDictionary arguments) + public void QueueDeclareNoWait(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary arguments) { _Private_QueueDeclare(queue, false, durable, exclusive, autoDelete, true, arguments); } @@ -1256,16 +1128,12 @@ public uint ConsumerCount(string queue) return ok.ConsumerCount; } - public uint QueueDelete(string queue, - bool ifUnused, - bool ifEmpty) + public uint QueueDelete(string queue, bool ifUnused, bool ifEmpty) { return _Private_QueueDelete(queue, ifUnused, ifEmpty, false); } - public void QueueDeleteNoWait(string queue, - bool ifUnused, - bool ifEmpty) + public void QueueDeleteNoWait(string queue, bool ifUnused, bool ifEmpty) { _Private_QueueDelete(queue, ifUnused, ifEmpty, true); } @@ -1275,10 +1143,7 @@ public uint QueuePurge(string queue) return _Private_QueuePurge(queue, false); } - public abstract void QueueUnbind(string queue, - string exchange, - string routingKey, - IDictionary arguments); + public abstract void QueueUnbind(string queue, string exchange, string routingKey, IDictionary arguments); public abstract void TxCommit(); @@ -1379,8 +1244,7 @@ internal void SendCommands(IList commands) Session.Transmit(commands); } - private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bool exclusive, - bool autoDelete, IDictionary arguments) + private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bool exclusive, bool autoDelete, IDictionary arguments) { var k = new QueueDeclareRpcContinuation(); lock (_rpcLock) diff --git a/projects/RabbitMQ.Client/client/impl/RecordedEntity.cs b/projects/RabbitMQ.Client/client/impl/RecordedEntity.cs index 341d92f87a..b2b1aaaaec 100644 --- a/projects/RabbitMQ.Client/client/impl/RecordedEntity.cs +++ b/projects/RabbitMQ.Client/client/impl/RecordedEntity.cs @@ -43,7 +43,7 @@ protected RecordedEntity(AutorecoveringModel channel) protected IModel ModelDelegate { - get { return _channel.Delegate; } + get { return _channel.InnerChannel; } } public abstract void Recover(); diff --git a/projects/RabbitMQ.Client/client/impl/RecoveryAwareModel.cs b/projects/RabbitMQ.Client/client/impl/RecoveryAwareModel.cs index c5acdab73e..013fd48bf5 100644 --- a/projects/RabbitMQ.Client/client/impl/RecoveryAwareModel.cs +++ b/projects/RabbitMQ.Client/client/impl/RecoveryAwareModel.cs @@ -34,7 +34,7 @@ namespace RabbitMQ.Client.Impl { - internal class RecoveryAwareModel : Model, IFullModel, IRecoverable + internal sealed class RecoveryAwareModel : Model { public RecoveryAwareModel(ISession session) : base(session) { @@ -95,8 +95,7 @@ public override void HandleBasicDeliver(string consumerTag, rentedArray); } - public override void BasicAck(ulong deliveryTag, - bool multiple) + public override void BasicAck(ulong deliveryTag, bool multiple) { ulong realTag = deliveryTag - ActiveDeliveryTagOffset; if (realTag > 0 && realTag <= deliveryTag) @@ -105,9 +104,7 @@ public override void BasicAck(ulong deliveryTag, } } - public override void BasicNack(ulong deliveryTag, - bool multiple, - bool requeue) + public override void BasicNack(ulong deliveryTag, bool multiple, bool requeue) { ulong realTag = deliveryTag - ActiveDeliveryTagOffset; if (realTag > 0 && realTag <= deliveryTag) @@ -116,8 +113,7 @@ public override void BasicNack(ulong deliveryTag, } } - public override void BasicReject(ulong deliveryTag, - bool requeue) + public override void BasicReject(ulong deliveryTag, bool requeue) { ulong realTag = deliveryTag - ActiveDeliveryTagOffset; if (realTag > 0 && realTag <= deliveryTag) @@ -126,7 +122,7 @@ public override void BasicReject(ulong deliveryTag, } } - protected ulong OffsetDeliveryTag(ulong deliveryTag) + private ulong OffsetDeliveryTag(ulong deliveryTag) { return deliveryTag + ActiveDeliveryTagOffset; } diff --git a/projects/RabbitMQ.Client/client/impl/SessionManager.cs b/projects/RabbitMQ.Client/client/impl/SessionManager.cs index 23e2ee1e1d..6923f87248 100644 --- a/projects/RabbitMQ.Client/client/impl/SessionManager.cs +++ b/projects/RabbitMQ.Client/client/impl/SessionManager.cs @@ -70,26 +70,7 @@ public ISession Create() { throw new ChannelAllocationException(); } - return CreateInternal(channelNumber); - } - } - public ISession Create(int channelNumber) - { - lock (_sessionMap) - { - if (!_ints.Reserve(channelNumber)) - { - throw new ChannelAllocationException(channelNumber); - } - return CreateInternal(channelNumber); - } - } - - public ISession CreateInternal(int channelNumber) - { - lock (_sessionMap) - { ISession session = new Session(_connection, (ushort)channelNumber); session.SessionShutdown += HandleSessionShutdown; _sessionMap[channelNumber] = session; @@ -97,7 +78,7 @@ public ISession CreateInternal(int channelNumber) } } - public void HandleSessionShutdown(object sender, ShutdownEventArgs reason) + private void HandleSessionShutdown(object sender, ShutdownEventArgs reason) { lock (_sessionMap) { diff --git a/projects/RabbitMQ.Client/client/impl/ShutdownContinuation.cs b/projects/RabbitMQ.Client/client/impl/ShutdownContinuation.cs index ee77c878f1..443222789a 100644 --- a/projects/RabbitMQ.Client/client/impl/ShutdownContinuation.cs +++ b/projects/RabbitMQ.Client/client/impl/ShutdownContinuation.cs @@ -59,11 +59,6 @@ public virtual void OnConnectionShutdown(object sender, ShutdownEventArgs reason m_cell.ContinueWithValue(reason); } - public virtual void OnModelShutdown(IModel sender, ShutdownEventArgs reason) - { - m_cell.ContinueWithValue(reason); - } - public virtual ShutdownEventArgs Wait() { return m_cell.WaitForValue(); diff --git a/projects/RabbitMQ.Client/client/impl/SimpleBlockingRpcContinuation.cs b/projects/RabbitMQ.Client/client/impl/SimpleBlockingRpcContinuation.cs index 5c45a8948b..f6e7bbe84b 100644 --- a/projects/RabbitMQ.Client/client/impl/SimpleBlockingRpcContinuation.cs +++ b/projects/RabbitMQ.Client/client/impl/SimpleBlockingRpcContinuation.cs @@ -39,20 +39,6 @@ internal class SimpleBlockingRpcContinuation : IRpcContinuation { public readonly BlockingCell> m_cell = new BlockingCell>(); - public virtual IncomingCommand GetReply() - { - Either result = m_cell.WaitForValue(); - switch (result.Alternative) - { - case EitherAlternative.Left: - return result.LeftValue; - case EitherAlternative.Right: - throw new OperationInterruptedException(result.RightValue); - default: - return default; - } - } - public virtual IncomingCommand GetReply(TimeSpan timeout) { Either result = m_cell.WaitForValue(timeout); diff --git a/projects/RabbitMQ.Client/client/logging/RabbitMqConsoleEventListener.cs b/projects/RabbitMQ.Client/client/logging/RabbitMqConsoleEventListener.cs deleted file mode 100644 index 9f74fa5f85..0000000000 --- a/projects/RabbitMQ.Client/client/logging/RabbitMqConsoleEventListener.cs +++ /dev/null @@ -1,66 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 2.0. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2020 VMware, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v2.0: -// -//--------------------------------------------------------------------------- -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. -// -// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. -//--------------------------------------------------------------------------- - -using System; -using System.Collections.Generic; -using System.Diagnostics.Tracing; - -namespace RabbitMQ.Client.Logging -{ - public sealed class RabbitMqConsoleEventListener : EventListener, IDisposable - { - public RabbitMqConsoleEventListener() - { - EnableEvents(RabbitMqClientEventSource.Log, EventLevel.Informational, RabbitMqClientEventSource.Keywords.Log); - } - - protected override void OnEventWritten(EventWrittenEventArgs eventData) - { - foreach(object pl in eventData.Payload) - { - if (pl is IDictionary dict) - { - var rex = new RabbitMqExceptionDetail(dict); - Console.WriteLine("{0}: {1}", eventData.Level, rex.ToString()); - } - else - { - Console.WriteLine("{0}: {1}", eventData.Level, pl.ToString()); - } - } - } - - public override void Dispose() - { - DisableEvents(RabbitMqClientEventSource.Log); - } - } -} diff --git a/projects/RabbitMQ.Client/util/IntAllocator.cs b/projects/RabbitMQ.Client/util/IntAllocator.cs index f5c1367611..707838f6d8 100644 --- a/projects/RabbitMQ.Client/util/IntAllocator.cs +++ b/projects/RabbitMQ.Client/util/IntAllocator.cs @@ -112,51 +112,6 @@ public void Free(int id) _unsorted[_unsortedCount++] = id; } - public bool Reserve(int id) - { - // We always flush before reserving because the only way to determine - // if an ID is in the unsorted array is through a linear scan. This leads - // us to the potentially expensive situation where there is a large unsorted - // array and we reserve several IDs, incurring the cost of the scan each time. - // Flushing makes sure the array is always empty and does no additional work if - // reserve is called twice. - Flush(); - - IntervalList current = _base; - - while (current != null) - { - if (current.End < id) - { - current = current.Next; - continue; - } - else if (current.Start > id) - { - return false; - } - else if (current.End == id) - { - current.End--; - } - else if (current.Start == id) - { - current.Start++; - } - else - { - // The ID is in the middle of this interval. - // We need to split the interval into two. - var rest = new IntervalList(id + 1, current.End); - current.End = id - 1; - rest.Next = current.Next; - current.Next = rest; - } - return true; - } - return false; - } - private void Flush() { if (_unsortedCount > 0) diff --git a/projects/Unit/APIApproval.Approve.verified.txt b/projects/Unit/APIApproval.Approve.verified.txt index a732329e4f..92f14e6631 100644 --- a/projects/Unit/APIApproval.Approve.verified.txt +++ b/projects/Unit/APIApproval.Approve.verified.txt @@ -24,7 +24,7 @@ namespace RabbitMQ.Client public static RabbitMQ.Client.AmqpTcpEndpoint Parse(string address) { } public static RabbitMQ.Client.AmqpTcpEndpoint[] ParseMultiple(string addresses) { } } - public struct AmqpTimestamp : System.IEquatable + public readonly struct AmqpTimestamp : System.IEquatable { public AmqpTimestamp(long unixTime) { } public long UnixTime { get; } @@ -313,7 +313,6 @@ namespace RabbitMQ.Client uint FrameMax { get; } System.TimeSpan Heartbeat { get; } bool IsOpen { get; } - RabbitMQ.Client.AmqpTcpEndpoint[] KnownHosts { get; } RabbitMQ.Client.IProtocol Protocol { get; } System.Collections.Generic.IDictionary ServerProperties { get; } System.Collections.Generic.IList ShutdownReport { get; } @@ -327,8 +326,6 @@ namespace RabbitMQ.Client event System.EventHandler RecoverySucceeded; void Close(ushort reasonCode, string reasonText, System.TimeSpan timeout, bool abort); RabbitMQ.Client.IModel CreateModel(); - void HandleConnectionBlocked(string reason); - void HandleConnectionUnblocked(); void UpdateSecret(string newSecret, string reason); } public static class IConnectionExtensions @@ -688,10 +685,6 @@ namespace RabbitMQ.Client.Events public string NameAfter { get; } public string NameBefore { get; } } - public class RecoveryExceptionEventArgs : RabbitMQ.Client.Events.BaseExceptionEventArgs - { - public RecoveryExceptionEventArgs(System.Exception e) { } - } } namespace RabbitMQ.Client.Exceptions { @@ -848,12 +841,6 @@ namespace RabbitMQ.Client.Logging public Keywords() { } } } - public sealed class RabbitMqConsoleEventListener : System.Diagnostics.Tracing.EventListener, System.IDisposable - { - public RabbitMqConsoleEventListener() { } - public override void Dispose() { } - protected override void OnEventWritten(System.Diagnostics.Tracing.EventWrittenEventArgs eventData) { } - } [System.Diagnostics.Tracing.EventData] public class RabbitMqExceptionDetail { diff --git a/projects/RabbitMQ.Client/util/DebugUtil.cs b/projects/Unit/Helper/DebugUtil.cs similarity index 99% rename from projects/RabbitMQ.Client/util/DebugUtil.cs rename to projects/Unit/Helper/DebugUtil.cs index 125e0f2684..9ee8ed76bd 100644 --- a/projects/RabbitMQ.Client/util/DebugUtil.cs +++ b/projects/Unit/Helper/DebugUtil.cs @@ -34,7 +34,7 @@ using System.IO; using System.Reflection; -namespace RabbitMQ.Util +namespace RabbitMQ.Client.Unit { ///Miscellaneous debugging and development utilities. /// diff --git a/projects/Unit/TestChannelAllocation.cs b/projects/Unit/TestChannelAllocation.cs index 9ea1004802..9764ca2eb4 100644 --- a/projects/Unit/TestChannelAllocation.cs +++ b/projects/Unit/TestChannelAllocation.cs @@ -47,7 +47,7 @@ public class TestIModelAllocation public int ModelNumber(IModel model) { - return ((ModelBase)((AutorecoveringModel)model).Delegate).Session.ChannelNumber; + return ((AutorecoveringModel)model).ChannelNumber; } [SetUp] public void Connect() diff --git a/projects/Unit/TestContentHeaderCodec.cs b/projects/Unit/TestContentHeaderCodec.cs index 152f94a151..65249fddc0 100644 --- a/projects/Unit/TestContentHeaderCodec.cs +++ b/projects/Unit/TestContentHeaderCodec.cs @@ -33,8 +33,6 @@ using System.Collections.Generic; using NUnit.Framework; -using RabbitMQ.Util; - namespace RabbitMQ.Client.Unit { [TestFixture] diff --git a/projects/Unit/TestMethodArgumentCodec.cs b/projects/Unit/TestMethodArgumentCodec.cs index 6ed3c40227..a042d7d83b 100644 --- a/projects/Unit/TestMethodArgumentCodec.cs +++ b/projects/Unit/TestMethodArgumentCodec.cs @@ -36,7 +36,6 @@ using NUnit.Framework; using RabbitMQ.Client.Impl; -using RabbitMQ.Util; namespace RabbitMQ.Client.Unit { diff --git a/projects/Unit/TestPublisherConfirms.cs b/projects/Unit/TestPublisherConfirms.cs index 30e240c1ad..e096b08dde 100644 --- a/projects/Unit/TestPublisherConfirms.cs +++ b/projects/Unit/TestPublisherConfirms.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Reflection; using System.Threading; using System.Threading.Tasks; using NUnit.Framework; @@ -77,8 +78,11 @@ public void TestWaitForConfirmsWithTimeout_MessageNacked_WaitingHasTimedout_Retu { TestWaitForConfirms(2000, (ch) => { - var fullModel = ch as IFullModel; - fullModel.HandleBasicNack(10, false, false); + IModel actualModel = ((AutorecoveringModel)ch).InnerChannel; + actualModel + .GetType() + .GetMethod("HandleAckNack", BindingFlags.Instance | BindingFlags.NonPublic) + .Invoke(actualModel, new object[] { 10UL, false, true }); using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(4)); Assert.IsFalse(ch.WaitForConfirmsAsync(cts.Token).GetAwaiter().GetResult()); diff --git a/projects/Unit/WireFormattingFixture.cs b/projects/Unit/WireFormattingFixture.cs index dda5facae0..c290b7c2d3 100644 --- a/projects/Unit/WireFormattingFixture.cs +++ b/projects/Unit/WireFormattingFixture.cs @@ -30,12 +30,9 @@ //--------------------------------------------------------------------------- using System; -using System.IO; using NUnit.Framework; -using RabbitMQ.Util; - namespace RabbitMQ.Client.Unit { class WireFormattingFixture