Skip to content

Commit

Permalink
GH-2198: Spring Observability Initial Commit (#2394)
Browse files Browse the repository at this point in the history
* GH-2198: Spring Observability Initial Commit

Resolves #2198

* Move getContextualContext to conventions.

* Fix generics in test.

* Add docs.

* Fix doc link.

* Remove unnecessary method overrides; make tag names more descriptive.

* Async stop for send.

* Fix checkstyle.

* Fix async stop - don't stop on sync success; change order of spans for test.

* Fix generics in test.

* Fix checkstyle.

* Fix race in test; with async send spans, finished spans order is indeterminate.

* Move getName() from context to convention.

* Fix Race in Test

Fix Race in Test.

Fix Race in Test.

Fix Race in Test.

Fix Race in Test.
  • Loading branch information
garyrussell committed Sep 19, 2022
1 parent 1b9716d commit 182c580
Show file tree
Hide file tree
Showing 16 changed files with 1,133 additions and 49 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -335,12 +335,14 @@ project ('spring-kafka') {
optionalApi 'io.projectreactor:reactor-core'
optionalApi 'io.projectreactor.kafka:reactor-kafka'
optionalApi 'io.micrometer:micrometer-core'
api 'io.micrometer:micrometer-observation'
optionalApi 'io.micrometer:micrometer-tracing'

testImplementation project (':spring-kafka-test')
testImplementation 'io.projectreactor:reactor-test'
testImplementation "org.mockito:mockito-junit-jupiter:$mockitoVersion"
testImplementation "org.hibernate.validator:hibernate-validator:$hibernateValidationVersion"
testImplementation 'io.micrometer:micrometer-observation-test'
testImplementation 'io.micrometer:micrometer-tracing-bridge-brave'
testImplementation 'io.micrometer:micrometer-tracing-test'
testImplementation 'io.micrometer:micrometer-tracing-integration-test'
Expand Down
16 changes: 15 additions & 1 deletion spring-kafka-docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3310,7 +3310,6 @@ IMPORTANT: By default, the application context's event multicaster invokes event
If you change the multicaster to use an async executor, thread cleanup is not effective.

[[micrometer]]

==== Monitoring

===== Monitoring Listener Performance
Expand Down Expand Up @@ -3398,6 +3397,21 @@ double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")

A similar listener is provided for the `StreamsBuilderFactoryBean` - see <<streams-micrometer>>.

[[observation]]
===== Micrometer Observation

Using Micrometer for observation is now supported, since version 3.0, for the `KafkaTemplate` and listener containers.

Set `observationEnabled` on each component to enable observation; this will disable <<micrometer,Micrometer Timers>> because the timers will now be managed with each observation.

Refer to https://micrometer.io/docs/tracing[Micrometer Tracing] for more information.

To add tags to timers/traces, configure a custom `KafkaTemplateObservationConvention` or `KafkaListenerObservationConvention` to the template or listener container, respectively.

The default implementations add the `bean.name` tag for template observations and `listener.id` tag for containers.

You can either subclass `DefaultKafkaTemplateObservationConvention` or `DefaultKafkaListenerObservationConvention` or provide completely new implementations.

[[transactions]]
==== Transactions

Expand Down
6 changes: 6 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ IMPORTANT: When using transactions, the minimum broker version is 2.5.

See <<exactly-once>> and https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics[KIP-447] for more information.

[[x30-obs]]
==== Observation

Enabling observation for timers and tracing using Micrometer is now supported.
See <<observation>> for more information.

[[x30-global-embedded-kafka]]
==== Global Single Embedded Kafka

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@

import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
Expand All @@ -62,13 +64,20 @@
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.micrometer.DefaultKafkaTemplateObservationConvention;
import org.springframework.kafka.support.micrometer.KafkaRecordSenderContext;
import org.springframework.kafka.support.micrometer.KafkaTemplateObservation;
import org.springframework.kafka.support.micrometer.KafkaTemplateObservationConvention;
import org.springframework.kafka.support.micrometer.MicrometerHolder;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;

/**
* A template for executing high-level operations. When used with a
* {@link DefaultKafkaProducerFactory}, the template is thread-safe. The producer factory
Expand All @@ -90,7 +99,7 @@
*/
@SuppressWarnings("deprecation")
public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationContextAware, BeanNameAware,
ApplicationListener<ContextStoppedEvent>, DisposableBean {
ApplicationListener<ContextStoppedEvent>, DisposableBean, SmartInitializingSingleton {

protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass())); //NOSONAR

Expand Down Expand Up @@ -126,11 +135,17 @@ public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationCo

private ConsumerFactory<K, V> consumerFactory;

private volatile boolean micrometerEnabled = true;
private ProducerInterceptor<K, V> producerInterceptor;

private boolean micrometerEnabled = true;

private volatile MicrometerHolder micrometerHolder;
private MicrometerHolder micrometerHolder;

private ProducerInterceptor<K, V> producerInterceptor;
private boolean observationEnabled;

private KafkaTemplateObservationConvention observationConvention;

private ObservationRegistry observationRegistry;

/**
* Create an instance using the supplied producer factory and autoFlush false.
Expand Down Expand Up @@ -382,6 +397,37 @@ public void setProducerInterceptor(ProducerInterceptor<K, V> producerInterceptor
this.producerInterceptor = producerInterceptor;
}

/**
* Set to true to enable observation via Micrometer.
* @param observationEnabled true to enable.
* @since 3.0
* @see #setMicrometerEnabled(boolean)
*/
public void setObservationEnabled(boolean observationEnabled) {
this.observationEnabled = observationEnabled;
}

/**
* Set a custom {@link KafkaTemplateObservationConvention}.
* @param observationConvention the convention.
* @since 3.0
*/
public void setObservationConvention(KafkaTemplateObservationConvention observationConvention) {
this.observationConvention = observationConvention;
}

@Override
public void afterSingletonsInstantiated() {
if (this.observationEnabled && this.observationRegistry == null && this.applicationContext != null) {
ObjectProvider<ObservationRegistry> registry =
this.applicationContext.getBeanProvider(ObservationRegistry.class);
this.observationRegistry = registry.getIfUnique();
}
else if (this.micrometerEnabled) {
this.micrometerHolder = obtainMicrometerHolder();
}
}

@Override
public void onApplicationEvent(ContextStoppedEvent event) {
if (this.customProducerFactory) {
Expand Down Expand Up @@ -412,33 +458,33 @@ public CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long t
@Override
public CompletableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
return doSend(producerRecord);
return observeSend(producerRecord);
}

@Override
public CompletableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);
return doSend(producerRecord);
return observeSend(producerRecord);
}

@Override
public CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, key, data);
return doSend(producerRecord);
return observeSend(producerRecord);
}

