From 182c580dc6b5a18d8552dace309c02c8ee7cb701 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 19 Sep 2022 13:38:05 -0400 Subject: [PATCH] GH-2198: Spring Observability Initial Commit (#2394) * GH-2198: Spring Observability Initial Commit Resolves https://github.com/spring-projects/spring-kafka/issues/2198 * Move getContextualContext to conventions. * Fix generics in test. * Add docs. * Fix doc link. * Remove unnecessary method overrides; make tag names more descriptive. * Async stop for send. * Fix checkstyle. * Fix async stop - don't stop on sync success; change order of spans for test. * Fix generics in test. * Fix checkstyle. * Fix race in test; with async send spans, finished spans order is indeterminate. * Move getName() from context to convention. * Fix Race in Test Fix Race in Test. Fix Race in Test. Fix Race in Test. Fix Race in Test. --- build.gradle | 2 + .../src/main/asciidoc/kafka.adoc | 16 +- .../src/main/asciidoc/whats-new.adoc | 6 + .../kafka/core/KafkaTemplate.java | 101 +++++- .../kafka/listener/ContainerProperties.java | 40 ++- .../KafkaMessageListenerContainer.java | 94 ++++-- ...ultKafkaListenerObservationConvention.java | 47 +++ ...ultKafkaTemplateObservationConvention.java | 47 +++ .../micrometer/KafkaListenerObservation.java | 75 +++++ .../KafkaListenerObservationConvention.java | 41 +++ .../KafkaRecordReceiverContext.java | 64 ++++ .../micrometer/KafkaRecordSenderContext.java | 57 ++++ .../micrometer/KafkaTemplateObservation.java | 75 +++++ .../KafkaTemplateObservationConvention.java | 41 +++ .../ObservationIntegrationTests.java | 182 +++++++++++ .../support/micrometer/ObservationTests.java | 294 ++++++++++++++++++ 16 files changed, 1133 insertions(+), 49 deletions(-) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/DefaultKafkaListenerObservationConvention.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/DefaultKafkaTemplateObservationConvention.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservationConvention.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaTemplateObservation.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaTemplateObservationConvention.java create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationIntegrationTests.java create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java diff --git a/build.gradle b/build.gradle index b44f76876a..b974f95891 100644 --- a/build.gradle +++ b/build.gradle @@ -335,12 +335,14 @@ project ('spring-kafka') { optionalApi 'io.projectreactor:reactor-core' optionalApi 'io.projectreactor.kafka:reactor-kafka' optionalApi 'io.micrometer:micrometer-core' + api 'io.micrometer:micrometer-observation' optionalApi 'io.micrometer:micrometer-tracing' testImplementation project (':spring-kafka-test') testImplementation 'io.projectreactor:reactor-test' testImplementation "org.mockito:mockito-junit-jupiter:$mockitoVersion" testImplementation "org.hibernate.validator:hibernate-validator:$hibernateValidationVersion" + testImplementation 'io.micrometer:micrometer-observation-test' testImplementation 'io.micrometer:micrometer-tracing-bridge-brave' testImplementation 'io.micrometer:micrometer-tracing-test' testImplementation 'io.micrometer:micrometer-tracing-integration-test' diff --git a/spring-kafka-docs/src/main/asciidoc/kafka.adoc b/spring-kafka-docs/src/main/asciidoc/kafka.adoc index b6aa57ec9a..3a12a2e2b1 100644 --- a/spring-kafka-docs/src/main/asciidoc/kafka.adoc +++ b/spring-kafka-docs/src/main/asciidoc/kafka.adoc @@ -3310,7 +3310,6 @@ IMPORTANT: By default, the application context's event multicaster invokes event If you change the multicaster to use an async executor, thread cleanup is not effective. [[micrometer]] - ==== Monitoring ===== Monitoring Listener Performance @@ -3398,6 +3397,21 @@ double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total") A similar listener is provided for the `StreamsBuilderFactoryBean` - see <>. +[[observation]] +===== Micrometer Observation + +Using Micrometer for observation is now supported, since version 3.0, for the `KafkaTemplate` and listener containers. + +Set `observationEnabled` on each component to enable observation; this will disable <> because the timers will now be managed with each observation. + +Refer to https://micrometer.io/docs/tracing[Micrometer Tracing] for more information. + +To add tags to timers/traces, configure a custom `KafkaTemplateObservationConvention` or `KafkaListenerObservationConvention` to the template or listener container, respectively. + +The default implementations add the `bean.name` tag for template observations and `listener.id` tag for containers. + +You can either subclass `DefaultKafkaTemplateObservationConvention` or `DefaultKafkaListenerObservationConvention` or provide completely new implementations. + [[transactions]] ==== Transactions diff --git a/spring-kafka-docs/src/main/asciidoc/whats-new.adoc b/spring-kafka-docs/src/main/asciidoc/whats-new.adoc index dc82848e6a..caa3d28ade 100644 --- a/spring-kafka-docs/src/main/asciidoc/whats-new.adoc +++ b/spring-kafka-docs/src/main/asciidoc/whats-new.adoc @@ -17,6 +17,12 @@ IMPORTANT: When using transactions, the minimum broker version is 2.5. See <> and https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics[KIP-447] for more information. +[[x30-obs]] +==== Observation + +Enabling observation for timers and tracing using Micrometer is now supported. +See <> for more information. + [[x30-global-embedded-kafka]] ==== Global Single Embedded Kafka diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index 8b9a634465..3d355af97e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -48,6 +48,8 @@ import org.springframework.beans.factory.BeanNameAware; import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationListener; @@ -62,6 +64,10 @@ import org.springframework.kafka.support.TopicPartitionOffset; import org.springframework.kafka.support.converter.MessagingMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; +import org.springframework.kafka.support.micrometer.DefaultKafkaTemplateObservationConvention; +import org.springframework.kafka.support.micrometer.KafkaRecordSenderContext; +import org.springframework.kafka.support.micrometer.KafkaTemplateObservation; +import org.springframework.kafka.support.micrometer.KafkaTemplateObservationConvention; import org.springframework.kafka.support.micrometer.MicrometerHolder; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; @@ -69,6 +75,9 @@ import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.util.Assert; +import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationRegistry; + /** * A template for executing high-level operations. When used with a * {@link DefaultKafkaProducerFactory}, the template is thread-safe. The producer factory @@ -90,7 +99,7 @@ */ @SuppressWarnings("deprecation") public class KafkaTemplate implements KafkaOperations, ApplicationContextAware, BeanNameAware, - ApplicationListener, DisposableBean { + ApplicationListener, DisposableBean, SmartInitializingSingleton { protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass())); //NOSONAR @@ -126,11 +135,17 @@ public class KafkaTemplate implements KafkaOperations, ApplicationCo private ConsumerFactory consumerFactory; - private volatile boolean micrometerEnabled = true; + private ProducerInterceptor producerInterceptor; + + private boolean micrometerEnabled = true; - private volatile MicrometerHolder micrometerHolder; + private MicrometerHolder micrometerHolder; - private ProducerInterceptor producerInterceptor; + private boolean observationEnabled; + + private KafkaTemplateObservationConvention observationConvention; + + private ObservationRegistry observationRegistry; /** * Create an instance using the supplied producer factory and autoFlush false. @@ -382,6 +397,37 @@ public void setProducerInterceptor(ProducerInterceptor producerInterceptor this.producerInterceptor = producerInterceptor; } + /** + * Set to true to enable observation via Micrometer. + * @param observationEnabled true to enable. + * @since 3.0 + * @see #setMicrometerEnabled(boolean) + */ + public void setObservationEnabled(boolean observationEnabled) { + this.observationEnabled = observationEnabled; + } + + /** + * Set a custom {@link KafkaTemplateObservationConvention}. + * @param observationConvention the convention. + * @since 3.0 + */ + public void setObservationConvention(KafkaTemplateObservationConvention observationConvention) { + this.observationConvention = observationConvention; + } + + @Override + public void afterSingletonsInstantiated() { + if (this.observationEnabled && this.observationRegistry == null && this.applicationContext != null) { + ObjectProvider registry = + this.applicationContext.getBeanProvider(ObservationRegistry.class); + this.observationRegistry = registry.getIfUnique(); + } + else if (this.micrometerEnabled) { + this.micrometerHolder = obtainMicrometerHolder(); + } + } + @Override public void onApplicationEvent(ContextStoppedEvent event) { if (this.customProducerFactory) { @@ -412,19 +458,19 @@ public CompletableFuture> sendDefault(Integer partition, Long t @Override public CompletableFuture> send(String topic, @Nullable V data) { ProducerRecord producerRecord = new ProducerRecord<>(topic, data); - return doSend(producerRecord); + return observeSend(producerRecord); } @Override public CompletableFuture> send(String topic, K key, @Nullable V data) { ProducerRecord producerRecord = new ProducerRecord<>(topic, key, data); - return doSend(producerRecord); + return observeSend(producerRecord); } @Override public CompletableFuture> send(String topic, Integer partition, K key, @Nullable V data) { ProducerRecord producerRecord = new ProducerRecord<>(topic, partition, key, data); - return doSend(producerRecord); + return observeSend(producerRecord); } @Override @@ -432,13 +478,13 @@ public CompletableFuture> send(String topic, Integer partition, @Nullable V data) { ProducerRecord producerRecord = new ProducerRecord<>(topic, partition, timestamp, key, data); - return doSend(producerRecord); + return observeSend(producerRecord); } @Override public CompletableFuture> send(ProducerRecord record) { Assert.notNull(record, "'record' cannot be null"); - return doSend(record); + return observeSend(record); } @SuppressWarnings("unchecked") @@ -451,7 +497,7 @@ public CompletableFuture> send(Message message) { producerRecord.headers().add(KafkaHeaders.CORRELATION_ID, correlationId); } } - return doSend((ProducerRecord) producerRecord); + return observeSend((ProducerRecord) producerRecord); } @@ -621,20 +667,40 @@ protected void closeProducer(Producer producer, boolean inTx) { } } + private CompletableFuture> observeSend(final ProducerRecord producerRecord) { + Observation observation; + if (!this.observationEnabled || this.observationRegistry == null) { + observation = Observation.NOOP; + } + else { + observation = KafkaTemplateObservation.TEMPLATE_OBSERVATION.observation( + this.observationConvention, DefaultKafkaTemplateObservationConvention.INSTANCE, + new KafkaRecordSenderContext(producerRecord, this.beanName), this.observationRegistry); + } + try { + observation.start(); + return doSend(producerRecord, observation); + } + catch (RuntimeException ex) { + observation.error(ex); + observation.stop(); + throw ex; + } + } /** * Send the producer record. * @param producerRecord the producer record. + * @param observation the observation. * @return a Future for the {@link org.apache.kafka.clients.producer.RecordMetadata * RecordMetadata}. */ - protected CompletableFuture> doSend(final ProducerRecord producerRecord) { + protected CompletableFuture> doSend(final ProducerRecord producerRecord, + @Nullable Observation observation) { + final Producer producer = getTheProducer(producerRecord.topic()); this.logger.trace(() -> "Sending: " + KafkaUtils.format(producerRecord)); final CompletableFuture> future = new CompletableFuture<>(); Object sample = null; - if (this.micrometerEnabled && this.micrometerHolder == null) { - this.micrometerHolder = obtainMicrometerHolder(); - } if (this.micrometerHolder != null) { sample = this.micrometerHolder.start(); } @@ -642,7 +708,7 @@ protected CompletableFuture> doSend(final ProducerRecord this.producerInterceptor.onSend(producerRecord); } Future sendFuture = - producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample)); + producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample, observation)); // May be an immediate failure if (sendFuture.isDone()) { try { @@ -664,7 +730,7 @@ protected CompletableFuture> doSend(final ProducerRecord } private Callback buildCallback(final ProducerRecord producerRecord, final Producer producer, - final CompletableFuture> future, @Nullable Object sample) { + final CompletableFuture> future, @Nullable Object sample, Observation observation) { return (metadata, exception) -> { try { @@ -680,6 +746,7 @@ private Callback buildCallback(final ProducerRecord producerRecord, final if (sample != null) { this.micrometerHolder.success(sample); } + observation.stop(); future.complete(new SendResult<>(producerRecord, metadata)); if (KafkaTemplate.this.producerListener != null) { KafkaTemplate.this.producerListener.onSuccess(producerRecord, metadata); @@ -691,6 +758,8 @@ private Callback buildCallback(final ProducerRecord producerRecord, final if (sample != null) { this.micrometerHolder.failure(sample, exception.getClass().getSimpleName()); } + observation.error(exception); + observation.stop(); future.completeExceptionally( new KafkaProducerException(producerRecord, "Failed to send", exception)); if (KafkaTemplate.this.producerListener != null) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java index 1c0f91893e..41816446e5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java @@ -32,6 +32,7 @@ import org.springframework.aop.support.AopUtils; import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.kafka.support.TopicPartitionOffset; +import org.springframework.kafka.support.micrometer.KafkaListenerObservationConvention; import org.springframework.lang.Nullable; import org.springframework.scheduling.TaskScheduler; import org.springframework.transaction.PlatformTransactionManager; @@ -262,6 +263,8 @@ public enum EOSMode { private boolean micrometerEnabled = true; + private boolean observationEnabled; + private Duration consumerStartTimeout = DEFAULT_CONSUMER_START_TIMEOUT; private Boolean subBatchPerPartition; @@ -282,6 +285,8 @@ public enum EOSMode { private boolean pauseImmediate; + private KafkaListenerObservationConvention observationConvention; + /** * Create properties for a container that will subscribe to the specified topics. * @param topics the topics. @@ -635,6 +640,7 @@ public boolean isMicrometerEnabled() { /** * Set to false to disable the Micrometer listener timers. Default true. + * Disabled when {@link #setObservationEnabled(boolean)} is true. * @param micrometerEnabled false to disable. * @since 2.3 */ @@ -642,6 +648,20 @@ public void setMicrometerEnabled(boolean micrometerEnabled) { this.micrometerEnabled = micrometerEnabled; } + public boolean isObservationEnabled() { + return this.observationEnabled; + } + + /** + * Set to true to enable observation via Micrometer. + * @param observationEnabled true to enable. + * @since 3.0 + * @see #setMicrometerEnabled(boolean) + */ + public void setObservationEnabled(boolean observationEnabled) { + this.observationEnabled = observationEnabled; + } + /** * Set additional tags for the Micrometer listener timers. * @param tags the tags. @@ -912,6 +932,19 @@ private void adviseListenerIfNeeded() { } } + public KafkaListenerObservationConvention getObservationConvention() { + return this.observationConvention; + } + + /** + * Set a custom {@link KafkaListenerObservationConvention}. + * @param observationConvention the convention. + * @since 3.0 + */ + public void setObservationConvention(KafkaListenerObservationConvention observationConvention) { + this.observationConvention = observationConvention; + } + @Override public String toString() { return "ContainerProperties [" @@ -942,7 +975,12 @@ public String toString() { + "\n stopContainerWhenFenced=" + this.stopContainerWhenFenced + "\n stopImmediate=" + this.stopImmediate + "\n asyncAcks=" + this.asyncAcks - + "\n idleBeforeDataMultiplier" + this.idleBeforeDataMultiplier + + "\n idleBeforeDataMultiplier=" + this.idleBeforeDataMultiplier + + "\n micrometerEnabled=" + this.micrometerEnabled + + "\n observationEnabled=" + this.observationEnabled + + (this.observationConvention != null + ? "\n observationConvention=" + this.observationConvention + : "") + "\n]"; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 07656b360a..661e52d0b8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -71,6 +71,7 @@ import org.apache.kafka.common.header.internals.RecordHeader; import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationEventPublisher; import org.springframework.core.log.LogAccessor; @@ -106,6 +107,9 @@ import org.springframework.kafka.support.LogIfLevelEnabled; import org.springframework.kafka.support.TopicPartitionOffset; import org.springframework.kafka.support.TopicPartitionOffset.SeekPosition; +import org.springframework.kafka.support.micrometer.DefaultKafkaListenerObservationConvention; +import org.springframework.kafka.support.micrometer.KafkaListenerObservation; +import org.springframework.kafka.support.micrometer.KafkaRecordReceiverContext; import org.springframework.kafka.support.micrometer.MicrometerHolder; import org.springframework.kafka.support.serializer.DeserializationException; import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer; @@ -126,6 +130,9 @@ import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; +import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationRegistry; + /** * Single-threaded Message listener container using the Java {@link Consumer} supporting @@ -358,7 +365,14 @@ protected void doStart() { } GenericMessageListener listener = (GenericMessageListener) messageListener; ListenerType listenerType = determineListenerType(listener); - this.listenerConsumer = new ListenerConsumer(listener, listenerType); + ObservationRegistry observationRegistry = null; + ApplicationContext applicationContext = getApplicationContext(); + if (applicationContext != null) { + ObjectProvider registry = + applicationContext.getBeanProvider(ObservationRegistry.class); + observationRegistry = registry.getIfUnique(); + } + this.listenerConsumer = new ListenerConsumer(listener, listenerType, observationRegistry); setRunning(true); this.startLatch = new CountDownLatch(1); this.listenerConsumerFuture = consumerExecutor.submitCompletable(this.listenerConsumer); @@ -759,6 +773,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final boolean pauseImmediate = this.containerProperties.isPauseImmediate(); + private final ObservationRegistry observationRegistry; + private Map definedPartitions; private int count; @@ -799,16 +815,19 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private boolean pauseForPending; + private boolean firstPoll; + private volatile boolean consumerPaused; private volatile Thread consumerThread; private volatile long lastPoll = System.currentTimeMillis(); - private boolean firstPoll; - @SuppressWarnings(UNCHECKED) - ListenerConsumer(GenericMessageListener listener, ListenerType listenerType) { + ListenerConsumer(GenericMessageListener listener, ListenerType listenerType, + @Nullable ObservationRegistry observationRegistry) { + + this.observationRegistry = observationRegistry; Properties consumerProperties = propertiesFromProperties(); checkGroupInstance(consumerProperties, KafkaMessageListenerContainer.this.consumerFactory); this.autoCommit = determineAutoCommit(consumerProperties); @@ -1226,7 +1245,9 @@ protected void checkConsumer() { private MicrometerHolder obtainMicrometerHolder() { MicrometerHolder holder = null; try { - if (KafkaUtils.MICROMETER_PRESENT && this.containerProperties.isMicrometerEnabled()) { + if (KafkaUtils.MICROMETER_PRESENT && this.containerProperties.isMicrometerEnabled() + && !this.containerProperties.isObservationEnabled()) { + holder = new MicrometerHolder(getApplicationContext(), getBeanName(), "spring.kafka.listener", "Kafka Listener Timer", this.containerProperties.getMicrometerTags()); @@ -2683,36 +2704,47 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord recor Iterator> iterator) { Object sample = startMicrometerSample(); - - try { - invokeOnMessage(record); - successTimer(sample); - recordInterceptAfter(record, null); + Observation observation; + if (!this.containerProperties.isObservationEnabled() || this.observationRegistry == null) { + observation = Observation.NOOP; } - catch (RuntimeException e) { - failureTimer(sample); - recordInterceptAfter(record, e); - if (this.commonErrorHandler == null) { - throw e; - } + else { + observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation( + this.containerProperties.getObservationConvention(), + DefaultKafkaListenerObservationConvention.INSTANCE, + new KafkaRecordReceiverContext(record, getListenerId()), this.observationRegistry); + } + return observation.observe(() -> { try { - invokeErrorHandler(record, iterator, e); - commitOffsetsIfNeeded(record); + invokeOnMessage(record); + successTimer(sample); + recordInterceptAfter(record, null); } - catch (KafkaException ke) { - ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger); - return ke; - } - catch (RuntimeException ee) { - this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION); - return ee; - } - catch (Error er) { // NOSONAR - this.logger.error(er, "Error handler threw an error"); - throw er; + catch (RuntimeException e) { + failureTimer(sample); + recordInterceptAfter(record, e); + if (this.commonErrorHandler == null) { + throw e; + } + try { + invokeErrorHandler(record, iterator, e); + commitOffsetsIfNeeded(record); + } + catch (KafkaException ke) { + ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger); + return ke; + } + catch (RuntimeException ee) { + this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION); + return ee; + } + catch (Error er) { // NOSONAR + this.logger.error(er, "Error handler threw an error"); + throw er; + } } - } - return null; + return null; + }); } private void commitOffsetsIfNeeded(final ConsumerRecord record) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/DefaultKafkaListenerObservationConvention.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/DefaultKafkaListenerObservationConvention.java new file mode 100644 index 0000000000..8ac3a6292b --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/DefaultKafkaListenerObservationConvention.java @@ -0,0 +1,47 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support.micrometer; + +import io.micrometer.common.KeyValues; + +/** + * Default {@link KafkaListenerObservationConvention} for Kafka listener key values. + * + * @author Gary Russell + * @since 3.0 + * + */ +public class DefaultKafkaListenerObservationConvention implements KafkaListenerObservationConvention { + + /** + * A singleton instance of the convention. + */ + public static final DefaultKafkaListenerObservationConvention INSTANCE = + new DefaultKafkaListenerObservationConvention(); + + @Override + public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) { + return KeyValues.of(KafkaListenerObservation.ListenerLowCardinalityTags.LISTENER_ID.asString(), + context.getListenerId()); + } + + @Override + public String getContextualName(KafkaRecordReceiverContext context) { + return context.getSource() + " receive"; + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/DefaultKafkaTemplateObservationConvention.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/DefaultKafkaTemplateObservationConvention.java new file mode 100644 index 0000000000..b5557f9090 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/DefaultKafkaTemplateObservationConvention.java @@ -0,0 +1,47 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support.micrometer; + +import io.micrometer.common.KeyValues; + +/** + * Default {@link KafkaTemplateObservationConvention} for Kafka template key values. + * + * @author Gary Russell + * @since 3.0 + * + */ +public class DefaultKafkaTemplateObservationConvention implements KafkaTemplateObservationConvention { + + /** + * A singleton instance of the convention. + */ + public static final DefaultKafkaTemplateObservationConvention INSTANCE = + new DefaultKafkaTemplateObservationConvention(); + + @Override + public KeyValues getLowCardinalityKeyValues(KafkaRecordSenderContext context) { + return KeyValues.of(KafkaTemplateObservation.TemplateLowCardinalityTags.BEAN_NAME.asString(), + context.getBeanName()); + } + + @Override + public String getContextualName(KafkaRecordSenderContext context) { + return context.getDestination() + " send"; + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java new file mode 100644 index 0000000000..5b572c9a99 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java @@ -0,0 +1,75 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support.micrometer; + +import io.micrometer.common.docs.KeyName; +import io.micrometer.observation.Observation.Context; +import io.micrometer.observation.ObservationConvention; +import io.micrometer.observation.docs.DocumentedObservation; + +/** + * Spring for Apache Kafka Observation for listeners. + * + * @author Gary Russell + * @since 3.0 + * + */ +public enum KafkaListenerObservation implements DocumentedObservation { + + /** + * Observation for Kafka listeners. + */ + LISTENER_OBSERVATION { + + + @Override + public Class> getDefaultConvention() { + return DefaultKafkaListenerObservationConvention.class; + } + + @Override + public String getPrefix() { + return "spring.kafka.listener"; + } + + @Override + public KeyName[] getLowCardinalityKeyNames() { + return ListenerLowCardinalityTags.values(); + } + + }; + + /** + * Low cardinality tags. + */ + public enum ListenerLowCardinalityTags implements KeyName { + + /** + * Listener id. + */ + LISTENER_ID { + + @Override + public String asString() { + return "spring.kafka.listener.id"; + } + + } + + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservationConvention.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservationConvention.java new file mode 100644 index 0000000000..630d259dc7 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservationConvention.java @@ -0,0 +1,41 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support.micrometer; + +import io.micrometer.observation.Observation.Context; +import io.micrometer.observation.ObservationConvention; + +/** + * {@link ObservationConvention} for Kafka listener key values. + * + * @author Gary Russell + * @since 3.0 + * + */ +public interface KafkaListenerObservationConvention extends ObservationConvention { + + @Override + default boolean supportsContext(Context context) { + return context instanceof KafkaRecordReceiverContext; + } + + @Override + default String getName() { + return "spring.kafka.listener"; + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java new file mode 100644 index 0000000000..b32633f5e8 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java @@ -0,0 +1,64 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support.micrometer; + +import java.nio.charset.StandardCharsets; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; + +import io.micrometer.observation.transport.ReceiverContext; + +/** + * {@link ReceiverContext} for {@link ConsumerRecord}s. + * + * @author Gary Russell + * @since 3.0 + * + */ +public class KafkaRecordReceiverContext extends ReceiverContext> { + + private final String listenerId; + + private final ConsumerRecord record; + + public KafkaRecordReceiverContext(ConsumerRecord record, String listenerId) { + super((carrier, key) -> { + Header header = carrier.headers().lastHeader(key); + if (header == null) { + return null; + } + return new String(header.value(), StandardCharsets.UTF_8); + }); + setCarrier(record); + this.record = record; + this.listenerId = listenerId; + } + + public String getListenerId() { + return this.listenerId; + } + + /** + * Return the source topic. + * @return the source. + */ + public String getSource() { + return this.record.topic(); + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java new file mode 100644 index 0000000000..8598bc9d25 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java @@ -0,0 +1,57 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support.micrometer; + +import java.nio.charset.StandardCharsets; + +import org.apache.kafka.clients.producer.ProducerRecord; + +import io.micrometer.observation.transport.SenderContext; + +/** + * {@link SenderContext} for {@link ProducerRecord}s. + * + * @author Gary Russell + * @since 3.0 + * + */ +public class KafkaRecordSenderContext extends SenderContext> { + + private final String beanName; + + private final String destination; + + public KafkaRecordSenderContext(ProducerRecord record, String beanName) { + super((carrier, key, value) -> record.headers().add(key, value.getBytes(StandardCharsets.UTF_8))); + setCarrier(record); + this.beanName = beanName; + this.destination = record.topic(); + } + + public String getBeanName() { + return this.beanName; + } + + /** + * Return the destination topic. + * @return the topic. + */ + public String getDestination() { + return this.destination; + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaTemplateObservation.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaTemplateObservation.java new file mode 100644 index 0000000000..332dbb8326 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaTemplateObservation.java @@ -0,0 +1,75 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support.micrometer; + +import io.micrometer.common.docs.KeyName; +import io.micrometer.observation.Observation.Context; +import io.micrometer.observation.ObservationConvention; +import io.micrometer.observation.docs.DocumentedObservation; + +/** + * Spring for Apache Kafka Observation for + * {@link org.springframework.kafka.core.KafkaTemplate}. + * + * @author Gary Russell + * @since 3.0 + * + */ +public enum KafkaTemplateObservation implements DocumentedObservation { + + /** + * {@link org.springframework.kafka.core.KafkaTemplate} observation. + */ + TEMPLATE_OBSERVATION { + + @Override + public Class> getDefaultConvention() { + return DefaultKafkaTemplateObservationConvention.class; + } + + @Override + public String getPrefix() { + return "spring.kafka.template"; + } + + @Override + public KeyName[] getLowCardinalityKeyNames() { + return TemplateLowCardinalityTags.values(); + } + + }; + + /** + * Low cardinality tags. + */ + public enum TemplateLowCardinalityTags implements KeyName { + + /** + * Bean name of the template. + */ + BEAN_NAME { + + @Override + public String asString() { + return "spring.kafka.template.name"; + } + + } + + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaTemplateObservationConvention.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaTemplateObservationConvention.java new file mode 100644 index 0000000000..8b0c540875 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaTemplateObservationConvention.java @@ -0,0 +1,41 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support.micrometer; + +import io.micrometer.observation.Observation.Context; +import io.micrometer.observation.ObservationConvention; + +/** + * {@link ObservationConvention} for Kafka template key values. + * + * @author Gary Russell + * @since 3.0 + * + */ +public interface KafkaTemplateObservationConvention extends ObservationConvention { + + @Override + default boolean supportsContext(Context context) { + return context instanceof KafkaRecordSenderContext; + } + + @Override + default String getName() { + return "spring.kafka.template"; + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationIntegrationTests.java new file mode 100644 index 0000000000..28d196efd2 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationIntegrationTests.java @@ -0,0 +1,182 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support.micrometer; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.utils.KafkaTestUtils; + +import io.micrometer.common.KeyValues; +import io.micrometer.core.tck.MeterRegistryAssert; +import io.micrometer.observation.ObservationRegistry; +import io.micrometer.tracing.Span.Kind; +import io.micrometer.tracing.exporter.FinishedSpan; +import io.micrometer.tracing.test.SampleTestRunner; +import io.micrometer.tracing.test.simple.SpanAssert; +import io.micrometer.tracing.test.simple.SpansAssert; + +/** + * @author Artem Bilan + * @author Gary Russell + * + * @since 3.0 + */ +public class ObservationIntegrationTests extends SampleTestRunner { + + @SuppressWarnings("unchecked") + @Override + public SampleTestRunnerConsumer yourCode() { + // template -> listener -> template -> listener + return (bb, meterRegistry) -> { + ObservationRegistry observationRegistry = getObservationRegistry(); + try (AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext()) { + applicationContext.registerBean(ObservationRegistry.class, () -> observationRegistry); + applicationContext.register(Config.class); + applicationContext.refresh(); + applicationContext.getBean(KafkaTemplate.class).send("int.observation.testT1", "test"); + assertThat(applicationContext.getBean(Listener.class).latch1.await(10, TimeUnit.SECONDS)).isTrue(); + } + + List finishedSpans = bb.getFinishedSpans(); + SpansAssert.assertThat(finishedSpans) + .haveSameTraceId() + .hasSize(4); + List producerSpans = finishedSpans.stream() + .filter(span -> span.getKind().equals(Kind.PRODUCER)) + .collect(Collectors.toList()); + List consumerSpans = finishedSpans.stream() + .filter(span -> span.getKind().equals(Kind.CONSUMER)) + .collect(Collectors.toList()); + SpanAssert.assertThat(producerSpans.get(0)) + .hasTag("spring.kafka.template.name", "template"); + SpanAssert.assertThat(producerSpans.get(1)) + .hasTag("spring.kafka.template.name", "template"); + SpanAssert.assertThat(consumerSpans.get(0)) + .hasTagWithKey("spring.kafka.listener.id"); + assertThat(consumerSpans.get(0).getTags().get("spring.kafka.listener.id")).isIn("obs1-0", "obs2-0"); + SpanAssert.assertThat(consumerSpans.get(1)) + .hasTagWithKey("spring.kafka.listener.id"); + assertThat(consumerSpans.get(1).getTags().get("spring.kafka.listener.id")).isIn("obs1-0", "obs2-0"); + assertThat(consumerSpans.get(0).getTags().get("spring.kafka.listener.id")) + .isNotEqualTo(consumerSpans.get(1).getTags().get("spring.kafka.listener.id")); + + MeterRegistryAssert.assertThat(getMeterRegistry()) + .hasTimerWithNameAndTags("spring.kafka.template", + KeyValues.of("spring.kafka.template.name", "template")) + .hasTimerWithNameAndTags("spring.kafka.template", + KeyValues.of("spring.kafka.template.name", "template")) + .hasTimerWithNameAndTags("spring.kafka.listener", + KeyValues.of("spring.kafka.listener.id", "obs1-0")) + .hasTimerWithNameAndTags("spring.kafka.listener", + KeyValues.of("spring.kafka.listener.id", "obs2-0")); + }; + } + + + @Configuration + @EnableKafka + public static class Config { + + @Bean + EmbeddedKafkaBroker broker() { + return new EmbeddedKafkaBroker(1, true, 1, "int.observation.testT1", "int.observation.testT2"); + } + + @Bean + ProducerFactory producerFactory(EmbeddedKafkaBroker broker) { + Map producerProps = KafkaTestUtils.producerProps(broker); + return new DefaultKafkaProducerFactory<>(producerProps); + } + + @Bean + ConsumerFactory consumerFactory(EmbeddedKafkaBroker broker) { + Map consumerProps = KafkaTestUtils.consumerProps("obs", "false", broker); + return new DefaultKafkaConsumerFactory<>(consumerProps); + } + + @Bean + KafkaTemplate template(ProducerFactory pf) { + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setObservationEnabled(true); + return template; + } + + @Bean + ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + ConsumerFactory cf) { + + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(cf); + factory.getContainerProperties().setObservationEnabled(true); + return factory; + } + + @Bean + Listener listener(KafkaTemplate template) { + return new Listener(template); + } + + } + + public static class Listener { + + private final KafkaTemplate template; + + final CountDownLatch latch1 = new CountDownLatch(1); + + volatile ConsumerRecord record; + + public Listener(KafkaTemplate template) { + this.template = template; + } + + @KafkaListener(id = "obs1", topics = "int.observation.testT1") + void listen1(ConsumerRecord in) { + this.template.send("int.observation.testT2", in.value()); + } + + @KafkaListener(id = "obs2", topics = "int.observation.testT2") + void listen2(ConsumerRecord in) { + this.record = in; + this.latch1.countDown(); + } + + } + + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java new file mode 100644 index 0000000000..962a0a5ac2 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java @@ -0,0 +1,294 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support.micrometer; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.Arrays; +import java.util.Deque; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Headers; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.lang.Nullable; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import io.micrometer.common.KeyValues; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import io.micrometer.core.tck.MeterRegistryAssert; +import io.micrometer.observation.ObservationHandler; +import io.micrometer.observation.ObservationRegistry; +import io.micrometer.observation.tck.TestObservationRegistry; +import io.micrometer.tracing.Span; +import io.micrometer.tracing.TraceContext; +import io.micrometer.tracing.Tracer; +import io.micrometer.tracing.handler.DefaultTracingObservationHandler; +import io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler; +import io.micrometer.tracing.handler.PropagatingSenderTracingObservationHandler; +import io.micrometer.tracing.propagation.Propagator; +import io.micrometer.tracing.test.simple.SimpleSpan; +import io.micrometer.tracing.test.simple.SimpleTracer; + +/** + * @author Gary Russell + * @since 3.0 + * + */ +@SpringJUnitConfig +@EmbeddedKafka(topics = { "observation.testT1", "observation.testT2" }) +@DirtiesContext +public class ObservationTests { + + @Test + void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate template, + @Autowired SimpleTracer tracer, @Autowired KafkaListenerEndpointRegistry rler, + @Autowired MeterRegistry meterRegistry) + throws InterruptedException, ExecutionException, TimeoutException { + + template.send("observation.testT1", "test").get(10, TimeUnit.SECONDS); + assertThat(listener.latch1.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(listener.record).isNotNull(); + Headers headers = listener.record.headers(); + assertThat(headers.lastHeader("foo")).extracting(hdr -> hdr.value()).isEqualTo("some foo value".getBytes()); + assertThat(headers.lastHeader("bar")).extracting(hdr -> hdr.value()).isEqualTo("some bar value".getBytes()); + Deque spans = tracer.getSpans(); + assertThat(spans).hasSize(4); + SimpleSpan span = spans.poll(); + assertThat(span.getTags()).containsEntry("spring.kafka.template.name", "template"); + assertThat(span.getName()).isEqualTo("observation.testT1 send"); + span = spans.poll(); + assertThat(span.getTags()) + .containsAllEntriesOf( + Map.of("spring.kafka.listener.id", "obs1-0", "foo", "some foo value", "bar", "some bar value")); + assertThat(span.getName()).isEqualTo("observation.testT1 receive"); + await().until(() -> spans.peekFirst().getTags().size() == 1); + span = spans.poll(); + assertThat(span.getTags()).containsEntry("spring.kafka.template.name", "template"); + assertThat(span.getName()).isEqualTo("observation.testT2 send"); + await().until(() -> spans.peekFirst().getTags().size() == 3); + span = spans.poll(); + assertThat(span.getTags()) + .containsAllEntriesOf( + Map.of("spring.kafka.listener.id", "obs2-0", "foo", "some foo value", "bar", "some bar value")); + assertThat(span.getName()).isEqualTo("observation.testT2 receive"); + template.setObservationConvention(new DefaultKafkaTemplateObservationConvention() { + + @Override + public KeyValues getLowCardinalityKeyValues(KafkaRecordSenderContext context) { + return super.getLowCardinalityKeyValues(context).and("foo", "bar"); + } + + }); + rler.getListenerContainer("obs1").getContainerProperties().setObservationConvention( + new DefaultKafkaListenerObservationConvention() { + + @Override + public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) { + return super.getLowCardinalityKeyValues(context).and("baz", "qux"); + } + + }); + rler.getListenerContainer("obs1").stop(); + rler.getListenerContainer("obs1").start(); + template.send("observation.testT1", "test").get(10, TimeUnit.SECONDS); + assertThat(listener.latch2.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(listener.record).isNotNull(); + headers = listener.record.headers(); + assertThat(headers.lastHeader("foo")).extracting(hdr -> hdr.value()).isEqualTo("some foo value".getBytes()); + assertThat(headers.lastHeader("bar")).extracting(hdr -> hdr.value()).isEqualTo("some bar value".getBytes()); + assertThat(spans).hasSize(4); + span = spans.poll(); + assertThat(span.getTags()).containsEntry("spring.kafka.template.name", "template"); + assertThat(span.getTags()).containsEntry("foo", "bar"); + assertThat(span.getName()).isEqualTo("observation.testT1 send"); + await().until(() -> spans.peekFirst().getTags().size() == 4); + span = spans.poll(); + assertThat(span.getTags()) + .containsAllEntriesOf(Map.of("spring.kafka.listener.id", "obs1-0", "foo", "some foo value", "bar", + "some bar value", "baz", "qux")); + assertThat(span.getName()).isEqualTo("observation.testT1 receive"); + await().until(() -> spans.peekFirst().getTags().size() == 2); + span = spans.poll(); + assertThat(span.getTags()).containsEntry("spring.kafka.template.name", "template"); + assertThat(span.getTags()).containsEntry("foo", "bar"); + assertThat(span.getName()).isEqualTo("observation.testT2 send"); + await().until(() -> spans.peekFirst().getTags().size() == 3); + span = spans.poll(); + assertThat(span.getTags()) + .containsAllEntriesOf( + Map.of("spring.kafka.listener.id", "obs2-0", "foo", "some foo value", "bar", "some bar value")); + assertThat(span.getTags()).doesNotContainEntry("baz", "qux"); + assertThat(span.getName()).isEqualTo("observation.testT2 receive"); + MeterRegistryAssert.assertThat(meterRegistry) + .hasTimerWithNameAndTags("spring.kafka.template", + KeyValues.of("spring.kafka.template.name", "template")) + .hasTimerWithNameAndTags("spring.kafka.template", + KeyValues.of("spring.kafka.template.name", "template", "foo", "bar")) + .hasTimerWithNameAndTags("spring.kafka.listener", KeyValues.of("spring.kafka.listener.id", "obs1-0")) + .hasTimerWithNameAndTags("spring.kafka.listener", + KeyValues.of("spring.kafka.listener.id", "obs1-0", "baz", "qux")) + .hasTimerWithNameAndTags("spring.kafka.listener", KeyValues.of("spring.kafka.listener.id", "obs2-0")); + } + + @Configuration + @EnableKafka + public static class Config { + + @Bean + ProducerFactory producerFactory(EmbeddedKafkaBroker broker) { + Map producerProps = KafkaTestUtils.producerProps(broker); + return new DefaultKafkaProducerFactory<>(producerProps); + } + + @Bean + ConsumerFactory consumerFactory(EmbeddedKafkaBroker broker) { + Map consumerProps = KafkaTestUtils.consumerProps("obs", "false", broker); + return new DefaultKafkaConsumerFactory<>(consumerProps); + } + + @Bean + KafkaTemplate template(ProducerFactory pf) { + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setObservationEnabled(true); + return template; + } + + @Bean + ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory cf) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(cf); + factory.getContainerProperties().setObservationEnabled(true); + return factory; + } + + @Bean + SimpleTracer simpleTracer() { + return new SimpleTracer(); + } + + @Bean + MeterRegistry meterRegistry() { + return new SimpleMeterRegistry(); + } + + @Bean + ObservationRegistry observationRegistry(Tracer tracer, Propagator propagator, MeterRegistry meterRegistry) { + TestObservationRegistry observationRegistry = TestObservationRegistry.create(); + observationRegistry.observationConfig().observationHandler( + // Composite will pick the first matching handler + new ObservationHandler.FirstMatchingCompositeObservationHandler( + // This is responsible for creating a child span on the sender side + new PropagatingSenderTracingObservationHandler<>(tracer, propagator), + // This is responsible for creating a span on the receiver side + new PropagatingReceiverTracingObservationHandler<>(tracer, propagator), + // This is responsible for creating a default span + new DefaultTracingObservationHandler(tracer))) + .observationHandler(new DefaultMeterObservationHandler(meterRegistry)); + return observationRegistry; + } + + @Bean + Propagator propagator(Tracer tracer) { + return new Propagator() { + + // List of headers required for tracing propagation + @Override + public List fields() { + return Arrays.asList("foo", "bar"); + } + + // This is called on the producer side when the message is being sent + // Normally we would pass information from tracing context - for tests we don't need to + @Override + public void inject(TraceContext context, @Nullable C carrier, Setter setter) { + setter.set(carrier, "foo", "some foo value"); + setter.set(carrier, "bar", "some bar value"); + } + + // This is called on the consumer side when the message is consumed + // Normally we would use tools like Extractor from tracing but for tests we are just manually creating a span + @Override + public Span.Builder extract(C carrier, Getter getter) { + String foo = getter.get(carrier, "foo"); + String bar = getter.get(carrier, "bar"); + return tracer.spanBuilder().tag("foo", foo).tag("bar", bar); + } + }; + } + + @Bean + Listener listener(KafkaTemplate template) { + return new Listener(template); + } + + } + + public static class Listener { + + private final KafkaTemplate template; + + final CountDownLatch latch1 = new CountDownLatch(1); + + final CountDownLatch latch2 = new CountDownLatch(2); + + volatile ConsumerRecord record; + + public Listener(KafkaTemplate template) { + this.template = template; + } + + @KafkaListener(id = "obs1", topics = "observation.testT1") + void listen1(ConsumerRecord in) { + this.template.send("observation.testT2", in.value()); + } + + @KafkaListener(id = "obs2", topics = "observation.testT2") + void listen2(ConsumerRecord in) { + this.record = in; + this.latch1.countDown(); + this.latch2.countDown(); + } + + } + +}