diff --git a/.gitignore b/.gitignore index ff820f3350..ec34c77b6b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +*.orig *.log _site/ @@ -28,6 +29,11 @@ test.sh test-output.log InternalTrace* nunit-agent* +################# +## JetBrains Rider +################# +.idea/ + ################# ## Visual Studio ################# diff --git a/RabbitMQDotNetClient.sln b/RabbitMQDotNetClient.sln index efdd0fcf97..8c37f5f06c 100644 --- a/RabbitMQDotNetClient.sln +++ b/RabbitMQDotNetClient.sln @@ -5,6 +5,7 @@ MinimumVisualStudioVersion = 10.0.40219.1 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{34486CC0-D61E-46BA-9E5E-6E8EFA7C34B5}" ProjectSection(SolutionItems) = preProject .editorconfig = .editorconfig + .gitignore = .gitignore EndProjectSection EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQ.Client", "projects\RabbitMQ.Client\RabbitMQ.Client.csproj", "{8C554257-5ECC-45DB-873D-560BFBB74EC8}" diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index 3cda2e436f..d470b54eb7 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -481,8 +481,8 @@ RabbitMQ.Client.IChannel.BasicAcks -> System.EventHandler System.Threading.Tasks.ValueTask RabbitMQ.Client.IChannel.BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue) -> System.Threading.Tasks.ValueTask RabbitMQ.Client.IChannel.BasicNacks -> System.EventHandler -RabbitMQ.Client.IChannel.BasicPublishAsync(RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, in TProperties basicProperties, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> System.Threading.Tasks.ValueTask -RabbitMQ.Client.IChannel.BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> System.Threading.Tasks.ValueTask +RabbitMQ.Client.IChannel.BasicPublishAsync(RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, TProperties basicProperties, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> System.Threading.Tasks.ValueTask +RabbitMQ.Client.IChannel.BasicPublishAsync(string exchange, string routingKey, TProperties basicProperties, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> System.Threading.Tasks.ValueTask RabbitMQ.Client.IChannel.BasicReturn -> System.EventHandler RabbitMQ.Client.IChannel.CallbackException -> System.EventHandler RabbitMQ.Client.IChannel.ChannelNumber.get -> int @@ -658,6 +658,7 @@ RabbitMQ.Client.PublicationAddress RabbitMQ.Client.PublicationAddress.PublicationAddress(string exchangeType, string exchangeName, string routingKey) -> void RabbitMQ.Client.QueueDeclareOk RabbitMQ.Client.QueueDeclareOk.QueueDeclareOk(string queueName, uint messageCount, uint consumerCount) -> void +RabbitMQ.Client.RabbitMQActivitySource RabbitMQ.Client.ReadOnlyBasicProperties RabbitMQ.Client.ReadOnlyBasicProperties.AppId.get -> string RabbitMQ.Client.ReadOnlyBasicProperties.ClusterId.get -> string @@ -851,6 +852,8 @@ static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Cli static RabbitMQ.Client.PublicationAddress.Parse(string uriLikeString) -> RabbitMQ.Client.PublicationAddress static RabbitMQ.Client.PublicationAddress.TryParse(string uriLikeString, out RabbitMQ.Client.PublicationAddress result) -> bool static RabbitMQ.Client.QueueDeclareOk.implicit operator string(RabbitMQ.Client.QueueDeclareOk declareOk) -> string +static RabbitMQ.Client.RabbitMQActivitySource.UseRoutingKeyAsOperationName.get -> bool +static RabbitMQ.Client.RabbitMQActivitySource.UseRoutingKeyAsOperationName.set -> void static RabbitMQ.Client.TcpClientAdapter.GetMatchingHost(System.Collections.Generic.IReadOnlyCollection addresses, System.Net.Sockets.AddressFamily addressFamily) -> System.Net.IPAddress static RabbitMQ.Client.TimerBasedCredentialRefresherEventSource.Log.get -> RabbitMQ.Client.TimerBasedCredentialRefresherEventSource static readonly RabbitMQ.Client.CachedString.Empty -> RabbitMQ.Client.CachedString @@ -881,6 +884,8 @@ virtual RabbitMQ.Client.TcpClientAdapter.Dispose(bool disposing) -> void virtual RabbitMQ.Client.TcpClientAdapter.GetStream() -> System.Net.Sockets.NetworkStream virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.get -> System.TimeSpan virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void +~const RabbitMQ.Client.RabbitMQActivitySource.PublisherSourceName = "RabbitMQ.Client.Publisher" -> string +~const RabbitMQ.Client.RabbitMQActivitySource.SubscriberSourceName = "RabbitMQ.Client.Subscriber" -> string ~override RabbitMQ.Client.Events.EventingBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory body) -> System.Threading.Tasks.Task ~RabbitMQ.Client.ConnectionFactory.CreateConnectionAsync(RabbitMQ.Client.IEndpointResolver endpointResolver, string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.ConnectionFactory.CreateConnectionAsync(string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task diff --git a/projects/RabbitMQ.Client/RabbitMQ.Client.csproj b/projects/RabbitMQ.Client/RabbitMQ.Client.csproj index e06dd89034..a539ea28fb 100644 --- a/projects/RabbitMQ.Client/RabbitMQ.Client.csproj +++ b/projects/RabbitMQ.Client/RabbitMQ.Client.csproj @@ -80,4 +80,7 @@ + + + diff --git a/projects/RabbitMQ.Client/client/api/IChannel.cs b/projects/RabbitMQ.Client/client/api/IChannel.cs index 5a98e65837..e69dc6079c 100644 --- a/projects/RabbitMQ.Client/client/api/IChannel.cs +++ b/projects/RabbitMQ.Client/client/api/IChannel.cs @@ -192,7 +192,7 @@ public interface IChannel : IDisposable /// Routing key must be shorter than 255 bytes. /// /// - ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory body = default, bool mandatory = false) + ValueTask BasicPublishAsync(string exchange, string routingKey, TProperties basicProperties, ReadOnlyMemory body = default, bool mandatory = false) where TProperties : IReadOnlyBasicProperties, IAmqpHeader; /// @@ -203,7 +203,7 @@ ValueTask BasicPublishAsync(string exchange, string routingKey, in /// Routing key must be shorter than 255 bytes. /// /// - ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory body = default, bool mandatory = false) + ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, TProperties basicProperties, ReadOnlyMemory body = default, bool mandatory = false) where TProperties : IReadOnlyBasicProperties, IAmqpHeader; #nullable disable diff --git a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs index 862035fa9b..f48be542fa 100644 --- a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs +++ b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs @@ -89,14 +89,14 @@ public static Task BasicConsumeAsync(this IChannel channel, string queue public static ValueTask BasicPublishAsync(this IChannel channel, PublicationAddress addr, in T basicProperties, ReadOnlyMemory body) where T : IReadOnlyBasicProperties, IAmqpHeader { - return channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, in basicProperties, body); + return channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, basicProperties, body); } public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory body = default, bool mandatory = false) - => channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory); + => channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory); public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory body = default, bool mandatory = false) - => channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory); + => channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory); #nullable disable diff --git a/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs b/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs index 7264f7e507..59520ef124 100644 --- a/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs @@ -1,4 +1,5 @@ using System; +using System.Diagnostics; using System.Threading.Tasks; using RabbitMQ.Client.Impl; @@ -78,8 +79,9 @@ await base.HandleBasicConsumeOk(consumerTag) public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { + var deliverEventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); // No need to call base, it's empty. - return _receivedWrapper.InvokeAsync(this, new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)); + return BasicDeliverWrapper(deliverEventArgs); } ///Fires the Shutdown event. @@ -93,5 +95,13 @@ await _shutdownWrapper.InvokeAsync(this, reason) .ConfigureAwait(false); } } + + private async Task BasicDeliverWrapper(BasicDeliverEventArgs eventArgs) + { + using (Activity activity = RabbitMQActivitySource.Deliver(eventArgs)) + { + await _receivedWrapper.InvokeAsync(this, eventArgs).ConfigureAwait(false); + } + } } } diff --git a/projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs b/projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs index d69d72e23a..153ff7bb8b 100644 --- a/projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Diagnostics; using System.Threading.Tasks; namespace RabbitMQ.Client.Events @@ -88,10 +89,12 @@ public override void HandleBasicConsumeOk(string consumerTag) public override async Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, ReadOnlyBasicProperties properties, ReadOnlyMemory body) { - await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); - Received?.Invoke( - this, - new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)); + BasicDeliverEventArgs eventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); + using (Activity activity = RabbitMQActivitySource.SubscriberHasListeners ? RabbitMQActivitySource.Deliver(eventArgs) : default) + { + await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); + Received?.Invoke(this, eventArgs); + } } ///Fires the Shutdown event. diff --git a/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs b/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs index f673e00091..b9e2d0274d 100644 --- a/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs +++ b/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs @@ -72,6 +72,8 @@ public AsyncRpcContinuation(TimeSpan continuationTimeout) _tcsConfiguredTaskAwaitable = _tcs.Task.ConfigureAwait(false); } + internal DateTime StartTime { get; } = DateTime.UtcNow; + public ConfiguredTaskAwaitable.ConfiguredTaskAwaiter GetAwaiter() { return _tcsConfiguredTaskAwaitable.GetAwaiter(); diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs index 3f6cdba60f..7832dfdde4 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs @@ -290,13 +290,13 @@ public ValueTask BasicGetAsync(string queue, bool autoAck) public ValueTask BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue) => InnerChannel.BasicNackAsync(deliveryTag, multiple, requeue); - public ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory) + public ValueTask BasicPublishAsync(string exchange, string routingKey, TProperties basicProperties, ReadOnlyMemory body, bool mandatory) where TProperties : IReadOnlyBasicProperties, IAmqpHeader - => InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory); + => InnerChannel.BasicPublishAsync(exchange, routingKey, basicProperties, body, mandatory); - public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory) + public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, TProperties basicProperties, ReadOnlyMemory body, bool mandatory) where TProperties : IReadOnlyBasicProperties, IAmqpHeader - => InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory); + => InnerChannel.BasicPublishAsync(exchange, routingKey, basicProperties, body, mandatory); public Task BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global) { diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index cadec4ea49..1e893cf2b2 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -37,7 +37,9 @@ using System.Text; using System.Threading; using System.Threading.Tasks; + using RabbitMQ.Client.client.framing; +using RabbitMQ.Client.client.impl; using RabbitMQ.Client.ConsumerDispatching; using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; @@ -80,11 +82,13 @@ protected ChannelBase(ConnectionConfig config, ISession session) ConsumerDispatcher = new ConsumerDispatcher(this, config.DispatchConsumerConcurrency); } - Action onException = (exception, context) => OnCallbackException(CallbackExceptionEventArgs.Build(exception, context)); + Action onException = (exception, context) => + OnCallbackException(CallbackExceptionEventArgs.Build(exception, context)); _basicAcksWrapper = new EventingWrapper("OnBasicAck", onException); _basicNacksWrapper = new EventingWrapper("OnBasicNack", onException); _basicReturnWrapper = new EventingWrapper("OnBasicReturn", onException); - _callbackExceptionWrapper = new EventingWrapper(string.Empty, (exception, context) => { }); + _callbackExceptionWrapper = + new EventingWrapper(string.Empty, (exception, context) => { }); _flowControlWrapper = new EventingWrapper("OnFlowControl", onException); _channelShutdownWrapper = new EventingWrapper("OnChannelShutdown", onException); _recoveryWrapper = new EventingWrapper("OnChannelRecovery", onException); @@ -101,6 +105,7 @@ public event EventHandler BasicAcks add => _basicAcksWrapper.AddHandler(value); remove => _basicAcksWrapper.RemoveHandler(value); } + private EventingWrapper _basicAcksWrapper; public event EventHandler BasicNacks @@ -108,6 +113,7 @@ public event EventHandler BasicNacks add => _basicNacksWrapper.AddHandler(value); remove => _basicNacksWrapper.RemoveHandler(value); } + private EventingWrapper _basicNacksWrapper; public event EventHandler BasicReturn @@ -115,6 +121,7 @@ public event EventHandler BasicReturn add => _basicReturnWrapper.AddHandler(value); remove => _basicReturnWrapper.RemoveHandler(value); } + private EventingWrapper _basicReturnWrapper; public event EventHandler CallbackException @@ -122,6 +129,7 @@ public event EventHandler CallbackException add => _callbackExceptionWrapper.AddHandler(value); remove => _callbackExceptionWrapper.RemoveHandler(value); } + private EventingWrapper _callbackExceptionWrapper; public event EventHandler FlowControl @@ -129,6 +137,7 @@ public event EventHandler FlowControl add => _flowControlWrapper.AddHandler(value); remove => _flowControlWrapper.RemoveHandler(value); } + private EventingWrapper _flowControlWrapper; public event EventHandler ChannelShutdown @@ -146,6 +155,7 @@ public event EventHandler ChannelShutdown } remove => _channelShutdownWrapper.RemoveHandler(value); } + private EventingWrapper _channelShutdownWrapper; public event EventHandler Recovery @@ -153,6 +163,7 @@ public event EventHandler Recovery add => _recoveryWrapper.AddHandler(value); remove => _recoveryWrapper.RemoveHandler(value); } + private EventingWrapper _recoveryWrapper; internal void RunRecoveryEventHandlers(object sender) @@ -344,7 +355,8 @@ await ModelSendAsync(method) } } - internal async ValueTask ConnectionStartOkAsync(IDictionary clientProperties, string mechanism, byte[] response, + internal async ValueTask ConnectionStartOkAsync( + IDictionary clientProperties, string mechanism, byte[] response, string locale) { await _rpcSemaphore.WaitAsync() @@ -510,11 +522,13 @@ protected void ChannelSend(in TMethod method, in THeader heade { _flowControlBlock.Wait(); } + Session.Transmit(in method, in header, body); } [MethodImpl(MethodImplOptions.AggressiveInlining)] - protected ValueTask ModelSendAsync(in TMethod method, in THeader header, ReadOnlyMemory body) + protected ValueTask ModelSendAsync(in TMethod method, in THeader header, + ReadOnlyMemory body) where TMethod : struct, IOutgoingAmqpMethod where THeader : IAmqpHeader { @@ -556,9 +570,11 @@ private void OnChannelShutdown(ShutdownEventArgs reason) { confirmsTaskCompletionSource.TrySetException(exception); } + _confirmsTaskCompletionSources.Clear(); } } + _flowControlBlock.Set(); } @@ -680,6 +696,7 @@ protected void HandleAckNack(ulong deliveryTag, bool multiple, bool isNack) { confirmsTaskCompletionSource.TrySetResult(_onlyAcksReceived); } + _confirmsTaskCompletionSources.Clear(); _onlyAcksReceived = true; } @@ -1134,7 +1151,17 @@ await _rpcSemaphore.WaitAsync() await ModelSendAsync(method) .ConfigureAwait(false); - return await k; + BasicGetResult result = await k; + + using Activity activity = result != null + ? RabbitMQActivitySource.Receive(result.RoutingKey, + result.Exchange, + result.DeliveryTag, result.BasicProperties, result.Body.Length) + : RabbitMQActivitySource.ReceiveEmpty(queue); + + activity?.SetStartTime(k.StartTime); + + return result; } finally { @@ -1158,6 +1185,17 @@ public ValueTask BasicPublishAsync(string exchange, string routingK try { var cmd = new BasicPublish(exchange, routingKey, mandatory, default); + RabbitMQActivitySource.TryGetExistingContext(basicProperties, out ActivityContext existingContext); + using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners + ? RabbitMQActivitySource.Send(routingKey, exchange, body.Length, existingContext) + : default; + + if (sendActivity != null) + { + BasicProperties props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity); + return ModelSendAsync(in cmd, in props, body); + } + return ModelSendAsync(in cmd, in basicProperties, body); } catch @@ -1175,7 +1213,22 @@ public ValueTask BasicPublishAsync(string exchange, string routingK } } - public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory) + private static void InjectTraceContextIntoBasicProperties(object propsObj, string key, string value) + { + if (!(propsObj is Dictionary headers)) + { + return; + } + + // Only propagate headers if they haven't already been set + if (!headers.ContainsKey(key)) + { + headers[key] = value; + } + } + + public void BasicPublish(CachedString exchange, CachedString routingKey, + in TProperties basicProperties, ReadOnlyMemory body, bool mandatory) where TProperties : IReadOnlyBasicProperties, IAmqpHeader { if (NextPublishSeqNo > 0) @@ -1189,7 +1242,63 @@ public ValueTask BasicPublishAsync(CachedString exchange, CachedStr try { var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default); - return ModelSendAsync(in cmd, in basicProperties, body); + RabbitMQActivitySource.TryGetExistingContext(basicProperties, out ActivityContext existingContext); + using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners + ? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length, existingContext) + : default; + + if (sendActivity != null) + { + BasicProperties props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity); + ChannelSend(in cmd, in props, body); + return; + } + + ChannelSend(in cmd, in basicProperties, body); + } + catch + { + if (NextPublishSeqNo > 0) + { + lock (_confirmLock) + { + NextPublishSeqNo--; + _pendingDeliveryTags.RemoveLast(); + } + } + + throw; + } + } + + public async ValueTask BasicPublishAsync(string exchange, string routingKey, + TProperties basicProperties, ReadOnlyMemory body, bool mandatory) + where TProperties : IReadOnlyBasicProperties, IAmqpHeader + { + if (NextPublishSeqNo > 0) + { + lock (_confirmLock) + { + _pendingDeliveryTags.AddLast(NextPublishSeqNo++); + } + } + + try + { + var cmd = new BasicPublish(exchange, routingKey, mandatory, default); + RabbitMQActivitySource.TryGetExistingContext(basicProperties, out ActivityContext existingContext); + using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners + ? RabbitMQActivitySource.Send(routingKey, exchange, body.Length, existingContext) + : default; + + if (sendActivity != null) + { + BasicProperties props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity); + await ModelSendAsync(in cmd, in props, body); + return; + } + + await ModelSendAsync(in cmd, in basicProperties, body); } catch { @@ -1206,6 +1315,70 @@ public ValueTask BasicPublishAsync(CachedString exchange, CachedStr } } + public async ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, + TProperties basicProperties, ReadOnlyMemory body, bool mandatory) + where TProperties : IReadOnlyBasicProperties, IAmqpHeader + { + if (NextPublishSeqNo > 0) + { + lock (_confirmLock) + { + _pendingDeliveryTags.AddLast(NextPublishSeqNo++); + } + } + + try + { + var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default); + RabbitMQActivitySource.TryGetExistingContext(basicProperties, out ActivityContext existingContext); + using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners + ? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length, existingContext) + : default; + + if (sendActivity != null) + { + BasicProperties props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity); + await ModelSendAsync(in cmd, in props, body); + return; + } + + await ModelSendAsync(in cmd, in basicProperties, body); + } + catch + { + if (NextPublishSeqNo > 0) + { + lock (_confirmLock) + { + NextPublishSeqNo--; + _pendingDeliveryTags.RemoveLast(); + } + } + + throw; + } + } + +#if FOO + BasicProperties props = default; + if (basicProperties is BasicProperties properties) + { + props = properties; + } + else if (basicProperties is EmptyBasicProperty) + { + props = new BasicProperties(); + } + + var headers = props.Headers ?? new Dictionary(); + + // Inject the ActivityContext into the message headers to propagate trace context to the receiving service. + DistributedContextPropagator.Current.Inject(sendActivity, headers, InjectTraceContextIntoBasicProperties); + props.Headers = headers; + return props; + } +#endif + public async Task UpdateSecretAsync(string newSecret, string reason) { if (newSecret is null) @@ -1710,6 +1883,7 @@ public Task WaitForConfirmsAsync(CancellationToken token = default) _onlyAcksReceived = true; return Task.FromResult(false); } + return Task.FromResult(true); } @@ -1780,5 +1954,35 @@ await CloseAsync(ea, false) throw ex; } } + + private static BasicProperties PopulateActivityAndPropagateTraceId(TProperties basicProperties, + Activity sendActivity) where TProperties : IReadOnlyBasicProperties, IAmqpHeader + { + // This activity is marked as recorded, so let's propagate the trace and span ids. + if (sendActivity.IsAllDataRequested) + { + if (!string.IsNullOrEmpty(basicProperties.CorrelationId)) + { + sendActivity.SetTag(RabbitMQActivitySource.MessageConversationId, basicProperties.CorrelationId); + } + } + + BasicProperties props = default; + if (basicProperties is BasicProperties properties) + { + props = properties; + } + else if (basicProperties is EmptyBasicProperty) + { + props = new BasicProperties(); + } + + var headers = props.Headers ?? new Dictionary(); + + // Inject the ActivityContext into the message headers to propagate trace context to the receiving service. + DistributedContextPropagator.Current.Inject(sendActivity, headers, InjectTraceContextIntoBasicProperties); + props.Headers = headers; + return props; + } } } diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index 0645850342..056be859ce 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -31,7 +31,10 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.IO; +using System.Net; +using System.Net.Sockets; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -445,6 +448,7 @@ internal void OnCallbackException(CallbackExceptionEventArgs args) internal void Write(RentedMemory frames) { + Activity.Current.SetNetworkTags(_frameHandler); ValueTask task = _frameHandler.WriteAsync(frames); if (!task.IsCompletedSuccessfully) { @@ -454,6 +458,7 @@ internal void Write(RentedMemory frames) internal ValueTask WriteAsync(RentedMemory frames) { + Activity.Current.SetNetworkTags(_frameHandler); return _frameHandler.WriteAsync(frames); } diff --git a/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs b/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs new file mode 100644 index 0000000000..f68d4e1596 --- /dev/null +++ b/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs @@ -0,0 +1,290 @@ +using System.Collections.Generic; +using System.Diagnostics; +using System.Net; +using System.Net.Sockets; +using System.Reflection; +using System.Text; +using RabbitMQ.Client.Events; +using RabbitMQ.Client.Impl; + +namespace RabbitMQ.Client +{ + public static class RabbitMQActivitySource + { + // These constants are defined in the OpenTelemetry specification: + // https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#messaging-attributes + internal const string MessageId = "messaging.message.id"; + internal const string MessageConversationId = "messaging.message.conversation_id"; + internal const string MessagingOperation = "messaging.operation"; + internal const string MessagingSystem = "messaging.system"; + internal const string MessagingDestination = "messaging.destination.name"; + internal const string MessagingDestinationRoutingKey = "messaging.rabbitmq.destination.routing_key"; + internal const string MessagingBodySize = "messaging.message.body.size"; + internal const string MessagingEnvelopeSize = "messaging.message.envelope.size"; + internal const string ProtocolName = "network.protocol.name"; + internal const string ProtocolVersion = "network.protocol.version"; + internal const string RabbitMQDeliveryTag = "messaging.rabbitmq.delivery_tag"; + + private static readonly string AssemblyVersion = typeof(RabbitMQActivitySource).Assembly + .GetCustomAttribute() + ?.InformationalVersion ?? ""; + + private static readonly ActivitySource s_publisherSource = new ActivitySource(PublisherSourceName, AssemblyVersion); + private static readonly ActivitySource s_subscriberSource = new ActivitySource(SubscriberSourceName, AssemblyVersion); + + public const string PublisherSourceName = "RabbitMQ.Client.Publisher"; + public const string SubscriberSourceName = "RabbitMQ.Client.Subscriber"; + + public static bool UseRoutingKeyAsOperationName { get; set; } = true; + internal static bool PublisherHasListeners => s_publisherSource.HasListeners(); + internal static bool SubscriberHasListeners => s_subscriberSource.HasListeners(); + + internal static readonly IEnumerable> CreationTags = new[] + { + new KeyValuePair(MessagingSystem, "rabbitmq"), + new KeyValuePair(ProtocolName, "amqp"), + new KeyValuePair(ProtocolVersion, "0.9.1") + }; + + internal static Activity Send(string routingKey, string exchange, int bodySize, + ActivityContext linkedContext = default) + { + if (s_publisherSource.HasListeners()) + { + Activity activity = linkedContext == default + ? s_publisherSource.StartRabbitMQActivity(UseRoutingKeyAsOperationName ? $"{routingKey} publish" : "publish", + ActivityKind.Producer) + : s_publisherSource.StartLinkedRabbitMQActivity(UseRoutingKeyAsOperationName ? $"{routingKey} publish" : "publish", + ActivityKind.Producer, linkedContext); + if (activity?.IsAllDataRequested == true) + { + PopulateMessagingTags("publish", routingKey, exchange, 0, bodySize, activity); + } + + return activity; + } + + return null; + } + + internal static Activity ReceiveEmpty(string queue) + { + if (!s_subscriberSource.HasListeners()) + { + return null; + } + + Activity activity = s_subscriberSource.StartRabbitMQActivity(UseRoutingKeyAsOperationName ? $"{queue} receive" : "receive", + ActivityKind.Consumer); + if (activity.IsAllDataRequested) + { + activity + .SetTag(MessagingOperation, "receive") + .SetTag(MessagingDestination, "amq.default"); + } + + return activity; + } + + internal static Activity Receive(string routingKey, string exchange, ulong deliveryTag, + in ReadOnlyBasicProperties readOnlyBasicProperties, int bodySize) + { + if (!s_subscriberSource.HasListeners()) + { + return null; + } + + // Extract the PropagationContext of the upstream parent from the message headers. + DistributedContextPropagator.Current.ExtractTraceIdAndState(readOnlyBasicProperties.Headers, + ExtractTraceIdAndState, out string traceParent, out string traceState); + ActivityContext.TryParse(traceParent, traceState, out ActivityContext linkedContext); + Activity activity = s_subscriberSource.StartLinkedRabbitMQActivity( + UseRoutingKeyAsOperationName ? $"{routingKey} receive" : "receive", ActivityKind.Consumer, + linkedContext); + if (activity.IsAllDataRequested) + { + PopulateMessagingTags("receive", routingKey, exchange, deliveryTag, readOnlyBasicProperties, + bodySize, activity); + } + + return activity; + } + + internal static Activity Deliver(BasicDeliverEventArgs deliverEventArgs) + { + if (!s_subscriberSource.HasListeners()) + { + return null; + } + + // Extract the PropagationContext of the upstream parent from the message headers. + DistributedContextPropagator.Current.ExtractTraceIdAndState(deliverEventArgs.BasicProperties.Headers, + ExtractTraceIdAndState, out string traceparent, out string traceState); + ActivityContext.TryParse(traceparent, traceState, out ActivityContext parentContext); + Activity activity = s_subscriberSource.StartLinkedRabbitMQActivity( + UseRoutingKeyAsOperationName ? $"{deliverEventArgs.RoutingKey} deliver" : "deliver", + ActivityKind.Consumer, parentContext); + if (activity != null && activity.IsAllDataRequested) + { + PopulateMessagingTags("deliver", deliverEventArgs.RoutingKey, deliverEventArgs.Exchange, + deliverEventArgs.DeliveryTag, deliverEventArgs.BasicProperties, deliverEventArgs.Body.Length, + activity); + } + + return activity; + + } + + private static Activity StartRabbitMQActivity(this ActivitySource source, string name, ActivityKind kind, + ActivityContext parentContext = default) + { + Activity activity = source + .CreateActivity(name, kind, parentContext, idFormat: ActivityIdFormat.W3C, tags: CreationTags)?.Start(); + return activity; + } + + private static Activity StartLinkedRabbitMQActivity(this ActivitySource source, string name, ActivityKind kind, + ActivityContext linkedContext = default, ActivityContext parentContext = default) + { + Activity activity = source.CreateActivity(name, kind, parentContext: parentContext, + links: new[] { new ActivityLink(linkedContext) }, idFormat: ActivityIdFormat.W3C, + tags: CreationTags) + ?.Start(); + return activity; + } + + private static void PopulateMessagingTags(string operation, string routingKey, string exchange, + ulong deliveryTag, in ReadOnlyBasicProperties readOnlyBasicProperties, int bodySize, Activity activity) + { + PopulateMessagingTags(operation, routingKey, exchange, deliveryTag, bodySize, activity); + + if (!string.IsNullOrEmpty(readOnlyBasicProperties.CorrelationId)) + { + activity.SetTag(MessageConversationId, readOnlyBasicProperties.CorrelationId); + } + + if (!string.IsNullOrEmpty(readOnlyBasicProperties.MessageId)) + { + activity.SetTag(MessageId, readOnlyBasicProperties.MessageId); + } + } + + private static void PopulateMessagingTags(string operation, string routingKey, string exchange, + ulong deliveryTag, int bodySize, Activity activity) + { + activity + .SetTag(MessagingOperation, operation) + .SetTag(MessagingDestination, string.IsNullOrEmpty(exchange) ? "amq.default" : exchange) + .SetTag(MessagingDestinationRoutingKey, routingKey) + .SetTag(MessagingBodySize, bodySize); + + if (deliveryTag > 0) + { + activity.SetTag(RabbitMQDeliveryTag, deliveryTag); + } + } + + internal static void PopulateMessageEnvelopeSize(Activity activity, int size) + { + if (activity != null && activity.IsAllDataRequested && PublisherHasListeners) + { + activity.SetTag(MessagingEnvelopeSize, size); + } + } + + internal static bool TryGetExistingContext(T props, out ActivityContext context) + where T : IReadOnlyBasicProperties + { + if (props.Headers == null) + { + context = default; + return false; + } + + bool hasHeaders = false; + foreach (string header in DistributedContextPropagator.Current.Fields) + { + if (props.Headers.ContainsKey(header)) + { + hasHeaders = true; + break; + } + } + + if (hasHeaders) + { + DistributedContextPropagator.Current.ExtractTraceIdAndState(props.Headers, ExtractTraceIdAndState, + out string traceParent, out string traceState); + return ActivityContext.TryParse(traceParent, traceState, out context); + } + + context = default; + return false; + } + + private static void ExtractTraceIdAndState(object props, string name, out string value, + out IEnumerable values) + { + if (props is Dictionary headers && headers.TryGetValue(name, out object propsVal) && + propsVal is byte[] bytes) + { + value = Encoding.UTF8.GetString(bytes); + values = default; + } + else + { + value = default; + values = default; + } + } + + internal static void SetNetworkTags(this Activity activity, IFrameHandler frameHandler) + { + if (PublisherHasListeners && activity != null && activity.IsAllDataRequested) + { + switch (frameHandler.RemoteEndPoint.AddressFamily) + { + case AddressFamily.InterNetworkV6: + activity.SetTag("network.type", "ipv6"); + break; + case AddressFamily.InterNetwork: + activity.SetTag("network.type", "ipv4"); + break; + } + + if (!string.IsNullOrEmpty(frameHandler.Endpoint.HostName)) + { + activity + .SetTag("server.address", frameHandler.Endpoint.HostName); + } + + activity + .SetTag("server.port", frameHandler.Endpoint.Port); + + if (frameHandler.RemoteEndPoint is IPEndPoint ipEndpoint) + { + string remoteAddress = ipEndpoint.Address.ToString(); + if (activity.GetTagItem("server.address") == null) + { + activity + .SetTag("server.address", remoteAddress); + } + + activity + .SetTag("network.peer.address", remoteAddress) + .SetTag("network.peer.port", ipEndpoint.Port); + } + + if (frameHandler.LocalEndPoint is IPEndPoint localEndpoint) + { + string localAddress = localEndpoint.Address.ToString(); + activity + .SetTag("client.address", localAddress) + .SetTag("client.port", localEndpoint.Port) + .SetTag("network.local.address", localAddress) + .SetTag("network.local.port", localEndpoint.Port); + } + } + } + } +} diff --git a/projects/RabbitMQ.Client/client/impl/RpcContinuations.cs b/projects/RabbitMQ.Client/client/impl/RpcContinuations.cs index b84687ca45..2b2dbad895 100644 --- a/projects/RabbitMQ.Client/client/impl/RpcContinuations.cs +++ b/projects/RabbitMQ.Client/client/impl/RpcContinuations.cs @@ -38,6 +38,7 @@ namespace RabbitMQ.Client.Impl internal class SimpleBlockingRpcContinuation : IRpcContinuation { private readonly BlockingCell> m_cell = new BlockingCell>(); + internal DateTime StartTime { get; } = DateTime.UtcNow; public void GetReply(TimeSpan timeout) { diff --git a/projects/RabbitMQ.Client/client/impl/SessionBase.cs b/projects/RabbitMQ.Client/client/impl/SessionBase.cs index a433626878..d2c8d3d545 100644 --- a/projects/RabbitMQ.Client/client/impl/SessionBase.cs +++ b/projects/RabbitMQ.Client/client/impl/SessionBase.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Diagnostics; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -122,7 +123,7 @@ public void Notify() { // Ensure that we notify only when session is already closed // If not, throw exception, since this is a serious bug in the library - var reason = CloseReason; + ShutdownEventArgs reason = CloseReason; if (reason is null) { throw new Exception("Internal Error in Session.Close"); @@ -138,7 +139,9 @@ public virtual void Transmit(in T cmd) where T : struct, IOutgoingAmqpMethod ThrowAlreadyClosedException(); } - Connection.Write(Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ChannelNumber)); + RentedMemory bytes = Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ChannelNumber); + RabbitMQActivitySource.PopulateMessageEnvelopeSize(Activity.Current, bytes.Size); + Connection.Write(bytes); } public virtual ValueTask TransmitAsync(in T cmd) where T : struct, IOutgoingAmqpMethod @@ -148,7 +151,9 @@ public virtual ValueTask TransmitAsync(in T cmd) where T : struct, IOutgoingA ThrowAlreadyClosedException(); } - return Connection.WriteAsync(Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ChannelNumber)); + RentedMemory bytes = Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ChannelNumber); + RabbitMQActivitySource.PopulateMessageEnvelopeSize(Activity.Current, bytes.Size); + return Connection.WriteAsync(bytes); } public void Transmit(in TMethod cmd, in THeader header, ReadOnlyMemory body) @@ -160,7 +165,10 @@ public void Transmit(in TMethod cmd, in THeader header, ReadOn ThrowAlreadyClosedException(); } - Connection.Write(Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ref Unsafe.AsRef(header), body, ChannelNumber, Connection.MaxPayloadSize)); + RentedMemory bytes = Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ref Unsafe.AsRef(header), body, + ChannelNumber, Connection.MaxPayloadSize); + RabbitMQActivitySource.PopulateMessageEnvelopeSize(Activity.Current, bytes.Size); + Connection.Write(bytes); } public ValueTask TransmitAsync(in TMethod cmd, in THeader header, ReadOnlyMemory body) @@ -172,7 +180,10 @@ public ValueTask TransmitAsync(in TMethod cmd, in THeader head ThrowAlreadyClosedException(); } - return Connection.WriteAsync(Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ref Unsafe.AsRef(header), body, ChannelNumber, Connection.MaxPayloadSize)); + RentedMemory bytes = Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ref Unsafe.AsRef(header), body, ChannelNumber, + Connection.MaxPayloadSize); + RabbitMQActivitySource.PopulateMessageEnvelopeSize(Activity.Current, bytes.Size); + return Connection.WriteAsync(bytes); } private void ThrowAlreadyClosedException() diff --git a/projects/Test/Common/ProcessUtil.cs b/projects/Test/Common/ProcessUtil.cs index 7eb9196f68..eb262e451c 100644 --- a/projects/Test/Common/ProcessUtil.cs +++ b/projects/Test/Common/ProcessUtil.cs @@ -32,7 +32,7 @@ public static async Task RunAsync(ProcessStartInfo startInfo) var processTasks = new List(); // === EXITED Event handling === - var processExitEvent = new TaskCompletionSource(); + var processExitEvent = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); process.Exited += (sender, args) => { @@ -46,7 +46,7 @@ public static async Task RunAsync(ProcessStartInfo startInfo) if (process.StartInfo.RedirectStandardOutput) { - var stdOutCloseEvent = new TaskCompletionSource(); + var stdOutCloseEvent = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); process.OutputDataReceived += (s, e) => { @@ -72,7 +72,7 @@ public static async Task RunAsync(ProcessStartInfo startInfo) if (process.StartInfo.RedirectStandardError) { - var stdErrCloseEvent = new TaskCompletionSource(); + var stdErrCloseEvent = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); process.ErrorDataReceived += (s, e) => { diff --git a/projects/Test/Integration/TestChannelShutdown.cs b/projects/Test/Integration/TestChannelShutdown.cs index 8a190a1749..f9fbd7831f 100644 --- a/projects/Test/Integration/TestChannelShutdown.cs +++ b/projects/Test/Integration/TestChannelShutdown.cs @@ -48,7 +48,7 @@ public TestChannelShutdown(ITestOutputHelper output) : base(output) public async Task TestConsumerDispatcherShutdown() { var autorecoveringChannel = (AutorecoveringChannel)_channel; - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _channel.ChannelShutdown += (channel, args) => { diff --git a/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs index db2f0a1d3b..2afb9d6d66 100644 --- a/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs +++ b/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs @@ -102,7 +102,7 @@ private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, in { return TestConcurrentChannelOperationsAsync(async (conn) => { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); // publishing on a shared channel is not supported // and would missing the point of this test anyway diff --git a/projects/Test/Integration/TestConfirmSelect.cs b/projects/Test/Integration/TestConfirmSelect.cs index 4adaf31d06..8f17c447fd 100644 --- a/projects/Test/Integration/TestConfirmSelect.cs +++ b/projects/Test/Integration/TestConfirmSelect.cs @@ -79,7 +79,7 @@ public async Task TestDeliveryTagDiverged_GH1043(ushort correlationIdLength) var properties = new BasicProperties(); // _output.WriteLine("Client delivery tag {0}", _channel.NextPublishSeqNo); - await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty, in properties, body); + await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty, properties, body); await _channel.WaitForConfirmsOrDieAsync(); try @@ -89,7 +89,7 @@ public async Task TestDeliveryTagDiverged_GH1043(ushort correlationIdLength) CorrelationId = new string('o', correlationIdLength) }; // _output.WriteLine("Client delivery tag {0}", _channel.NextPublishSeqNo); - await _channel.BasicPublishAsync("sample", string.Empty, in properties, body); + await _channel.BasicPublishAsync("sample", string.Empty, properties, body); await _channel.WaitForConfirmsOrDieAsync(); } catch @@ -99,7 +99,7 @@ public async Task TestDeliveryTagDiverged_GH1043(ushort correlationIdLength) properties = new BasicProperties(); // _output.WriteLine("Client delivery tag {0}", _channel.NextPublishSeqNo); - await _channel.BasicPublishAsync("sample", string.Empty, in properties, body); + await _channel.BasicPublishAsync("sample", string.Empty, properties, body); await _channel.WaitForConfirmsOrDieAsync(); // _output.WriteLine("I'm done..."); } diff --git a/projects/Test/Integration/TestConnectionShutdown.cs b/projects/Test/Integration/TestConnectionShutdown.cs index d38b3d3ee1..5959ed2b2f 100644 --- a/projects/Test/Integration/TestConnectionShutdown.cs +++ b/projects/Test/Integration/TestConnectionShutdown.cs @@ -48,7 +48,7 @@ public TestConnectionShutdown(ITestOutputHelper output) : base(output) [Fact] public async Task TestCleanClosureWithSocketClosedOutOfBand() { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _channel.ChannelShutdown += (channel, args) => { tcs.SetResult(true); @@ -64,7 +64,7 @@ public async Task TestCleanClosureWithSocketClosedOutOfBand() [Fact] public async Task TestAbortWithSocketClosedOutOfBand() { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _channel.ChannelShutdown += (channel, args) => { tcs.SetResult(true); @@ -82,7 +82,7 @@ public async Task TestAbortWithSocketClosedOutOfBand() [Fact] public async Task TestDisposedWithSocketClosedOutOfBand() { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _channel.ChannelShutdown += (channel, args) => { @@ -103,7 +103,7 @@ public async Task TestDisposedWithSocketClosedOutOfBand() [Fact] public async Task TestShutdownSignalPropagationToChannels() { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _channel.ChannelShutdown += (channel, args) => { @@ -119,7 +119,7 @@ public async Task TestShutdownSignalPropagationToChannels() public async Task TestConsumerDispatcherShutdown() { var m = (AutorecoveringChannel)_channel; - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _channel.ChannelShutdown += (channel, args) => { diff --git a/projects/Test/Integration/TestConsumer.cs b/projects/Test/Integration/TestConsumer.cs index 681969bc0a..e2175a404f 100644 --- a/projects/Test/Integration/TestConsumer.cs +++ b/projects/Test/Integration/TestConsumer.cs @@ -133,7 +133,7 @@ public async Task ConcurrentEventingTestForReceived() countdownEvent.Wait(); // Add last receiver - var lastConsumerReceivedTcs = new TaskCompletionSource(); + var lastConsumerReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); consumer.Received += (o, a) => { lastConsumerReceivedTcs.SetResult(true); diff --git a/projects/Test/Integration/TestConsumerCancelNotify.cs b/projects/Test/Integration/TestConsumerCancelNotify.cs index 68a64bf1f6..99983c6075 100644 --- a/projects/Test/Integration/TestConsumerCancelNotify.cs +++ b/projects/Test/Integration/TestConsumerCancelNotify.cs @@ -40,7 +40,7 @@ namespace Test.Integration { public class TestConsumerCancelNotify : IntegrationFixture { - private readonly TaskCompletionSource _tcs = new TaskCompletionSource(); + private readonly TaskCompletionSource _tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); private string _consumerTag; public TestConsumerCancelNotify(ITestOutputHelper output) : base(output) diff --git a/projects/Test/Integration/TestConsumerExceptions.cs b/projects/Test/Integration/TestConsumerExceptions.cs index d119eba2c4..edf2224531 100644 --- a/projects/Test/Integration/TestConsumerExceptions.cs +++ b/projects/Test/Integration/TestConsumerExceptions.cs @@ -108,7 +108,7 @@ public override void HandleBasicCancelOk(string consumerTag) protected async Task TestExceptionHandlingWithAsync(IBasicConsumer consumer, Func action) { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); bool notified = false; string q = await _channel.QueueDeclareAsync(); diff --git a/projects/Test/Integration/TestConsumerOperationDispatch.cs b/projects/Test/Integration/TestConsumerOperationDispatch.cs index c90f2e9871..2340ca60e0 100644 --- a/projects/Test/Integration/TestConsumerOperationDispatch.cs +++ b/projects/Test/Integration/TestConsumerOperationDispatch.cs @@ -164,7 +164,7 @@ public async Task TestChannelShutdownDoesNotShutDownDispatcher() string q2 = (await ch2.QueueDeclareAsync()).QueueName; await ch2.QueueBindAsync(queue: q2, exchange: _x, routingKey: ""); - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); await ch1.BasicConsumeAsync(q1, true, new EventingBasicConsumer(ch1)); var c2 = new EventingBasicConsumer(ch2); c2.Received += (object sender, BasicDeliverEventArgs e) => @@ -183,8 +183,8 @@ private class ShutdownLatchConsumer : DefaultBasicConsumer { public ShutdownLatchConsumer() { - Latch = new TaskCompletionSource(); - DuplicateLatch = new TaskCompletionSource(); + Latch = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + DuplicateLatch = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); } public readonly TaskCompletionSource Latch; diff --git a/projects/Test/Integration/TestEventingConsumer.cs b/projects/Test/Integration/TestEventingConsumer.cs index 4ee805dd2f..1721d5cb08 100644 --- a/projects/Test/Integration/TestEventingConsumer.cs +++ b/projects/Test/Integration/TestEventingConsumer.cs @@ -48,10 +48,10 @@ public async Task TestEventingConsumerRegistrationEvents() { string q = await _channel.QueueDeclareAsync(); - var registeredTcs = new TaskCompletionSource(); + var registeredTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); object registeredSender = null; - var unregisteredTcs = new TaskCompletionSource(); + var unregisteredTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); object unregisteredSender = null; EventingBasicConsumer ec = new EventingBasicConsumer(_channel); @@ -85,7 +85,7 @@ public async Task TestEventingConsumerRegistrationEvents() [Fact] public async Task TestEventingConsumerDeliveryEvents() { - var tcs0 = new TaskCompletionSource(); + var tcs0 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); string q = await _channel.QueueDeclareAsync(); bool receivedInvoked = false; @@ -103,7 +103,7 @@ public async Task TestEventingConsumerDeliveryEvents() await _channel.BasicPublishAsync("", q, _encoding.GetBytes("msg")); await WaitAsync(tcs0, "received event"); - var tcs1 = new TaskCompletionSource(); + var tcs1 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); Assert.True(receivedInvoked); Assert.NotNull(receivedSender); diff --git a/projects/Test/Integration/TestInvalidAck.cs b/projects/Test/Integration/TestInvalidAck.cs index 1eb5d735a8..4a349e2916 100644 --- a/projects/Test/Integration/TestInvalidAck.cs +++ b/projects/Test/Integration/TestInvalidAck.cs @@ -45,7 +45,7 @@ public TestInvalidAck(ITestOutputHelper output) : base(output) [Fact] public async Task TestAckWithUnknownConsumerTagAndMultipleFalse() { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); bool shutdownFired = false; ShutdownEventArgs shutdownArgs = null; _channel.ChannelShutdown += (s, args) => diff --git a/projects/Test/Integration/TestMainLoop.cs b/projects/Test/Integration/TestMainLoop.cs index eda581fc37..646e0ce50a 100644 --- a/projects/Test/Integration/TestMainLoop.cs +++ b/projects/Test/Integration/TestMainLoop.cs @@ -63,7 +63,7 @@ public override Task HandleBasicDeliverAsync(string consumerTag, [Fact] public async Task TestCloseWithFaultyConsumer() { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, false); CallbackExceptionEventArgs ea = null; diff --git a/projects/Test/SequentialIntegration/TestActivitySource.cs b/projects/Test/SequentialIntegration/TestActivitySource.cs new file mode 100644 index 0000000000..2be177bec3 --- /dev/null +++ b/projects/Test/SequentialIntegration/TestActivitySource.cs @@ -0,0 +1,247 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2020 VMware, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. +//--------------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using Xunit; +using Xunit.Abstractions; + +namespace Test.SequentialIntegration +{ + public class TestActivitySource : SequentialIntegrationFixture + { + public TestActivitySource(ITestOutputHelper output) : base(output) + { + } + + void AssertStringTagEquals(Activity activity, string name, string expected) + { + string tag = activity.GetTagItem(name) as string; + Assert.NotNull(tag); + Assert.Equal(expected, tag); + } + + void AssertStringTagStartsWith(Activity activity, string name, string expected) + { + string tag = activity.GetTagItem(name) as string; + Assert.NotNull(tag); + Assert.StartsWith(expected, tag); + } + + void AssertStringTagNotNullOrEmpty(Activity activity, string name) + { + string tag = activity.GetTagItem(name) as string; + Assert.NotNull(tag); + Assert.False(string.IsNullOrEmpty(tag)); + } + + void AssertIntTagGreaterThanZero(Activity activity, string name) + { + Assert.True(activity.GetTagItem(name) is int result && result > 0); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOperationName) + { + await _channel.ConfirmSelectAsync(); + + RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; + var _activities = new List(); + using (ActivityListener activityListener = StartActivityListener(_activities)) + { + await Task.Delay(500); + string queueName = $"{Guid.NewGuid()}"; + QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); + byte[] sendBody = Encoding.UTF8.GetBytes("hi"); + byte[] consumeBody = null; + var consumer = new EventingBasicConsumer(_channel); + var consumerReceivedTcs = + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + consumer.Received += (o, a) => + { + consumeBody = a.Body.ToArray(); + consumerReceivedTcs.SetResult(true); + }; + + string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); + await _channel.BasicPublishAsync("", q.QueueName, sendBody, mandatory: true); + await _channel.WaitForConfirmsOrDieAsync(); + + await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.True(await consumerReceivedTcs.Task); + + await _channel.BasicCancelAsync(consumerTag); + await Task.Delay(500); + AssertActivityData(useRoutingKeyAsOperationName, queueName, _activities, true); + } + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName) + { + await _channel.ConfirmSelectAsync(); + + RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; + var activities = new List(); + using (ActivityListener activityListener = StartActivityListener(activities)) + { + await Task.Delay(500); + + string queueName = $"{Guid.NewGuid()}"; + QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); + byte[] sendBody = Encoding.UTF8.GetBytes("hi"); + byte[] consumeBody = null; + var consumer = new EventingBasicConsumer(_channel); + var consumerReceivedTcs = + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + consumer.Received += (o, a) => + { + consumeBody = a.Body.ToArray(); + consumerReceivedTcs.SetResult(true); + }; + + string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); + await _channel.BasicPublishAsync("", q.QueueName, sendBody, mandatory: true); + await _channel.WaitForConfirmsOrDieAsync(); + + await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.True(await consumerReceivedTcs.Task); + + await _channel.BasicCancelAsync(consumerTag); + await Task.Delay(500); + AssertActivityData(useRoutingKeyAsOperationName, queueName, activities, true); + } + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOperationName) + { + await _channel.ConfirmSelectAsync(); + RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; + var activities = new List(); + using (ActivityListener activityListener = StartActivityListener(activities)) + { + await Task.Delay(500); + string queue = $"queue-{Guid.NewGuid()}"; + const string msg = "for basic.get"; + + try + { + await _channel.QueueDeclareAsync(queue, false, false, false, null); + await _channel.BasicPublishAsync("", queue, Encoding.UTF8.GetBytes(msg), mandatory: true); + await _channel.WaitForConfirmsOrDieAsync(); + QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue); + Assert.Equal(1u, ok.MessageCount); + BasicGetResult res = await _channel.BasicGetAsync(queue, true); + Assert.Equal(msg, Encoding.UTF8.GetString(res.Body.ToArray())); + ok = await _channel.QueueDeclarePassiveAsync(queue); + Assert.Equal(0u, ok.MessageCount); + await Task.Delay(500); + AssertActivityData(useRoutingKeyAsOperationName, queue, activities, false); + } + finally + { + await _channel.QueueDeleteAsync(queue); + } + } + } + + private static ActivityListener StartActivityListener(List activities) + { + ActivityListener activityListener = new ActivityListener(); + activityListener.Sample = (ref ActivityCreationOptions _) => + ActivitySamplingResult.AllDataAndRecorded; + activityListener.SampleUsingParentId = (ref ActivityCreationOptions _) => + ActivitySamplingResult.AllDataAndRecorded; + activityListener.ShouldListenTo = + activitySource => activitySource.Name.StartsWith("RabbitMQ.Client."); + activityListener.ActivityStarted = activities.Add; + ActivitySource.AddActivityListener(activityListener); + return activityListener; + } + + private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueName, + List activityList, bool isDeliver = false) + { + string childName = isDeliver ? "deliver" : "receive"; + Activity[] activities = activityList.ToArray(); + Assert.NotEmpty(activities); + foreach (var item in activities) + { + _output.WriteLine( + $"{item.Context.TraceId}: {item.OperationName}"); + _output.WriteLine($" Tags: {string.Join(", ", item.Tags.Select(x => $"{x.Key}: {x.Value}"))}"); + _output.WriteLine($" Links: {string.Join(", ", item.Links.Select(x => $"{x.Context.TraceId}"))}"); + } + + Activity sendActivity = activities.First(x => + x.OperationName == (useRoutingKeyAsOperationName ? $"{queueName} publish" : "publish") && + x.GetTagItem(RabbitMQActivitySource.MessagingDestinationRoutingKey) is string routingKeyTag && + routingKeyTag == $"{queueName}"); + Activity receiveActivity = activities.Single(x => + x.OperationName == (useRoutingKeyAsOperationName ? $"{queueName} {childName}" : $"{childName}") && + x.Links.First().Context.TraceId == sendActivity.TraceId); + Assert.Equal(ActivityKind.Producer, sendActivity.Kind); + Assert.Equal(ActivityKind.Consumer, receiveActivity.Kind); + Assert.Null(receiveActivity.ParentId); + AssertStringTagNotNullOrEmpty(sendActivity, "network.peer.address"); + AssertStringTagNotNullOrEmpty(sendActivity, "network.local.address"); + AssertStringTagNotNullOrEmpty(sendActivity, "server.address"); + AssertStringTagNotNullOrEmpty(sendActivity, "client.address"); + AssertIntTagGreaterThanZero(sendActivity, "network.peer.port"); + AssertIntTagGreaterThanZero(sendActivity, "network.local.port"); + AssertIntTagGreaterThanZero(sendActivity, "server.port"); + AssertIntTagGreaterThanZero(sendActivity, "client.port"); + AssertStringTagStartsWith(sendActivity, "network.type", "ipv"); + AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingSystem, "rabbitmq"); + AssertStringTagEquals(sendActivity, RabbitMQActivitySource.ProtocolName, "amqp"); + AssertStringTagEquals(sendActivity, RabbitMQActivitySource.ProtocolVersion, "0.9.1"); + AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingDestination, "amq.default"); + AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingDestinationRoutingKey, queueName); + AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingEnvelopeSize); + AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingBodySize); + AssertIntTagGreaterThanZero(receiveActivity, RabbitMQActivitySource.MessagingBodySize); + } + } +} diff --git a/projects/Test/SequentialIntegration/TestConnectionBlocked.cs b/projects/Test/SequentialIntegration/TestConnectionBlocked.cs index f9e3eaf2d4..bfba03f359 100644 --- a/projects/Test/SequentialIntegration/TestConnectionBlocked.cs +++ b/projects/Test/SequentialIntegration/TestConnectionBlocked.cs @@ -53,7 +53,7 @@ public override async Task DisposeAsync() [Fact] public async Task TestConnectionBlockedNotification() { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _conn.ConnectionBlocked += (object sender, ConnectionBlockedEventArgs args) => { UnblockAsync(); @@ -72,7 +72,7 @@ public async Task TestConnectionBlockedNotification() [Fact] public async Task TestDisposeOnBlockedConnectionDoesNotHang() { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); await BlockAsync(_channel); diff --git a/projects/Test/SequentialIntegration/TestConnectionRecovery.cs b/projects/Test/SequentialIntegration/TestConnectionRecovery.cs index a31ac8603d..2deed73598 100644 --- a/projects/Test/SequentialIntegration/TestConnectionRecovery.cs +++ b/projects/Test/SequentialIntegration/TestConnectionRecovery.cs @@ -73,7 +73,7 @@ public override async Task DisposeAsync() [Fact] public async Task TestBasicAckAfterChannelRecovery() { - var allMessagesSeenTcs = new TaskCompletionSource(); + var allMessagesSeenTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var cons = new AckingBasicConsumer(_channel, _totalMessageCount, allMessagesSeenTcs); QueueDeclareOk q = await _channel.QueueDeclareAsync(_queueName, false, false, false); @@ -96,7 +96,7 @@ public async Task TestBasicAckAfterChannelRecovery() [Fact] public async Task TestBasicNackAfterChannelRecovery() { - var allMessagesSeenTcs = new TaskCompletionSource(); + var allMessagesSeenTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var cons = new NackingBasicConsumer(_channel, _totalMessageCount, allMessagesSeenTcs); QueueDeclareOk q = await _channel.QueueDeclareAsync(_queueName, false, false, false); @@ -119,7 +119,7 @@ public async Task TestBasicNackAfterChannelRecovery() [Fact] public async Task TestBasicRejectAfterChannelRecovery() { - var allMessagesSeenTcs = new TaskCompletionSource(); + var allMessagesSeenTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var cons = new RejectingBasicConsumer(_channel, _totalMessageCount, allMessagesSeenTcs); string queueName = (await _channel.QueueDeclareAsync(_queueName, false, false, false)).QueueName; @@ -160,7 +160,7 @@ public async Task TestBasicAckAfterBasicGetAndChannelRecovery() public async Task TestBasicAckEventHandlerRecovery() { await _channel.ConfirmSelectAsync(); - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); ((AutorecoveringChannel)_channel).BasicAcks += (m, args) => tcs.SetResult(true); ((AutorecoveringChannel)_channel).BasicNacks += (m, args) => tcs.SetResult(true); @@ -212,7 +212,7 @@ public async Task TestBasicChannelRecoveryOnServerRestart() [Fact] public async Task TestChannelAfterDispose_GH1086() { - TaskCompletionSource sawChannelShutdownTcs = new TaskCompletionSource(); + TaskCompletionSource sawChannelShutdownTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); void _channel_ChannelShutdown(object sender, ShutdownEventArgs e) { @@ -269,7 +269,7 @@ public async Task TestBlockedListenersRecovery() { try { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _conn.ConnectionBlocked += (c, reason) => tcs.SetResult(true); await CloseAndWaitForRecoveryAsync(); await CloseAndWaitForRecoveryAsync(); @@ -329,7 +329,7 @@ public async Task TestConsumerRecoveryWithManyConsumers() await _channel.BasicConsumeAsync(q, true, cons); } - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); ((AutorecoveringConnection)_conn).ConsumerTagChangeAfterRecovery += (prev, current) => tcs.SetResult(true); await CloseAndWaitForRecoveryAsync(); @@ -522,7 +522,7 @@ public async Task TestServerNamedTransientAutoDeleteQueueAndBindingRecovery() string q = (await _channel.QueueDeclareAsync(queue: "", durable: false, exclusive: false, autoDelete: true, arguments: null)).QueueName; string nameBefore = q; string nameAfter = null; - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); ((AutorecoveringConnection)_conn).QueueNameChangedAfterRecovery += (source, ea) => { @@ -634,7 +634,7 @@ public async Task TestServerNamedQueueRecovery() string nameBefore = q; string nameAfter = null; - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var connection = (AutorecoveringConnection)_conn; connection.RecoverySucceeded += (source, ea) => tcs.SetResult(true); connection.QueueNameChangedAfterRecovery += (source, ea) => { nameAfter = ea.NameAfter; }; @@ -730,7 +730,7 @@ public async Task TestRecoverTopologyOnDisposedChannel() await CloseAndWaitForRecoveryAsync(); await AssertConsumerCountAsync(_channel, q, 1); - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); cons.Received += (s, args) => tcs.SetResult(true); await _channel.BasicPublishAsync("", q, _messageBody); @@ -752,7 +752,7 @@ public async Task TestPublishRpcRightAfterReconnect() properties.ReplyTo = "amq.rabbitmq.reply-to"; TimeSpan doneSpan = TimeSpan.FromMilliseconds(100); - var doneTcs = new TaskCompletionSource(); + var doneTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); Task closeTask = Task.Run(async () => { try @@ -913,7 +913,7 @@ public async Task TestThatDeletedQueuesDontReappearOnRecovery() [Fact] public async Task TestUnblockedListenersRecovery() { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _conn.ConnectionUnblocked += (source, ea) => tcs.SetResult(true); await CloseAndWaitForRecoveryAsync(); await CloseAndWaitForRecoveryAsync(); diff --git a/projects/Test/SequentialIntegration/TestConnectionRecoveryBase.cs b/projects/Test/SequentialIntegration/TestConnectionRecoveryBase.cs index 5d7d495136..d76298f02c 100644 --- a/projects/Test/SequentialIntegration/TestConnectionRecoveryBase.cs +++ b/projects/Test/SequentialIntegration/TestConnectionRecoveryBase.cs @@ -229,7 +229,7 @@ protected async Task PublishMessagesWhileClosingConnAsync(string queueName) protected static TaskCompletionSource PrepareForShutdown(IConnection conn) { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); AutorecoveringConnection aconn = conn as AutorecoveringConnection; aconn.ConnectionShutdown += (c, args) => tcs.SetResult(true); @@ -239,7 +239,7 @@ protected static TaskCompletionSource PrepareForShutdown(IConnection conn) protected static TaskCompletionSource PrepareForRecovery(IConnection conn) { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); AutorecoveringConnection aconn = conn as AutorecoveringConnection; aconn.RecoverySucceeded += (source, ea) => tcs.SetResult(true); @@ -392,7 +392,7 @@ protected static async Task SendAndConsumeMessageAsync(IConnection conn, s { await ch.ConfirmSelectAsync(); - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var consumer = new AckingBasicConsumer(ch, 1, tcs); diff --git a/projects/Test/SequentialIntegration/TestConnectionRecoveryWithoutSetup.cs b/projects/Test/SequentialIntegration/TestConnectionRecoveryWithoutSetup.cs index 57690db17b..a9c088dff0 100644 --- a/projects/Test/SequentialIntegration/TestConnectionRecoveryWithoutSetup.cs +++ b/projects/Test/SequentialIntegration/TestConnectionRecoveryWithoutSetup.cs @@ -132,7 +132,7 @@ public async Task TestConsumerWorkServiceRecovery() await CloseAndWaitForRecoveryAsync(c); Assert.True(ch.IsOpen); - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); cons.Received += (s, args) => tcs.SetResult(true); await ch.BasicPublishAsync("", q, _encoding.GetBytes("msg")); @@ -177,7 +177,7 @@ public async Task TestConsumerRecoveryOnClientNamedQueueWithOneRecovery() await AssertConsumerCountAsync(ch, q1, 1); Assert.False(queueNameChangeAfterRecoveryCalled); - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); cons.Received += (s, args) => tcs.SetResult(true); await ch.BasicPublishAsync("", q1, _encoding.GetBytes("msg")); @@ -265,7 +265,7 @@ public async Task TestTopologyRecoveryConsumerFilter() ConsumerFilter = consumer => !consumer.ConsumerTag.Contains("filtered") }; - var connectionRecoveryTcs = new TaskCompletionSource(); + var connectionRecoveryTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); using (AutorecoveringConnection conn = await CreateAutorecoveringConnectionWithTopologyRecoveryFilterAsync(filter)) { @@ -289,12 +289,12 @@ public async Task TestTopologyRecoveryConsumerFilter() await ch.QueuePurgeAsync(queueWithRecoveredConsumer); await ch.QueuePurgeAsync(queueWithIgnoredConsumer); - var consumerRecoveryTcs = new TaskCompletionSource(); + var consumerRecoveryTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var consumerToRecover = new EventingBasicConsumer(ch); consumerToRecover.Received += (source, ea) => consumerRecoveryTcs.SetResult(true); await ch.BasicConsumeAsync(queueWithRecoveredConsumer, true, "recovered.consumer", consumerToRecover); - var ignoredTcs = new TaskCompletionSource(); + var ignoredTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var consumerToIgnore = new EventingBasicConsumer(ch); consumerToIgnore.Received += (source, ea) => ignoredTcs.SetResult(true); await ch.BasicConsumeAsync(queueWithIgnoredConsumer, true, "filtered.consumer", consumerToIgnore); diff --git a/projects/Test/SequentialIntegration/TestConnectionTopologyRecovery.cs b/projects/Test/SequentialIntegration/TestConnectionTopologyRecovery.cs index f58b1d9ecd..5c6d3f0725 100644 --- a/projects/Test/SequentialIntegration/TestConnectionTopologyRecovery.cs +++ b/projects/Test/SequentialIntegration/TestConnectionTopologyRecovery.cs @@ -70,7 +70,7 @@ public async Task TestRecoverTopologyOnDisposedChannel() await CloseAndWaitForRecoveryAsync(); await AssertConsumerCountAsync(_channel, q, 1); - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); cons.Received += (s, args) => tcs.SetResult(true); await _channel.BasicPublishAsync("", q, _messageBody); @@ -84,7 +84,7 @@ public async Task TestRecoverTopologyOnDisposedChannel() [Fact] public async Task TestTopologyRecoveryQueueFilter() { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var filter = new TopologyRecoveryFilter { @@ -130,7 +130,7 @@ public async Task TestTopologyRecoveryQueueFilter() [Fact] public async Task TestTopologyRecoveryExchangeFilter() { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var filter = new TopologyRecoveryFilter { @@ -174,7 +174,7 @@ public async Task TestTopologyRecoveryExchangeFilter() [Fact] public async Task TestTopologyRecoveryBindingFilter() { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var filter = new TopologyRecoveryFilter { @@ -222,7 +222,7 @@ public async Task TestTopologyRecoveryBindingFilter() [Fact] public async Task TestTopologyRecoveryDefaultFilterRecoversAllEntities() { - var connectionRecoveryTcs = new TaskCompletionSource(); + var connectionRecoveryTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var filter = new TopologyRecoveryFilter(); AutorecoveringConnection conn = await CreateAutorecoveringConnectionWithTopologyRecoveryFilterAsync(filter); conn.RecoverySucceeded += (source, ea) => connectionRecoveryTcs.SetResult(true); @@ -245,12 +245,12 @@ public async Task TestTopologyRecoveryDefaultFilterRecoversAllEntities() await ch.QueuePurgeAsync(queue1); await ch.QueuePurgeAsync(queue2); - var consumerReceivedTcs1 = new TaskCompletionSource(); + var consumerReceivedTcs1 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var consumer1 = new EventingBasicConsumer(ch); consumer1.Received += (source, ea) => consumerReceivedTcs1.SetResult(true); await ch.BasicConsumeAsync(queue1, true, "recovered.consumer", consumer1); - var consumerReceivedTcs2 = new TaskCompletionSource(); + var consumerReceivedTcs2 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var consumer2 = new EventingBasicConsumer(ch); consumer2.Received += (source, ea) => consumerReceivedTcs2.SetResult(true); await ch.BasicConsumeAsync(queue2, true, "filtered.consumer", consumer2); @@ -290,7 +290,7 @@ public async Task TestTopologyRecoveryDefaultFilterRecoversAllEntities() [Fact] public async Task TestTopologyRecoveryQueueExceptionHandler() { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var changedQueueArguments = new Dictionary { @@ -352,7 +352,7 @@ await _channel.QueueDeclareAsync(queueToRecoverWithException, false, false, fals [Fact] public async Task TestTopologyRecoveryExchangeExceptionHandler() { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var exceptionHandler = new TopologyRecoveryExceptionHandler { @@ -408,7 +408,7 @@ public async Task TestTopologyRecoveryExchangeExceptionHandler() [Fact] public async Task TestTopologyRecoveryBindingExceptionHandler() { - var connectionRecoveryTcs = new TaskCompletionSource(); + var connectionRecoveryTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); const string exchange = "topology.recovery.exchange"; const string queueWithExceptionBinding = "recovery.exception.queue"; @@ -469,7 +469,7 @@ public async Task TestTopologyRecoveryBindingExceptionHandler() [Fact] public async Task TestTopologyRecoveryConsumerExceptionHandler() { - var connectionRecoveryTcs = new TaskCompletionSource(); + var connectionRecoveryTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); string queueWithExceptionConsumer = "recovery.exception.queue"; diff --git a/projects/Test/Unit/TestTimerBasedCredentialRefresher.cs b/projects/Test/Unit/TestTimerBasedCredentialRefresher.cs index adbc1a0802..87622e98e1 100644 --- a/projects/Test/Unit/TestTimerBasedCredentialRefresher.cs +++ b/projects/Test/Unit/TestTimerBasedCredentialRefresher.cs @@ -129,7 +129,7 @@ public void TestDoNotRegisterWhenHasNoExpiry() [Fact] public async Task TestRefreshToken() { - var cbtcs = new TaskCompletionSource(); + var cbtcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); bool? callbackArg = null; var credentialsProvider = new MockCredentialsProvider(_testOutputHelper, TimeSpan.FromSeconds(1)); Task cb(bool arg) @@ -158,7 +158,7 @@ Task cb(bool arg) [Fact] public async Task TestRefreshTokenFailed() { - var cbtcs = new TaskCompletionSource(); + var cbtcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); bool? callbackArg = null; var credentialsProvider = new MockCredentialsProvider(_testOutputHelper, TimeSpan.FromSeconds(1)); Task cb(bool arg)