Skip to content

Commit

Permalink
Implement vertx-kafka-client instrumentation; single record handler (#…
Browse files Browse the repository at this point in the history
…5973)

* Implement vertx-kafka-client instrumentation; single record

* add muzzle

* Update baseline to 3.6

* Set baseline to 3.6 everywhere

Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>
  • Loading branch information
Mateusz Rzeszutek and trask committed May 5, 2022
1 parent f2ed9bf commit 2a77003
Show file tree
Hide file tree
Showing 12 changed files with 723 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ public ConsumerRecord<K, V> next() {
closeScopeAndEndSpan();

ConsumerRecord<K, V> next = delegateIterator.next();
if (next != null && consumerProcessInstrumenter().shouldStart(parentContext, next)) {
if (next != null
&& consumerProcessInstrumenter().shouldStart(parentContext, next)
&& KafkaClientsConsumerProcessTracing.wrappingEnabled()) {
currentRequest = next;
currentContext = consumerProcessInstrumenter().start(parentContext, currentRequest);
currentScope = currentContext.makeCurrent();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
plugins {
id("otel.javaagent-instrumentation")
}

muzzle {
pass {
group.set("io.vertx")
module.set("vertx-kafka-client")
versions.set("[3.5.1,)")
assertInverse.set(true)
}
}

dependencies {
bootstrap(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:bootstrap"))
implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common:library"))

library("io.vertx:vertx-kafka-client:3.6.0")
// vertx-codegen is needed for Xlint's annotation checking
library("io.vertx:vertx-codegen:3.6.0")

testImplementation(project(":instrumentation:vertx:vertx-kafka-client-3.6:testing"))

testInstrumentation(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:javaagent"))
}

testing {
suites {
val testNoReceiveTelemetry by registering(JvmTestSuite::class) {
dependencies {
implementation("io.vertx:vertx-kafka-client:3.6.0")
implementation("io.vertx:vertx-codegen:3.6.0")
implementation(project(":instrumentation:vertx:vertx-kafka-client-3.6:testing"))
}

targets {
all {
testTask.configure {
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)

jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=false")
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=false")
}
}
}
}
}
}

tasks {
test {
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)

jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
}

check {
dependsOn(testing.suites)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6;

import static io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6.VertxKafkaSingletons.processInstrumenter;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.field.VirtualField;
import io.vertx.core.Handler;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public final class InstrumentedSingleRecordHandler<K, V> implements Handler<ConsumerRecord<K, V>> {

private final VirtualField<ConsumerRecord<K, V>, Context> receiveContextField;
@Nullable private final Handler<ConsumerRecord<K, V>> delegate;

public InstrumentedSingleRecordHandler(
VirtualField<ConsumerRecord<K, V>, Context> receiveContextField,
@Nullable Handler<ConsumerRecord<K, V>> delegate) {
this.receiveContextField = receiveContextField;
this.delegate = delegate;
}

@Override
public void handle(ConsumerRecord<K, V> record) {
Context parentContext = getParentContext(record);

if (!processInstrumenter().shouldStart(parentContext, record)) {
callDelegateHandler(record);
return;
}

Context context = processInstrumenter().start(parentContext, record);
Throwable error = null;
try (Scope ignored = context.makeCurrent()) {
callDelegateHandler(record);
} catch (Throwable t) {
error = t;
throw t;
} finally {
processInstrumenter().end(context, record, null, error);
}
}

private Context getParentContext(ConsumerRecord<K, V> records) {
Context receiveContext = receiveContextField.get(records);

// use the receive CONSUMER span as parent if it's available
return receiveContext != null ? receiveContext : Context.current();
}

private void callDelegateHandler(ConsumerRecord<K, V> record) {
if (delegate != null) {
delegate.handle(record);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6;

import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class KafkaConsumerRecordsImplInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("io.vertx.kafka.client.consumer.impl.KafkaConsumerRecordsImpl");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("recordAt")
.and(isPublic())
.and(takesArguments(int.class))
.and(returns(named("io.vertx.kafka.client.consumer.KafkaConsumerRecord"))),
this.getClass().getName() + "$RecordAtAdvice");
}

@SuppressWarnings("unused")
public static class RecordAtAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static boolean onEnter() {
return KafkaClientsConsumerProcessTracing.setEnabled(false);
}

@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void onExit(@Advice.Enter boolean previousValue) {
KafkaClientsConsumerProcessTracing.setEnabled(previousValue);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6;

import static net.bytebuddy.matcher.ElementMatchers.isPrivate;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.field.VirtualField;
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.vertx.core.Handler;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class KafkaReadStreamImplInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("handler")
.and(takesArguments(1))
.and(takesArgument(0, named("io.vertx.core.Handler"))),
this.getClass().getName() + "$HandlerAdvice");
transformer.applyAdviceToMethod(
named("batchHandler")
.and(takesArguments(1))
.and(takesArgument(0, named("io.vertx.core.Handler"))),
this.getClass().getName() + "$BatchHandlerAdvice");
transformer.applyAdviceToMethod(
named("run").and(isPrivate()), this.getClass().getName() + "$RunAdvice");
}

@SuppressWarnings("unused")
public static class HandlerAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static <K, V> void onEnter(
@Advice.Argument(value = 0, readOnly = false) Handler<ConsumerRecord<K, V>> handler) {

VirtualField<ConsumerRecord<K, V>, Context> receiveContextField =
VirtualField.find(ConsumerRecord.class, Context.class);
handler = new InstrumentedSingleRecordHandler<>(receiveContextField, handler);
}
}

@SuppressWarnings("unused")
public static class BatchHandlerAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static <K, V> void onEnter(
@Advice.Argument(value = 0, readOnly = false) Handler<ConsumerRecords<K, V>> handler) {
// TODO: next PR
}
}

// this advice suppresses the CONSUMER spans created by the kafka-clients instrumentation
@SuppressWarnings("unused")
public static class RunAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static boolean onEnter() {
return KafkaClientsConsumerProcessTracing.setEnabled(false);
}

@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.Enter boolean previousValue) {
KafkaClientsConsumerProcessTracing.setEnabled(previousValue);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6;

import static java.util.Arrays.asList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class VertxKafkaInstrumentationModule extends InstrumentationModule {

public VertxKafkaInstrumentationModule() {
super("vertx-kafka-client", "vertx-kafka-client-3.5", "vertx");
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(
new KafkaReadStreamImplInstrumentation(), new KafkaConsumerRecordsImplInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public final class VertxKafkaSingletons {

private static final String INSTRUMENTATION_NAME = "io.opentelemetry.vertx-kafka-client-3.5";

private static final Instrumenter<ConsumerRecord<?, ?>, Void> PROCESS_INSTRUMENTER =
new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME)
.createConsumerProcessInstrumenter();

public static Instrumenter<ConsumerRecord<?, ?>, Void> processInstrumenter() {
return PROCESS_INSTRUMENTER;
}

private VertxKafkaSingletons() {}
}
Loading

0 comments on commit 2a77003

Please sign in to comment.