Skip to content

Commit

Permalink
Merge pull request #1633 from rabbitmq/rabbitmq-dotnet-client-1621
Browse files Browse the repository at this point in the history
Move `Deliver` OTEL activity to consumer dispatchers
  • Loading branch information
lukebakken authored Jul 22, 2024
2 parents 0c8492a + d3cf417 commit d1f8f75
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 46 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using RabbitMQ.Client.Impl;

Expand Down Expand Up @@ -80,8 +79,9 @@ public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, b
IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
var deliverEventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);

// No need to call base, it's empty.
return BasicDeliverWrapper(deliverEventArgs);
return _receivedWrapper.InvokeAsync(this, deliverEventArgs);
}

///<summary>Fires the Shutdown event.</summary>
Expand All @@ -95,13 +95,5 @@ await _shutdownWrapper.InvokeAsync(this, reason)
.ConfigureAwait(false);
}
}

private async Task BasicDeliverWrapper(BasicDeliverEventArgs eventArgs)
{
using (Activity? activity = RabbitMQActivitySource.Deliver(eventArgs))
{
await _receivedWrapper.InvokeAsync(this, eventArgs).ConfigureAwait(false);
}
}
}
}
12 changes: 4 additions & 8 deletions projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
//---------------------------------------------------------------------------

using System;
using System.Diagnostics;
using System.Threading.Tasks;

namespace RabbitMQ.Client.Events
Expand Down Expand Up @@ -89,13 +88,10 @@ public override void HandleBasicConsumeOk(string consumerTag)
public override async Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
BasicDeliverEventArgs eventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
using (Activity? activity = RabbitMQActivitySource.SubscriberHasListeners ? RabbitMQActivitySource.Deliver(eventArgs) : default)
{
await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)
.ConfigureAwait(false);
Received?.Invoke(this, eventArgs);
}
var eventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)
.ConfigureAwait(false);
Received?.Invoke(this, eventArgs);
}

///<summary>Fires the Shutdown event.</summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Impl;
Expand All @@ -24,23 +25,35 @@ protected override async Task ProcessChannelAsync()
{
try
{
Task task = work.WorkType switch
switch (work.WorkType)
{
WorkType.Deliver => work.AsyncConsumer.HandleBasicDeliver(
work.ConsumerTag!, work.DeliveryTag, work.Redelivered,
work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory),

WorkType.Cancel => work.AsyncConsumer.HandleBasicCancel(work.ConsumerTag!),

WorkType.CancelOk => work.AsyncConsumer.HandleBasicCancelOk(work.ConsumerTag!),

WorkType.ConsumeOk => work.AsyncConsumer.HandleBasicConsumeOk(work.ConsumerTag!),

WorkType.Shutdown => work.AsyncConsumer.HandleChannelShutdown(_channel, work.Reason!),

_ => Task.CompletedTask
};
await task.ConfigureAwait(false);
case WorkType.Deliver:
using (Activity? activity = RabbitMQActivitySource.Deliver(work.RoutingKey!, work.Exchange!,
work.DeliveryTag, work.BasicProperties!, work.Body.Size))
{
await work.AsyncConsumer.HandleBasicDeliver(
work.ConsumerTag!, work.DeliveryTag, work.Redelivered,
work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory)
.ConfigureAwait(false);
}
break;
case WorkType.Cancel:
await work.AsyncConsumer.HandleBasicCancel(work.ConsumerTag!)
.ConfigureAwait(false);
break;
case WorkType.CancelOk:
await work.AsyncConsumer.HandleBasicCancelOk(work.ConsumerTag!)
.ConfigureAwait(false);
break;
case WorkType.ConsumeOk:
await work.AsyncConsumer.HandleBasicConsumeOk(work.ConsumerTag!)
.ConfigureAwait(false);
break;
case WorkType.Shutdown:
await work.AsyncConsumer.HandleChannelShutdown(_channel, work.Reason!)
.ConfigureAwait(false);
break;
}
}
catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Impl;
Expand Down Expand Up @@ -29,10 +30,14 @@ protected override async Task ProcessChannelAsync()
switch (work.WorkType)
{
case WorkType.Deliver:
await consumer.HandleBasicDeliverAsync(
consumerTag, work.DeliveryTag, work.Redelivered,
work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory)
.ConfigureAwait(false);
using (Activity? activity = RabbitMQActivitySource.Deliver(work.RoutingKey!, work.Exchange!,
work.DeliveryTag, work.BasicProperties!, work.Body.Size))
{
await consumer.HandleBasicDeliverAsync(
consumerTag, work.DeliveryTag, work.Redelivered,
work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory)
.ConfigureAwait(false);
}
break;
case WorkType.Cancel:
consumer.HandleBasicCancel(consumerTag);
Expand Down
14 changes: 6 additions & 8 deletions projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Net.Sockets;
using System.Reflection;
using System.Text;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Impl;

namespace RabbitMQ.Client
Expand Down Expand Up @@ -46,7 +45,6 @@ public static class RabbitMQActivitySource

public static bool UseRoutingKeyAsOperationName { get; set; } = true;
internal static bool PublisherHasListeners => s_publisherSource.HasListeners();
internal static bool SubscriberHasListeners => s_subscriberSource.HasListeners();

internal static readonly IEnumerable<KeyValuePair<string, object?>> CreationTags = new[]
{
Expand Down Expand Up @@ -120,7 +118,8 @@ public static class RabbitMQActivitySource
return activity;
}

internal static Activity? Deliver(BasicDeliverEventArgs deliverEventArgs)
internal static Activity? Deliver(string routingKey, string exchange, ulong deliveryTag,
IReadOnlyBasicProperties basicProperties, int bodySize)
{
if (!s_subscriberSource.HasListeners())
{
Expand All @@ -129,13 +128,12 @@ public static class RabbitMQActivitySource

// Extract the PropagationContext of the upstream parent from the message headers.
Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{deliverEventArgs.RoutingKey} deliver" : "deliver",
ActivityKind.Consumer, ContextExtractor(deliverEventArgs.BasicProperties));
UseRoutingKeyAsOperationName ? $"{routingKey} deliver" : "deliver",
ActivityKind.Consumer, ContextExtractor(basicProperties));
if (activity != null && activity.IsAllDataRequested)
{
PopulateMessagingTags("deliver", deliverEventArgs.RoutingKey, deliverEventArgs.Exchange,
deliverEventArgs.DeliveryTag, deliverEventArgs.BasicProperties, deliverEventArgs.Body.Length,
activity);
PopulateMessagingTags("deliver", routingKey, exchange,
deliveryTag, basicProperties, bodySize, activity);
}

return activity;
Expand Down

0 comments on commit d1f8f75

Please sign in to comment.