diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/net8.0/PublicAPI.Shipped.txt b/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/net8.0/PublicAPI.Shipped.txt index 8b13789179..074c6ad103 100644 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/net8.0/PublicAPI.Shipped.txt +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/net8.0/PublicAPI.Shipped.txt @@ -1 +1,2 @@ +#nullable enable diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/net8.0/PublicAPI.Unshipped.txt b/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/net8.0/PublicAPI.Unshipped.txt index b9ca055f90..3c5d70e73c 100644 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/net8.0/PublicAPI.Unshipped.txt +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/net8.0/PublicAPI.Unshipped.txt @@ -1,20 +1,30 @@ OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation -OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation.AddProducer(OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder producerBuilder) -> void -OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation.AddProducer(string name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder producerBuilder) -> void +OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation.AddConsumer(OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder! consumerBuilder) -> void +OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation.AddConsumer(string! name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder! consumerBuilder) -> void +OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation.AddProducer(OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder! producerBuilder) -> void +OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation.AddProducer(string! name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder! producerBuilder) -> void OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation.Dispose() -> void OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentationOptions OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentationOptions.ConfluentKafkaInstrumentationOptions() -> void +OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder +OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder.InstrumentedConsumerBuilder(System.Collections.Generic.IEnumerable>! config) -> void OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder -OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder.InstrumentedProducerBuilder(System.Collections.Generic.IEnumerable> config) -> void +OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder.InstrumentedProducerBuilder(System.Collections.Generic.IEnumerable>! config) -> void OpenTelemetry.Metrics.ConfluentKafkaMetricsInstrumentationOptions OpenTelemetry.Metrics.ConfluentKafkaMetricsInstrumentationOptions.ConfluentKafkaMetricsInstrumentationOptions() -> void OpenTelemetry.Metrics.MeterProviderBuilderExtensions OpenTelemetry.Trace.TracerProviderBuilderExtensions -override OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder.Build() -> Confluent.Kafka.IProducer -static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaInstrumentation(this OpenTelemetry.Metrics.MeterProviderBuilder builder) -> OpenTelemetry.Metrics.MeterProviderBuilder -static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaInstrumentation(this OpenTelemetry.Metrics.MeterProviderBuilder builder, System.Action configure) -> OpenTelemetry.Metrics.MeterProviderBuilder -static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder) -> OpenTelemetry.Trace.TracerProviderBuilder -static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder producerBuilder) -> OpenTelemetry.Trace.TracerProviderBuilder -static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder producerBuilder, System.Action configure) -> OpenTelemetry.Trace.TracerProviderBuilder -static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder, string name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder producerBuilder, System.Action configure) -> OpenTelemetry.Trace.TracerProviderBuilder -static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder, System.Action configure) -> OpenTelemetry.Trace.TracerProviderBuilder +override OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder.Build() -> Confluent.Kafka.IConsumer! +override OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder.Build() -> Confluent.Kafka.IProducer! +static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaInstrumentation(this OpenTelemetry.Metrics.MeterProviderBuilder! builder) -> OpenTelemetry.Metrics.MeterProviderBuilder! +static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaInstrumentation(this OpenTelemetry.Metrics.MeterProviderBuilder! builder, System.Action? configure) -> OpenTelemetry.Metrics.MeterProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaConsumerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder) -> OpenTelemetry.Trace.TracerProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaConsumerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder! consumerBuilder) -> OpenTelemetry.Trace.TracerProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaConsumerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder! consumerBuilder, System.Action! configure) -> OpenTelemetry.Trace.TracerProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaConsumerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder, string? name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder? consumerBuilder, System.Action? configure) -> OpenTelemetry.Trace.TracerProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaConsumerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder, System.Action! configure) -> OpenTelemetry.Trace.TracerProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder) -> OpenTelemetry.Trace.TracerProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder! producerBuilder) -> OpenTelemetry.Trace.TracerProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder! producerBuilder, System.Action! configure) -> OpenTelemetry.Trace.TracerProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder, string? name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder? producerBuilder, System.Action? configure) -> OpenTelemetry.Trace.TracerProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder, System.Action! configure) -> OpenTelemetry.Trace.TracerProviderBuilder! diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/netstandard2.0/PublicAPI.Shipped.txt b/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/netstandard2.0/PublicAPI.Shipped.txt index 8b13789179..074c6ad103 100644 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/netstandard2.0/PublicAPI.Shipped.txt +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/netstandard2.0/PublicAPI.Shipped.txt @@ -1 +1,2 @@ +#nullable enable diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/netstandard2.0/PublicAPI.Unshipped.txt b/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/netstandard2.0/PublicAPI.Unshipped.txt index b9ca055f90..3c5d70e73c 100644 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/netstandard2.0/PublicAPI.Unshipped.txt +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/netstandard2.0/PublicAPI.Unshipped.txt @@ -1,20 +1,30 @@ OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation -OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation.AddProducer(OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder producerBuilder) -> void -OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation.AddProducer(string name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder producerBuilder) -> void +OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation.AddConsumer(OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder! consumerBuilder) -> void +OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation.AddConsumer(string! name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder! consumerBuilder) -> void +OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation.AddProducer(OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder! producerBuilder) -> void +OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation.AddProducer(string! name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder! producerBuilder) -> void OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation.Dispose() -> void OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentationOptions OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentationOptions.ConfluentKafkaInstrumentationOptions() -> void +OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder +OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder.InstrumentedConsumerBuilder(System.Collections.Generic.IEnumerable>! config) -> void OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder -OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder.InstrumentedProducerBuilder(System.Collections.Generic.IEnumerable> config) -> void +OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder.InstrumentedProducerBuilder(System.Collections.Generic.IEnumerable>! config) -> void OpenTelemetry.Metrics.ConfluentKafkaMetricsInstrumentationOptions OpenTelemetry.Metrics.ConfluentKafkaMetricsInstrumentationOptions.ConfluentKafkaMetricsInstrumentationOptions() -> void OpenTelemetry.Metrics.MeterProviderBuilderExtensions OpenTelemetry.Trace.TracerProviderBuilderExtensions -override OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder.Build() -> Confluent.Kafka.IProducer -static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaInstrumentation(this OpenTelemetry.Metrics.MeterProviderBuilder builder) -> OpenTelemetry.Metrics.MeterProviderBuilder -static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaInstrumentation(this OpenTelemetry.Metrics.MeterProviderBuilder builder, System.Action configure) -> OpenTelemetry.Metrics.MeterProviderBuilder -static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder) -> OpenTelemetry.Trace.TracerProviderBuilder -static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder producerBuilder) -> OpenTelemetry.Trace.TracerProviderBuilder -static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder producerBuilder, System.Action configure) -> OpenTelemetry.Trace.TracerProviderBuilder -static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder, string name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder producerBuilder, System.Action configure) -> OpenTelemetry.Trace.TracerProviderBuilder -static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder, System.Action configure) -> OpenTelemetry.Trace.TracerProviderBuilder +override OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder.Build() -> Confluent.Kafka.IConsumer! +override OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder.Build() -> Confluent.Kafka.IProducer! +static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaInstrumentation(this OpenTelemetry.Metrics.MeterProviderBuilder! builder) -> OpenTelemetry.Metrics.MeterProviderBuilder! +static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaInstrumentation(this OpenTelemetry.Metrics.MeterProviderBuilder! builder, System.Action? configure) -> OpenTelemetry.Metrics.MeterProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaConsumerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder) -> OpenTelemetry.Trace.TracerProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaConsumerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder! consumerBuilder) -> OpenTelemetry.Trace.TracerProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaConsumerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder! consumerBuilder, System.Action! configure) -> OpenTelemetry.Trace.TracerProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaConsumerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder, string? name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder? consumerBuilder, System.Action? configure) -> OpenTelemetry.Trace.TracerProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaConsumerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder, System.Action! configure) -> OpenTelemetry.Trace.TracerProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder) -> OpenTelemetry.Trace.TracerProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder! producerBuilder) -> OpenTelemetry.Trace.TracerProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder! producerBuilder, System.Action! configure) -> OpenTelemetry.Trace.TracerProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder, string? name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder? producerBuilder, System.Action? configure) -> OpenTelemetry.Trace.TracerProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder, System.Action! configure) -> OpenTelemetry.Trace.TracerProviderBuilder! diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaConsumerInstrumentation.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaConsumerInstrumentation.cs new file mode 100644 index 0000000000..b82fc7c201 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaConsumerInstrumentation.cs @@ -0,0 +1,46 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using Confluent.Kafka; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka; + +internal class ConfluentKafkaConsumerInstrumentation : IDisposable +{ + internal static readonly string ActivitySourceName = typeof(ConfluentKafkaInstrumentation).Assembly.GetName().Name!; + + public void Dispose() + { + this.Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) + { + if (disposing) + { + // TODO release managed resources here + } + } +} + +#pragma warning disable SA1402 // File may only contain a single type +internal sealed class ConfluentKafkaConsumerInstrumentation : ConfluentKafkaConsumerInstrumentation +#pragma warning restore SA1402 // File may only contain a single type +{ + private readonly ConsumerBuilder consumerBuilder; + private readonly string name; + private readonly ConfluentKafkaInstrumentationOptions options; + + public ConfluentKafkaConsumerInstrumentation(ConsumerBuilder consumerBuilder, string name, ConfluentKafkaInstrumentationOptions options) + { + this.consumerBuilder = consumerBuilder; + this.name = name; + this.options = options; + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + } +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaInstrumentation.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaInstrumentation.cs index b46f0c7bf6..53f7c36069 100644 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaInstrumentation.cs +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaInstrumentation.cs @@ -1,6 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 +using System.Diagnostics; using Confluent.Kafka; using Microsoft.Extensions.Options; using OpenTelemetry.Internal; @@ -13,12 +14,11 @@ namespace OpenTelemetry.Instrumentation.ConfluentKafka; public sealed class ConfluentKafkaInstrumentation : IDisposable { private readonly IOptionsMonitor options; - private readonly MetricsChannel metricsChannel; - private readonly CancellationTokenSource cts = new(); + private readonly MetricsChannel? metricsChannel; internal ConfluentKafkaInstrumentation( IOptionsMonitor options, - MetricsChannel metricsChannel) + MetricsChannel? metricsChannel) { this.options = options; this.metricsChannel = metricsChannel; @@ -26,6 +26,8 @@ internal ConfluentKafkaInstrumentation( internal List InstrumentedProducers { get; } = new(); + internal List InstrumentedConsumers { get; } = new(); + /// /// Adds an to the instrumentation. /// @@ -51,7 +53,7 @@ public void AddProducer(string name, InstrumentedProducerBuilder(string name, InstrumentedProducerBuilder + /// Adds an to the instrumentation. + /// + /// The type of the key. + /// The type of the value. + /// . + public void AddConsumer(InstrumentedConsumerBuilder consumerBuilder) + => this.AddConsumer(Options.DefaultName, consumerBuilder); + + /// + /// Adds an to the instrumentation. + /// + /// The type of the key. + /// The type of the value. + /// Name to use when retrieving options. + /// . + public void AddConsumer(string name, InstrumentedConsumerBuilder consumerBuilder) + { + Guard.ThrowIfNull(name); + Guard.ThrowIfNull(consumerBuilder); + + var options = this.options.Get(name); + + if (this.metricsChannel is not null) + { + consumerBuilder.SetStatisticsHandler(this.OnConsumerStatistics); + } + + lock (this.InstrumentedConsumers) + { + var instrumentation = new ConfluentKafkaConsumerInstrumentation(consumerBuilder, name, options); + + this.InstrumentedConsumers.Add(instrumentation); + + lock (this.InstrumentedConsumers) + { + if (this.InstrumentedConsumers.Remove(instrumentation)) + { + instrumentation.Dispose(); + } + } + } + } + /// public void Dispose() { @@ -83,16 +129,38 @@ public void Dispose() this.InstrumentedProducers.Clear(); } - this.cts.Dispose(); + lock (this.InstrumentedConsumers) + { + foreach (var instrumentation in this.InstrumentedConsumers) + { + instrumentation.Dispose(); + } + + this.InstrumentedConsumers.Clear(); + } + } + + private void OnProducerStatistics(IProducer producer, string json) + { + Debug.Assert(this.metricsChannel is not null, "metricsChannel is null"); + + if (string.IsNullOrEmpty(json)) + { + return; + } + + this.metricsChannel!.Writer.TryWrite(json); } - private void OnStatistics(IProducer producer, string json) + private void OnConsumerStatistics(IConsumer consumer, string json) { + Debug.Assert(this.metricsChannel is not null, "metricsChannel is null"); + if (string.IsNullOrEmpty(json)) { return; } - this.metricsChannel.Writer.TryWrite(json); + this.metricsChannel!.Writer.TryWrite(json); } } diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaMetrics.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaMetrics.cs index dc893a9e6a..81b0816e55 100644 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaMetrics.cs +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaMetrics.cs @@ -80,7 +80,7 @@ public async Task ExecuteAsync() { while (this.channel.Reader.TryRead(out var json)) { - Statistics statistics; + Statistics? statistics; try { statistics = JsonSerializer.Deserialize(json, StatisticsJsonSerializerContext.Default.Statistics); @@ -105,7 +105,7 @@ public async Task ExecuteAsync() this.MessageCountMeasurements.Enqueue(new Measurement(statistics.MessageCount, tags)); this.MessageSizeMeasurements.Enqueue(new Measurement(statistics.MessageSize, tags)); - tags.Add(new KeyValuePair(Tags.Type, statistics.Type)); + tags.Add(new KeyValuePair(Tags.Type, statistics.Type)); if (this.state.TryGetValue(statistics.Name, out var previous)) { diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumer.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumer.cs new file mode 100644 index 0000000000..476dfd1c58 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumer.cs @@ -0,0 +1,254 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Diagnostics; +using System.Text; +using Confluent.Kafka; +using OpenTelemetry.Context.Propagation; +using OpenTelemetry.Trace; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka; + +internal class InstrumentedConsumer : IConsumer +{ + private static readonly ActivitySource ActivitySource = new("OpenTelemetry.Instrumentation.ConfluentKafka"); + + private readonly TextMapPropagator propagator = Propagators.DefaultTextMapPropagator; + private readonly IConsumer consumerImplementation; + + public InstrumentedConsumer(IConsumer consumer) + { + this.consumerImplementation = consumer; + } + + public Handle Handle => this.consumerImplementation.Handle; + + public string Name => this.consumerImplementation.Name; + + public string MemberId => this.consumerImplementation.MemberId; + + public List Assignment => this.consumerImplementation.Assignment; + + public List Subscription => this.consumerImplementation.Subscription; + + public IConsumerGroupMetadata ConsumerGroupMetadata => this.consumerImplementation.ConsumerGroupMetadata; + + public string? GroupId { get; internal set; } + + public void Dispose() + { + this.consumerImplementation.Dispose(); + } + + public int AddBrokers(string brokers) + { + return this.consumerImplementation.AddBrokers(brokers); + } + + public void SetSaslCredentials(string username, string password) + { + this.consumerImplementation.SetSaslCredentials(username, password); + } + + public ConsumeResult? Consume(int millisecondsTimeout) + { + var consumeResult = this.consumerImplementation.Consume(millisecondsTimeout); + if (consumeResult?.Message != null) + { + var propagationContext = this.ExtractActivity(consumeResult.Message); + + using var activity = this.StartActivity("receive", consumeResult.TopicPartitionOffset, propagationContext.ActivityContext); + } + + return consumeResult; + } + + public ConsumeResult? Consume(CancellationToken cancellationToken = default) + { + var consumeResult = this.consumerImplementation.Consume(cancellationToken); + if (consumeResult?.Message != null) + { + var propagationContext = this.ExtractActivity(consumeResult.Message); + + using var activity = this.StartActivity("receive", consumeResult.TopicPartitionOffset, propagationContext.ActivityContext); + } + + return consumeResult; + } + + public ConsumeResult? Consume(TimeSpan timeout) + { + var consumeResult = this.consumerImplementation.Consume(timeout); + if (consumeResult?.Message != null) + { + var propagationContext = this.ExtractActivity(consumeResult.Message); + + using var activity = this.StartActivity("receive", consumeResult.TopicPartitionOffset, propagationContext.ActivityContext); + } + + return consumeResult; + } + + public void Subscribe(IEnumerable topics) + { + this.consumerImplementation.Subscribe(topics); + } + + public void Subscribe(string topic) + { + this.consumerImplementation.Subscribe(topic); + } + + public void Unsubscribe() + { + this.consumerImplementation.Unsubscribe(); + } + + public void Assign(TopicPartition partition) + { + this.consumerImplementation.Assign(partition); + } + + public void Assign(TopicPartitionOffset partition) + { + this.consumerImplementation.Assign(partition); + } + + public void Assign(IEnumerable partitions) + { + this.consumerImplementation.Assign(partitions); + } + + public void Assign(IEnumerable partitions) + { + this.consumerImplementation.Assign(partitions); + } + + public void IncrementalAssign(IEnumerable partitions) + { + this.consumerImplementation.IncrementalAssign(partitions); + } + + public void IncrementalAssign(IEnumerable partitions) + { + this.consumerImplementation.IncrementalAssign(partitions); + } + + public void IncrementalUnassign(IEnumerable partitions) + { + this.consumerImplementation.IncrementalUnassign(partitions); + } + + public void Unassign() + { + this.consumerImplementation.Unassign(); + } + + public void StoreOffset(ConsumeResult result) + { + this.consumerImplementation.StoreOffset(result); + } + + public void StoreOffset(TopicPartitionOffset offset) + { + this.consumerImplementation.StoreOffset(offset); + } + + public List Commit() + { + return this.consumerImplementation.Commit(); + } + + public void Commit(IEnumerable offsets) + { + this.consumerImplementation.Commit(offsets); + } + + public void Commit(ConsumeResult result) + { + this.consumerImplementation.Commit(result); + } + + public void Seek(TopicPartitionOffset tpo) + { + this.consumerImplementation.Seek(tpo); + } + + public void Pause(IEnumerable partitions) + { + this.consumerImplementation.Pause(partitions); + } + + public void Resume(IEnumerable partitions) + { + this.consumerImplementation.Resume(partitions); + } + + public List Committed(TimeSpan timeout) + { + return this.consumerImplementation.Committed(timeout); + } + + public List Committed(IEnumerable partitions, TimeSpan timeout) + { + return this.consumerImplementation.Committed(partitions, timeout); + } + + public Offset Position(TopicPartition partition) + { + return this.consumerImplementation.Position(partition); + } + + public List OffsetsForTimes(IEnumerable timestampsToSearch, TimeSpan timeout) + { + return this.consumerImplementation.OffsetsForTimes(timestampsToSearch, timeout); + } + + public WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition) + { + return this.consumerImplementation.GetWatermarkOffsets(topicPartition); + } + + public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout) + { + return this.consumerImplementation.QueryWatermarkOffsets(topicPartition, timeout); + } + + public void Close() + { + this.consumerImplementation.Close(); + } + + private Activity? StartActivity(string operation, TopicPartitionOffset topicPartitionOffset, ActivityContext activityContext) + { + var activity = ActivitySource.StartActivity(string.Concat(topicPartitionOffset.Topic, " ", operation), ActivityKind.Consumer, activityContext); + if (activity != null) + { + activity.SetTag(SemanticConventions.AttributeMessagingSystem, "kafka"); + activity.SetTag("messaging.client_id", this.Name); + activity.SetTag("messaging.destination.name", topicPartitionOffset.Topic); + activity.SetTag("messaging.kafka.destination.partition", topicPartitionOffset.Partition.Value); + activity.SetTag("messaging.kafka.message.offset", topicPartitionOffset.Offset.Value); + activity.SetTag("messaging.kafka.consumer.group", this.GroupId); + activity.SetTag(SemanticConventions.AttributeMessagingOperation, operation); + } + + return activity; + } + + private PropagationContext ExtractActivity(Message message) + { + PropagationContext propagationContext = Activity.Current != null + ? new PropagationContext(Activity.Current!.Context, Baggage.Current) + : default; + return this.propagator.Extract(propagationContext, message, this.ExtractTraceContext); + } + + private IEnumerable ExtractTraceContext(Message message, string value) + { + if (message.Headers?.TryGetLastBytes(value, out var bytes) == true) + { + yield return Encoding.UTF8.GetString(bytes); + } + } +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumerBuilder.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumerBuilder.cs new file mode 100644 index 0000000000..5aadd2ac14 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumerBuilder.cs @@ -0,0 +1,34 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using Confluent.Kafka; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka; + +/// +/// A builder of with support for instrumentation. +/// +/// Type of the key. +/// Type of value. +public sealed class InstrumentedConsumerBuilder : ConsumerBuilder +{ + /// + /// Initializes a new instance of the class. + /// + /// A collection of librdkafka configuration parameters (refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) and parameters specific to this client (refer to: ). At a minimum, 'bootstrap.servers' must be specified. + public InstrumentedConsumerBuilder(IEnumerable> config) + : base(config) + { + } + + /// + /// Build a new IConsumer instance. + /// + /// an . + public override IConsumer Build() + { + var consumer = new InstrumentedConsumer(base.Build()); + consumer.GroupId = (this.Config as ConsumerConfig)?.GroupId; + return consumer; + } +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs index 00d4c02dea..92b43af87a 100644 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs @@ -16,9 +16,9 @@ internal sealed class InstrumentedProducer : IProducer producerImplementation; private readonly TextMapPropagator propagator = Propagators.DefaultTextMapPropagator; - public InstrumentedProducer(IProducer producerImplementation) + public InstrumentedProducer(IProducer producer) { - this.producerImplementation = producerImplementation; + this.producerImplementation = producer; } public Handle Handle => this.producerImplementation.Handle; @@ -40,9 +40,11 @@ public async Task> ProduceAsync( Message message, CancellationToken cancellationToken = default) { - using Activity activity = this.StartActivity("publish", topic); - - this.InjectActivity(activity, message); + using Activity? activity = this.StartActivity("publish", topic) ?? Activity.Current; + ActivityContext activityContext = activity != null + ? activity.Context + : default; + this.InjectActivity(activityContext, message); return await this.producerImplementation.ProduceAsync(topic, message, cancellationToken).ConfigureAwait(false); } @@ -52,27 +54,33 @@ public async Task> ProduceAsync( Message message, CancellationToken cancellationToken = default) { - using var activity = this.StartActivity("publish", topicPartition); - - this.InjectActivity(activity, message); + using Activity? activity = this.StartActivity("publish", topicPartition) ?? Activity.Current; + ActivityContext activityContext = activity != null + ? activity.Context + : default; + this.InjectActivity(activityContext, message); return await this.producerImplementation.ProduceAsync(topicPartition, message, cancellationToken).ConfigureAwait(false); } - public void Produce(string topic, Message message, Action> deliveryHandler = null) + public void Produce(string topic, Message message, Action>? deliveryHandler = null) { - using var activity = this.StartActivity("publish", topic); - - this.InjectActivity(activity, message); + using Activity? activity = this.StartActivity("publish", topic) ?? Activity.Current; + ActivityContext activityContext = activity != null + ? activity.Context + : default; + this.InjectActivity(activityContext, message); this.producerImplementation.Produce(topic, message, deliveryHandler); } - public void Produce(TopicPartition topicPartition, Message message, Action> deliveryHandler = null) + public void Produce(TopicPartition topicPartition, Message message, Action>? deliveryHandler = null) { - using var activity = this.StartActivity("publish", topicPartition); - - this.InjectActivity(activity, message); + using Activity? activity = this.StartActivity("publish", topicPartition) ?? Activity.Current; + ActivityContext activityContext = activity != null + ? activity.Context + : default; + this.InjectActivity(activityContext, message); this.producerImplementation.Produce(topicPartition, message, deliveryHandler); } @@ -132,26 +140,36 @@ public void Dispose() this.producerImplementation.Dispose(); } - private Activity StartActivity(string operation, string topic) + private Activity? StartActivity(string operation, string topic) { var activity = ActivitySource.StartActivity(string.Concat(topic, " ", operation), ActivityKind.Producer); - activity?.SetTag(SemanticConventions.AttributeMessagingSystem, "kafka"); - activity?.SetTag("messaging.client_id", this.Name); - activity?.SetTag("messaging.destination.name", topic); - activity?.SetTag(SemanticConventions.AttributeMessagingOperation, operation); + if (activity == null) + { + return null; + } + + activity.SetTag(SemanticConventions.AttributeMessagingSystem, "kafka"); + activity.SetTag("messaging.client_id", this.Name); + activity.SetTag("messaging.destination.name", topic); + activity.SetTag(SemanticConventions.AttributeMessagingOperation, operation); return activity; } - private Activity StartActivity(string operation, TopicPartition topicPartition) + private Activity? StartActivity(string operation, TopicPartition topicPartition) { var activity = this.StartActivity(operation, topicPartition.Topic); - activity?.SetTag("messaging.kafka.destination.partition", topicPartition.Partition.Value); + if (activity == null) + { + return null; + } + + activity.SetTag("messaging.kafka.destination.partition", topicPartition.Partition.Value); return activity; } - private void InjectActivity(Activity activity, Message message) + private void InjectActivity(ActivityContext activityContext, Message message) { - this.propagator.Inject(new PropagationContext(activity.Context, Baggage.Current), message, this.InjectTraceContext); + this.propagator.Inject(new PropagationContext(activityContext, Baggage.Current), message, this.InjectTraceContext); } private void InjectTraceContext(Message message, string key, string value) diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/MeterProviderBuilderExtensions.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/MeterProviderBuilderExtensions.cs index ab13ef0793..2a5288af16 100644 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/MeterProviderBuilderExtensions.cs +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/MeterProviderBuilderExtensions.cs @@ -29,7 +29,7 @@ public static MeterProviderBuilder AddKafkaInstrumentation(this MeterProviderBui /// The instance of to chain the calls. public static MeterProviderBuilder AddKafkaInstrumentation( this MeterProviderBuilder builder, - Action configure) + Action? configure) { Guard.ThrowIfNull(builder); diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetry.Instrumentation.ConfluentKafka.csproj b/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetry.Instrumentation.ConfluentKafka.csproj index b676fa1b9b..eaf564fd60 100644 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetry.Instrumentation.ConfluentKafka.csproj +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetry.Instrumentation.ConfluentKafka.csproj @@ -7,7 +7,6 @@ true Instrumentation.ConfluentKafka- true - disable diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/Statistics.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/Statistics.cs index 2e18d3bf93..e9dd11d688 100644 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/Statistics.cs +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/Statistics.cs @@ -11,13 +11,13 @@ namespace OpenTelemetry.Instrumentation.ConfluentKafka; internal sealed class Statistics { [JsonPropertyName("name")] - public string Name { get; set; } + public string? Name { get; set; } [JsonPropertyName("client_id")] - public string ClientId { get; set; } + public string? ClientId { get; set; } [JsonPropertyName("type")] - public string Type { get; set; } + public string? Type { get; set; } [JsonPropertyName("ts")] public long Timestamp { get; set; } diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/TracerProviderBuilderExtensions.Consumer.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/TracerProviderBuilderExtensions.Consumer.cs new file mode 100644 index 0000000000..ba3cd11490 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/TracerProviderBuilderExtensions.Consumer.cs @@ -0,0 +1,127 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using OpenTelemetry.Instrumentation.ConfluentKafka; +using OpenTelemetry.Internal; + +namespace OpenTelemetry.Trace; + +/// +/// Extension methods to simplify registering of dependency instrumentation. +/// +public static partial class TracerProviderBuilderExtensions +{ + /// + /// Enables automatic data collection of outgoing requests to Kafka. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// The instance of to chain the calls. + public static TracerProviderBuilder AddKafkaConsumerInstrumentation( + this TracerProviderBuilder builder) + => AddKafkaConsumerInstrumentation(builder, name: null, consumerBuilder: null, configure: null); + + /// + /// Enables automatic data collection of outgoing requests to Kafka. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// to instrument. + /// The instance of to chain the calls. + public static TracerProviderBuilder AddKafkaConsumerInstrumentation( + this TracerProviderBuilder builder, + InstrumentedConsumerBuilder consumerBuilder) + { + Guard.ThrowIfNull(consumerBuilder); + + return AddKafkaConsumerInstrumentation(builder, name: null, consumerBuilder, configure: null); + } + + /// + /// Enables automatic data collection of outgoing requests to Kafka. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// Callback to configure options. + /// The instance of to chain the calls. + public static TracerProviderBuilder AddKafkaConsumerInstrumentation( + this TracerProviderBuilder builder, + Action configure) + { + Guard.ThrowIfNull(configure); + + return AddKafkaConsumerInstrumentation(builder, name: null, consumerBuilder: null, configure); + } + + /// + /// Enables automatic data collection of outgoing requests to Kafka. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// to instrument. + /// Callback to configure options. + /// The instance of to chain the calls. + public static TracerProviderBuilder AddKafkaConsumerInstrumentation( + this TracerProviderBuilder builder, + InstrumentedConsumerBuilder consumerBuilder, + Action configure) + { + Guard.ThrowIfNull(consumerBuilder); + Guard.ThrowIfNull(configure); + + return AddKafkaConsumerInstrumentation(builder, name: null, consumerBuilder, configure); + } + + /// + /// Enables automatic data collection of outgoing requests to Kafka. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// Optional name which is used when retrieving options. + /// Optional to instrument. + /// Optional callback to configure options. + /// The instance of to chain the calls. + public static TracerProviderBuilder AddKafkaConsumerInstrumentation( + this TracerProviderBuilder builder, + string? name, + InstrumentedConsumerBuilder? consumerBuilder, + Action? configure) + { + Guard.ThrowIfNull(builder); + + name ??= Options.DefaultName; + + builder.AddKafkaInstrumentationSharedServices(); + + if (configure != null) + { + builder.ConfigureServices(services => + { + services.Configure(name, configure); + }); + } + + return builder + .AddSource(ConfluentKafkaProducerInstrumentation.ActivitySourceName) + .AddInstrumentation(sp => + { + var instrumentation = sp.GetRequiredService(); + + consumerBuilder ??= sp.GetService>(); + + if (consumerBuilder != null) + { + instrumentation.AddConsumer(name, consumerBuilder); + } + + return instrumentation; + }); + } +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/TracerProviderBuilderExtensions.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/TracerProviderBuilderExtensions.Producer.cs similarity index 92% rename from src/OpenTelemetry.Instrumentation.ConfluentKafka/TracerProviderBuilderExtensions.cs rename to src/OpenTelemetry.Instrumentation.ConfluentKafka/TracerProviderBuilderExtensions.Producer.cs index dd045d415d..f8900f7944 100644 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/TracerProviderBuilderExtensions.cs +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/TracerProviderBuilderExtensions.Producer.cs @@ -12,7 +12,7 @@ namespace OpenTelemetry.Trace; /// /// Extension methods to simplify registering of dependency instrumentation. /// -public static class TracerProviderBuilderExtensions +public static partial class TracerProviderBuilderExtensions { /// /// Enables automatic data collection of outgoing requests to Kafka. @@ -26,7 +26,7 @@ public static TracerProviderBuilder AddKafkaProducerInstrumentation AddKafkaProducerInstrumentation(builder, name: null, producerBuilder: null, configure: null); /// - /// Enables automatic data collection of outgoing requests to Redis. + /// Enables automatic data collection of outgoing requests to Kafka. /// /// The type of the key. /// The type of the value. @@ -43,7 +43,7 @@ public static TracerProviderBuilder AddKafkaProducerInstrumentation - /// Enables automatic data collection of outgoing requests to Redis. + /// Enables automatic data collection of outgoing requests to Kafka. /// /// The type of the key. /// The type of the value. @@ -60,7 +60,7 @@ public static TracerProviderBuilder AddKafkaProducerInstrumentation - /// Enables automatic data collection of outgoing requests to Redis. + /// Enables automatic data collection of outgoing requests to Kafka. /// /// The type of the key. /// The type of the value. @@ -80,7 +80,7 @@ public static TracerProviderBuilder AddKafkaProducerInstrumentation - /// Enables automatic data collection of outgoing requests to Redis. + /// Enables automatic data collection of outgoing requests to Kafka. /// /// The type of the key. /// The type of the value. @@ -91,9 +91,9 @@ public static TracerProviderBuilder AddKafkaProducerInstrumentationThe instance of to chain the calls. public static TracerProviderBuilder AddKafkaProducerInstrumentation( this TracerProviderBuilder builder, - string name, - InstrumentedProducerBuilder producerBuilder, - Action configure) + string? name, + InstrumentedProducerBuilder? producerBuilder, + Action? configure) { Guard.ThrowIfNull(builder); @@ -134,7 +134,9 @@ private static TracerProviderBuilder AddKafkaInstrumentationSharedServices( return builder.ConfigureServices(services => { services.TryAddSingleton( - sp => new ConfluentKafkaInstrumentation(sp.GetRequiredService>(), sp.GetService())); + sp => new ConfluentKafkaInstrumentation( + sp.GetRequiredService>(), + sp.GetService())); }); } } diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/ConfluentKafkaCallsInstrumentationTests.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/ConfluentKafkaCallsInstrumentationTests.cs index 925288e9ca..c94a52939f 100644 --- a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/ConfluentKafkaCallsInstrumentationTests.cs +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/ConfluentKafkaCallsInstrumentationTests.cs @@ -36,9 +36,8 @@ public ConfluentKafkaCallsInstrumentationTests(ITestOutputHelper outputHelper) } [Trait("CategoryName", "KafkaIntegrationTests")] - [SkipUnlessEnvVarFoundTheory(KafkaEndPointEnvVarName)] - [InlineData("any_value")] - public async Task BasicProduceAsyncToTopicTest(string value) + [SkipUnlessEnvVarFoundFact(KafkaEndPointEnvVarName)] + public async Task BasicProduceAsyncToTopicTest() { ProducerConfig producerConfig = new ProducerConfig { @@ -57,7 +56,7 @@ public async Task BasicProduceAsyncToTopicTest(string value) IProducer producer = producerBuilder.Build(); await producer.ProduceAsync(topic, new Message { - Value = value, + Value = "any_value", }); } @@ -69,9 +68,8 @@ public async Task BasicProduceAsyncToTopicTest(string value) } [Trait("CategoryName", "KafkaIntegrationTests")] - [SkipUnlessEnvVarFoundTheory(KafkaEndPointEnvVarName)] - [InlineData("any_value")] - public async Task BasicProduceAsyncToTopicPartitionTest(string value) + [SkipUnlessEnvVarFoundFact(KafkaEndPointEnvVarName)] + public async Task BasicProduceAsyncToTopicPartitionTest() { ProducerConfig producerConfig = new ProducerConfig { @@ -90,7 +88,7 @@ public async Task BasicProduceAsyncToTopicPartitionTest(string value) IProducer producer = producerBuilder.Build(); await producer.ProduceAsync(new TopicPartition(topic, new Partition(0)), new Message { - Value = value, + Value = "any_value", }); } @@ -103,9 +101,8 @@ public async Task BasicProduceAsyncToTopicPartitionTest(string value) } [Trait("CategoryName", "KafkaIntegrationTests")] - [SkipUnlessEnvVarFoundTheory(KafkaEndPointEnvVarName)] - [InlineData("any_value")] - public void BasicProduceSyncToTopicTest(string value) + [SkipUnlessEnvVarFoundFact(KafkaEndPointEnvVarName)] + public void BasicProduceSyncToTopicTest() { ProducerConfig producerConfig = new ProducerConfig { @@ -124,7 +121,7 @@ public void BasicProduceSyncToTopicTest(string value) IProducer producer = producerBuilder.Build(); producer.Produce(topic, new Message { - Value = value, + Value = "any_value", }); } @@ -136,9 +133,8 @@ public void BasicProduceSyncToTopicTest(string value) } [Trait("CategoryName", "KafkaIntegrationTests")] - [SkipUnlessEnvVarFoundTheory(KafkaEndPointEnvVarName)] - [InlineData("any_value")] - public void BasicProduceSyncToTopicPartitionTest(string value) + [SkipUnlessEnvVarFoundFact(KafkaEndPointEnvVarName)] + public void BasicProduceSyncToTopicPartitionTest() { ProducerConfig producerConfig = new ProducerConfig { @@ -157,7 +153,7 @@ public void BasicProduceSyncToTopicPartitionTest(string value) IProducer producer = producerBuilder.Build(); producer.Produce(new TopicPartition(topic, new Partition(0)), new Message { - Value = value, + Value = "any_value", }); } @@ -168,4 +164,173 @@ public void BasicProduceSyncToTopicPartitionTest(string value) Assert.Equal(topic, activity.GetTagValue("messaging.destination.name")); Assert.Equal(0, activity.GetTagValue("messaging.kafka.destination.partition")); } + + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaEndPointEnvVarName)] + public async Task BasicConsumeWithCancellationTokenTest() + { + string topic = await ProduceTestMessageAsync(); + + ConsumerConfig consumerConfig = new ConsumerConfig + { + BootstrapServers = KafkaEndPoint, + GroupId = "test-consumer-group", + AutoOffsetReset = AutoOffsetReset.Earliest, + EnablePartitionEof = true, + }; + InstrumentedConsumerBuilder consumerBuilder = new(consumerConfig); + var sampler = new TestSampler(); + var exportedItems = new List(); + using (Sdk.CreateTracerProviderBuilder() + .AddInMemoryExporter(exportedItems) + .SetSampler(sampler) + .AddKafkaConsumerInstrumentation(consumerBuilder) + .Build()) + { + IConsumer consumer = consumerBuilder.Build(); + consumer.Subscribe(topic); + while (true) + { + var consumeResult = consumer.Consume(); + if (consumeResult == null) + { + continue; + } + + if (consumeResult.IsPartitionEOF) + { + break; + } + } + + consumer.Close(); + } + + Assert.Contains(exportedItems, activity => activity.DisplayName == topic + " receive"); + var activity = exportedItems.FirstOrDefault(); + Assert.Equal("kafka", activity.GetTagValue(SemanticConventions.AttributeMessagingSystem)); + Assert.Equal("receive", activity.GetTagValue(SemanticConventions.AttributeMessagingOperation)); + Assert.Equal(topic, activity.GetTagValue("messaging.destination.name")); + Assert.Equal(0, activity.GetTagValue("messaging.kafka.destination.partition")); + Assert.Equal(0L, activity.GetTagValue("messaging.kafka.message.offset")); + Assert.Equal("test-consumer-group", activity.GetTagValue("messaging.kafka.consumer.group")); + } + + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaEndPointEnvVarName)] + public async Task BasicConsumeWithTimeoutMsTest() + { + string topic = await ProduceTestMessageAsync(); + + ConsumerConfig consumerConfig = new ConsumerConfig + { + BootstrapServers = KafkaEndPoint, + GroupId = "test-consumer-group", + AutoOffsetReset = AutoOffsetReset.Earliest, + EnablePartitionEof = true, + }; + InstrumentedConsumerBuilder consumerBuilder = new(consumerConfig); + var sampler = new TestSampler(); + var exportedItems = new List(); + using (Sdk.CreateTracerProviderBuilder() + .AddInMemoryExporter(exportedItems) + .SetSampler(sampler) + .AddKafkaConsumerInstrumentation(consumerBuilder) + .Build()) + { + IConsumer consumer = consumerBuilder.Build(); + consumer.Subscribe(topic); + while (true) + { + var consumeResult = consumer.Consume(100); + if (consumeResult == null) + { + continue; + } + + if (consumeResult.IsPartitionEOF) + { + break; + } + } + + consumer.Close(); + } + + Assert.Contains(exportedItems, activity => activity.DisplayName == topic + " receive"); + var activity = exportedItems.FirstOrDefault(); + Assert.Equal("kafka", activity.GetTagValue(SemanticConventions.AttributeMessagingSystem)); + Assert.Equal("receive", activity.GetTagValue(SemanticConventions.AttributeMessagingOperation)); + Assert.Equal(topic, activity.GetTagValue("messaging.destination.name")); + Assert.Equal(0, activity.GetTagValue("messaging.kafka.destination.partition")); + Assert.Equal(0L, activity.GetTagValue("messaging.kafka.message.offset")); + Assert.Equal("test-consumer-group", activity.GetTagValue("messaging.kafka.consumer.group")); + } + + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaEndPointEnvVarName)] + public async Task BasicConsumeWithTimeoutTimespanTest() + { + string topic = await ProduceTestMessageAsync(); + + ConsumerConfig consumerConfig = new ConsumerConfig + { + BootstrapServers = KafkaEndPoint, + GroupId = "test-consumer-group", + AutoOffsetReset = AutoOffsetReset.Earliest, + EnablePartitionEof = true, + }; + InstrumentedConsumerBuilder consumerBuilder = new(consumerConfig); + var sampler = new TestSampler(); + var exportedItems = new List(); + using (Sdk.CreateTracerProviderBuilder() + .AddInMemoryExporter(exportedItems) + .SetSampler(sampler) + .AddKafkaConsumerInstrumentation(consumerBuilder) + .Build()) + { + IConsumer consumer = consumerBuilder.Build(); + consumer.Subscribe(topic); + while (true) + { + var consumeResult = consumer.Consume(TimeSpan.FromMilliseconds(100)); + if (consumeResult == null) + { + continue; + } + + if (consumeResult.IsPartitionEOF) + { + break; + } + } + + consumer.Close(); + } + + Assert.Contains(exportedItems, activity => activity.DisplayName == topic + " receive"); + var activity = exportedItems.FirstOrDefault(); + Assert.Equal("kafka", activity.GetTagValue(SemanticConventions.AttributeMessagingSystem)); + Assert.Equal("receive", activity.GetTagValue(SemanticConventions.AttributeMessagingOperation)); + Assert.Equal(topic, activity.GetTagValue("messaging.destination.name")); + Assert.Equal(0, activity.GetTagValue("messaging.kafka.destination.partition")); + Assert.Equal(0L, activity.GetTagValue("messaging.kafka.message.offset")); + Assert.Equal("test-consumer-group", activity.GetTagValue("messaging.kafka.consumer.group")); + } + + private static async Task ProduceTestMessageAsync() + { + string topic = $"otel-topic-{Guid.NewGuid()}"; + ProducerConfig producerConfig = new ProducerConfig + { + BootstrapServers = KafkaEndPoint, + }; + ProducerBuilder producerBuilder = new(producerConfig); + IProducer producer = producerBuilder.Build(); + await producer.ProduceAsync(topic, new Message + { + Value = "any_value", + }); + return topic; + } } diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetry.Instrumentation.ConfluentKafka.Tests.csproj b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetry.Instrumentation.ConfluentKafka.Tests.csproj index da67bc6d1d..e436a9b213 100644 --- a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetry.Instrumentation.ConfluentKafka.Tests.csproj +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetry.Instrumentation.ConfluentKafka.Tests.csproj @@ -11,6 +11,7 @@ +