@Override
public CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key,
@Nullable V data) {

ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, timestamp, key, data);
return doSend(producerRecord);
return observeSend(producerRecord);
}

@Override
public CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record) {
Assert.notNull(record, "'record' cannot be null");
return doSend(record);
return observeSend(record);
}

@SuppressWarnings("unchecked")
Expand All @@ -451,7 +497,7 @@ public CompletableFuture<SendResult<K, V>> send(Message<?> message) {
producerRecord.headers().add(KafkaHeaders.CORRELATION_ID, correlationId);
}
}
return doSend((ProducerRecord<K, V>) producerRecord);
return observeSend((ProducerRecord<K, V>) producerRecord);
}


Expand Down Expand Up @@ -621,28 +667,48 @@ protected void closeProducer(Producer<K, V> producer, boolean inTx) {
}
}

private CompletableFuture<SendResult<K, V>> observeSend(final ProducerRecord<K, V> producerRecord) {
Observation observation;
if (!this.observationEnabled || this.observationRegistry == null) {
observation = Observation.NOOP;
}
else {
observation = KafkaTemplateObservation.TEMPLATE_OBSERVATION.observation(
this.observationConvention, DefaultKafkaTemplateObservationConvention.INSTANCE,
new KafkaRecordSenderContext(producerRecord, this.beanName), this.observationRegistry);
}
try {
observation.start();
return doSend(producerRecord, observation);
}
catch (RuntimeException ex) {
observation.error(ex);
observation.stop();
throw ex;
}
}
/**
* Send the producer record.
* @param producerRecord the producer record.
* @param observation the observation.
* @return a Future for the {@link org.apache.kafka.clients.producer.RecordMetadata
* RecordMetadata}.
*/
protected CompletableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
protected CompletableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord,
@Nullable Observation observation) {

final Producer<K, V> producer = getTheProducer(producerRecord.topic());
this.logger.trace(() -> "Sending: " + KafkaUtils.format(producerRecord));
final CompletableFuture<SendResult<K, V>> future = new CompletableFuture<>();
Object sample = null;
if (this.micrometerEnabled && this.micrometerHolder == null) {
this.micrometerHolder = obtainMicrometerHolder();
}
if (this.micrometerHolder != null) {
sample = this.micrometerHolder.start();
}
if (this.producerInterceptor != null) {
this.producerInterceptor.onSend(producerRecord);
}
Future<RecordMetadata> sendFuture =
producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample));
producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample, observation));
// May be an immediate failure
if (sendFuture.isDone()) {
try {
Expand All @@ -664,7 +730,7 @@ protected CompletableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V>
}

private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final Producer<K, V> producer,
final CompletableFuture<SendResult<K, V>> future, @Nullable Object sample) {
final CompletableFuture<SendResult<K, V>> future, @Nullable Object sample, Observation observation) {

return (metadata, exception) -> {
try {
Expand All @@ -680,6 +746,7 @@ private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final
if (sample != null) {
this.micrometerHolder.success(sample);
}
observation.stop();
future.complete(new SendResult<>(producerRecord, metadata));
if (KafkaTemplate.this.producerListener != null) {
KafkaTemplate.this.producerListener.onSuccess(producerRecord, metadata);
Expand All @@ -691,6 +758,8 @@ private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final
if (sample != null) {
this.micrometerHolder.failure(sample, exception.getClass().getSimpleName());
}
observation.error(exception);
observation.stop();
future.completeExceptionally(
new KafkaProducerException(producerRecord, "Failed to send", exception));
if (KafkaTemplate.this.producerListener != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.springframework.aop.support.AopUtils;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.micrometer.KafkaListenerObservationConvention;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.transaction.PlatformTransactionManager;
Expand Down Expand Up @@ -262,6 +263,8 @@ public enum EOSMode {

private boolean micrometerEnabled = true;

private boolean observationEnabled;

private Duration consumerStartTimeout = DEFAULT_CONSUMER_START_TIMEOUT;

private Boolean subBatchPerPartition;
Expand All @@ -282,6 +285,8 @@ public enum EOSMode {

private boolean pauseImmediate;

private KafkaListenerObservationConvention observationConvention;

/**
* Create properties for a container that will subscribe to the specified topics.
* @param topics the topics.
Expand Down Expand Up @@ -635,13 +640,28 @@ public boolean isMicrometerEnabled() {

/**
* Set to false to disable the Micrometer listener timers. Default true.
* Disabled when {@link #setObservationEnabled(boolean)} is true.
* @param micrometerEnabled false to disable.
* @since 2.3
*/
public void setMicrometerEnabled(boolean micrometerEnabled) {
this.micrometerEnabled = micrometerEnabled;
}

public boolean isObservationEnabled() {
return this.observationEnabled;
}

/**
* Set to true to enable observation via Micrometer.
* @param observationEnabled true to enable.
* @since 3.0
* @see #setMicrometerEnabled(boolean)
*/
public void setObservationEnabled(boolean observationEnabled) {
this.observationEnabled = observationEnabled;
}

/**
* Set additional tags for the Micrometer listener timers.
* @param tags the tags.
Expand Down Expand Up @@ -912,6 +932,19 @@ private void adviseListenerIfNeeded() {
}
}

public KafkaListenerObservationConvention getObservationConvention() {
return this.observationConvention;
}

/**
* Set a custom {@link KafkaListenerObservationConvention}.
* @param observationConvention the convention.
* @since 3.0
*/
public void setObservationConvention(KafkaListenerObservationConvention observationConvention) {
this.observationConvention = observationConvention;
}

@Override
public String toString() {
return "ContainerProperties ["
Expand Down Expand Up @@ -942,7 +975,12 @@ public String toString() {
+ "\n stopContainerWhenFenced=" + this.stopContainerWhenFenced
+ "\n stopImmediate=" + this.stopImmediate
+ "\n asyncAcks=" + this.asyncAcks
+ "\n idleBeforeDataMultiplier" + this.idleBeforeDataMultiplier
+ "\n idleBeforeDataMultiplier=" + this.idleBeforeDataMultiplier
+ "\n micrometerEnabled=" + this.micrometerEnabled
+ "\n observationEnabled=" + this.observationEnabled
+ (this.observationConvention != null
? "\n observationConvention=" + this.observationConvention
: "")
+ "\n]";
}

Expand Down
Loading

0 comments on commit 182c580

Please sign in to comment.