Add decorators for Kafka Streams to enable tracing.
TracingKafkaClientSupplier
a client supplier which traces poll and send operations.TracingProcessorSupplier
completes a span onprocess
TracingTransformerSupplier
completes a span ontransform
This does not trace all operations by default. See [RATIONALE.md] for why.
First, setup the generic Kafka Streams component like this:
import brave.kafka.streams.KafkaStreamsTracing;
...
kafkaStreamsTracing = KafkaStreamsTracing.create(tracing);
To trace a processor in your application use TracingProcessorSupplier
, provided by instrumentation API:
builder.stream(inputTopic)
.processor(kafkaStreamsTracing.processor(
"process",
customProcessor));
To trace a transformer, use TracingTransformerSupplier
, TracingValueTransformerSupplier
, or TracingValueTransformerWithValueSupplier
provided by instrumentation API:
builder.stream(inputTopic)
.transform(kafkaStreamsTracing.transformer(
"transformer-1",
customTransformer))
.to(outputTopic);
builder.stream(inputTopic)
.transformValue(kafkaStreamsTracing.valueTransformer(
"transform-value",
customTransformer))
.to(outputTopic);
builder.stream(inputTopic)
.transformValueWithKey(kafkaStreamsTracing.valueTransformerWithKey(
"transform-value-with-key",
customTransformer))
.to(outputTopic);
Additional transformers have been introduced to cover most common Kafka Streams DSL operations (e.g. map
, mapValues
, foreach
, peek
, filter
).
builder.stream(inputTopic)
.transform(kafkaStreamsTracing.map("map", mapper))
.to(outputTopic);
For flat operations like flatMap, the flatTransform
method can be used:
builder.stream(inputTopic)
.flatTransform(kafkaStreamsTracing.flatMap("flat-map", mapper))
.to(outputTopic);
For more details, see here.
To create a Kafka Streams with Tracing Client Supplier enabled, pass your topology and configuration like this:
KafkaStreams kafkaStreams = kafkaStreamsTracing.kafkaStreams(topology, streamsConfig);
- This tracer is only compatible with Kafka Streams versions including headers support ( > 2.0.0).