Skip to content

Commit

Permalink
Bigtable: integrate OpenCensus tracing into the bigtable data client (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
igorbernstein2 committed Feb 26, 2019
1 parent 9ef0c0e commit 41a04ee
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 32 deletions.
60 changes: 60 additions & 0 deletions google-cloud-clients/google-cloud-bigtable/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,66 @@ try {
}
```

## Opencensus Tracing

Cloud Bigtable client supports [Opencensus Tracing](https://opencensus.io/tracing/),
which gives insight into the client internals and aids in debugging production issues.
By default, the functionality is disabled. To enable, you need to add a couple of
dependencies and configure an exporter. For example to enable tracing using
[Google Stackdriver](https://cloud.google.com/trace/docs/):

[//]: # (TODO: figure out how to keep opencensus version in sync with pom.xml)

If you are using Maven, add this to your pom.xml file
```xml
<dependency>
<groupId>io.opencensus</groupId>
<artifactId>opencensus-impl</artifactId>
<version>0.18.0</version>
</dependency>
<dependency>
<groupId>io.opencensus</groupId>
<artifactId>opencensus-exporter-trace-stackdriver</artifactId>
<version>0.18.0</version>
</dependency>
```
If you are using Gradle, add this to your dependencies
```Groovy
compile 'io.opencensus:opencensus-impl:0.18.0'
compile 'io.opencensus:opencensus-exporter-trace-stackdriver:0.18.0'
```
If you are using SBT, add this to your dependencies
```Scala
libraryDependencies += "io.opencensus" % "opencensus-impl" % "0.18.0"
libraryDependencies += "io.opencensus" % "opencensus-exporter-trace-stackdriver" % "0.18.0"
```

Then at the start of your application configure the exporter:

```java
import io.opencensus.exporter.trace.stackdriver.StackdriverTraceConfiguration;
import io.opencensus.exporter.trace.stackdriver.StackdriverTraceExporter;

StackdriverTraceExporter.createAndRegister(
StackdriverTraceConfiguration.builder()
.setProjectId("YOUR-PROJECT_ID")
.build());
```

By default traces are [sampled](https://opencensus.io/tracing/sampling) at a rate of about 1/10,000.
You can configure a higher rate by updating the active tracing params:

```java
import io.opencensus.trace.Tracing;
import io.opencensus.trace.samplers.Samplers;

Tracing.getTraceConfig().updateActiveTraceParams(
Tracing.getTraceConfig().getActiveTraceParams().toBuilder()
.setSampler(Samplers.probabilitySampler(0.01))
.build()
);
```

## Troubleshooting

To get help, follow the instructions in the [shared Troubleshooting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetryingExecutor;
import com.google.api.gax.retrying.RetryingExecutorWithContext;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
import com.google.api.gax.rpc.BatchingCallSettings;
import com.google.api.gax.rpc.Callables;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.ServerStreamingCallSettings;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.tracing.SpanName;
import com.google.api.gax.tracing.TracedBatchingCallable;
import com.google.api.gax.tracing.TracedServerStreamingCallable;
import com.google.api.gax.tracing.TracedUnaryCallable;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.SampleRowKeysRequest;
Expand All @@ -50,6 +54,7 @@
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
import com.google.cloud.bigtable.gaxx.tracing.WrappedTracerFactory;
import java.io.IOException;
import java.util.List;
import org.threeten.bp.Duration;
Expand All @@ -68,6 +73,9 @@
*/
@InternalApi
public class EnhancedBigtableStub implements AutoCloseable {
private static final String TRACING_OUTER_CLIENT_NAME = "Bigtable";
private static final String TRACING_INNER_CLIENT_NAME = "BaseBigtable";

private final EnhancedBigtableStubSettings settings;
private final GrpcBigtableStub stub;
private final ClientContext clientContext;
Expand All @@ -92,7 +100,10 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
.setCredentialsProvider(settings.getCredentialsProvider())
.setHeaderProvider(settings.getHeaderProvider())
.setStreamWatchdogProvider(settings.getStreamWatchdogProvider())
.setStreamWatchdogCheckInterval(settings.getStreamWatchdogCheckInterval());
.setStreamWatchdogCheckInterval(settings.getStreamWatchdogCheckInterval())
// Force the base stub to use a different TracerFactory
.setTracerFactory(
new WrappedTracerFactory(settings.getTracerFactory(), TRACING_INNER_CLIENT_NAME));

// ReadRow retries are handled in the overlay: disable retries in the base layer (but make
// sure to preserve the exception callable settings).
Expand Down Expand Up @@ -140,6 +151,9 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
ClientContext clientContext = ClientContext.create(baseSettings);
GrpcBigtableStub stub = new GrpcBigtableStub(baseSettings, clientContext);

// Make sure to keep the original tracer factory for the outer client.
clientContext = clientContext.toBuilder().setTracerFactory(settings.getTracerFactory()).build();

return new EnhancedBigtableStub(settings, clientContext, stub);
}

Expand Down Expand Up @@ -247,15 +261,8 @@ private <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
FilterMarkerRowsCallable<RowT> filtering =
new FilterMarkerRowsCallable<>(retrying2, rowAdapter);

ServerStreamingCallable<ReadRowsRequest, RowT> withContext =
filtering.withDefaultCallContext(clientContext.getDefaultCallContext());

// NOTE: Ideally `withDefaultCallContext` should be the outer-most callable, however the
// ReadRowsUserCallable overrides the first() method. This override would be lost if
// ReadRowsUserCallable is wrapped by another callable. At some point in the future,
// gax-java should allow preserving these kind of overrides through callable chains, at which
// point this should be re-ordered.
return new ReadRowsUserCallable<>(withContext, requestContext);
return createUserFacingServerStreamingCallable(
"ReadRows", new ReadRowsUserCallable<>(filtering, requestContext));
}

/**
Expand All @@ -276,10 +283,8 @@ private UnaryCallable<String, List<KeyOffset>> createSampleRowKeysCallable() {
UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> retryable =
Callables.retrying(spoolable, settings.sampleRowKeysSettings(), clientContext);

UnaryCallable<String, List<KeyOffset>> userFacing =
new SampleRowKeysCallable(retryable, requestContext);

return userFacing.withDefaultCallContext(clientContext.getDefaultCallContext());
return createUserFacingUnaryCallable(
"SampleRowKeys", new SampleRowKeysCallable(retryable, requestContext));
}

/**
Expand All @@ -290,9 +295,8 @@ private UnaryCallable<String, List<KeyOffset>> createSampleRowKeysCallable() {
* </ul>
*/
private UnaryCallable<RowMutation, Void> createMutateRowCallable() {
MutateRowCallable userFacing = new MutateRowCallable(stub.mutateRowCallable(), requestContext);

return userFacing.withDefaultCallContext(clientContext.getDefaultCallContext());
return createUserFacingUnaryCallable(
"MutateRow", new MutateRowCallable(stub.mutateRowCallable(), requestContext));
}

/**
Expand All @@ -311,7 +315,9 @@ private UnaryCallable<RowMutation, Void> createMutateRowCallable() {
*/
private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
UnaryCallable<MutateRowsRequest, Void> baseCallable = createMutateRowsBaseCallable();
return new BulkMutateRowsUserFacingCallable(baseCallable, requestContext);

return createUserFacingUnaryCallable(
"BulkMutateRows", new BulkMutateRowsUserFacingCallable(baseCallable, requestContext));
}

/**
Expand All @@ -338,8 +344,17 @@ private UnaryCallable<RowMutation, Void> createBulkMutateRowsBatchingCallable()
BatchingCallSettings.newBuilder(new MutateRowsBatchingDescriptor())
.setBatchingSettings(settings.bulkMutateRowsSettings().getBatchingSettings());

// This is a special case, the tracing starts after the batching, so we can't use
// createUserFacingUnaryCallable
TracedBatchingCallable<MutateRowsRequest, Void> traced =
new TracedBatchingCallable<>(
baseCallable,
clientContext.getTracerFactory(),
SpanName.of(TRACING_OUTER_CLIENT_NAME, "BulkMutateRows"),
batchingCallSettings.getBatchingDescriptor());

UnaryCallable<MutateRowsRequest, Void> batching =
Callables.batching(baseCallable, batchingCallSettings.build(), clientContext);
Callables.batching(traced, batchingCallSettings.build(), clientContext);

MutateRowsUserFacingCallable userFacing =
new MutateRowsUserFacingCallable(batching, requestContext);
Expand All @@ -359,7 +374,7 @@ private UnaryCallable<MutateRowsRequest, Void> createMutateRowsBaseCallable() {
new ApiResultRetryAlgorithm<Void>(),
new ExponentialRetryAlgorithm(
settings.bulkMutateRowsSettings().getRetrySettings(), clientContext.getClock()));
RetryingExecutor<Void> retryingExecutor =
RetryingExecutorWithContext<Void> retryingExecutor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());

return new MutateRowsRetryingCallable(
Expand All @@ -378,10 +393,9 @@ private UnaryCallable<MutateRowsRequest, Void> createMutateRowsBaseCallable() {
* </ul>
*/
private UnaryCallable<ConditionalRowMutation, Boolean> createCheckAndMutateRowCallable() {
CheckAndMutateRowCallable userFacing =
new CheckAndMutateRowCallable(stub.checkAndMutateRowCallable(), requestContext);

return userFacing.withDefaultCallContext(clientContext.getDefaultCallContext());
return createUserFacingUnaryCallable(
"CheckAndMutateRow",
new CheckAndMutateRowCallable(stub.checkAndMutateRowCallable(), requestContext));
}

/**
Expand All @@ -394,10 +408,42 @@ private UnaryCallable<ConditionalRowMutation, Boolean> createCheckAndMutateRowCa
* </ul>
*/
private UnaryCallable<ReadModifyWriteRow, Row> createReadModifyWriteRowCallable() {
ReadModifyWriteRowCallable userFacing =
new ReadModifyWriteRowCallable(stub.readModifyWriteRowCallable(), requestContext);
return createUserFacingUnaryCallable(
"ReadModifyWriteRow",
new ReadModifyWriteRowCallable(stub.readModifyWriteRowCallable(), requestContext));
}

return userFacing.withDefaultCallContext(clientContext.getDefaultCallContext());
/**
* Wraps a callable chain in a user presentable callable that will inject the default call context
* and trace the call.
*/
private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUserFacingUnaryCallable(
String methodName, UnaryCallable<RequestT, ResponseT> inner) {

UnaryCallable<RequestT, ResponseT> traced =
new TracedUnaryCallable<>(
inner,
clientContext.getTracerFactory(),
SpanName.of(TRACING_OUTER_CLIENT_NAME, methodName));

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
* Wraps a callable chain in a user presentable callable that will inject the default call context
* and trace the call.
*/
private <RequestT, ResponseT>
ServerStreamingCallable<RequestT, ResponseT> createUserFacingServerStreamingCallable(
String methodName, ServerStreamingCallable<RequestT, ResponseT> inner) {

ServerStreamingCallable<RequestT, ResponseT> traced =
new TracedServerStreamingCallable<>(
inner,
clientContext.getTracerFactory(),
SpanName.of(TRACING_OUTER_CLIENT_NAME, methodName));

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}
// </editor-fold>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.api.gax.core.GaxProperties;
import com.google.api.gax.core.GoogleCredentialsProvider;
import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.BatchingCallSettings;
Expand All @@ -28,6 +30,7 @@
import com.google.api.gax.rpc.StubSettings;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.api.gax.tracing.OpencensusTracerFactory;
import com.google.cloud.bigtable.data.v2.internal.DummyBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
Expand All @@ -37,6 +40,7 @@
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -268,6 +272,13 @@ private Builder() {
setStreamWatchdogCheckInterval(baseDefaults.getStreamWatchdogCheckInterval());
setStreamWatchdogProvider(baseDefaults.getStreamWatchdogProvider());

setTracerFactory(
new OpencensusTracerFactory(
ImmutableMap.of(
"gax", GaxGrpcProperties.getGaxGrpcVersion(),
"grpc", GaxGrpcProperties.getGrpcVersion(),
"gapic", GaxProperties.getLibraryVersion(EnhancedBigtableStubSettings.class))));

// Per-method settings using baseSettings for defaults.
readRowsSettings = ServerStreamingCallSettings.newBuilder();
readRowsSettings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ public Void call() {
return null;
}

callContext
.getTracer()
.attemptStarted(externalFuture.getAttemptSettings().getOverallAttemptCount());

// Make the actual call
ApiFuture<List<MutateRowsResponse>> innerFuture =
innerCallable.futureCall(currentRequest, currentCallContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.google.cloud.bigtable.data.v2.stub.mutaterows;

import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.RetryingExecutor;
import com.google.api.gax.retrying.RetryingExecutorWithContext;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ServerStreamingCallable;
Expand All @@ -42,13 +42,13 @@
public class MutateRowsRetryingCallable extends UnaryCallable<MutateRowsRequest, Void> {
private final ApiCallContext callContextPrototype;
private final ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> callable;
private final RetryingExecutor<Void> executor;
private final RetryingExecutorWithContext<Void> executor;
private final ImmutableSet<Code> retryCodes;

public MutateRowsRetryingCallable(
@Nonnull ApiCallContext callContextPrototype,
@Nonnull ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> callable,
@Nonnull RetryingExecutor<Void> executor,
@Nonnull RetryingExecutorWithContext<Void> executor,
@Nonnull Set<StatusCode.Code> retryCodes) {
this.callContextPrototype = Preconditions.checkNotNull(callContextPrototype);
this.callable = Preconditions.checkNotNull(callable);
Expand All @@ -62,7 +62,7 @@ public RetryingFuture<Void> futureCall(MutateRowsRequest request, ApiCallContext
MutateRowsAttemptCallable retryCallable =
new MutateRowsAttemptCallable(callable.all(), request, context, retryCodes);

RetryingFuture<Void> retryingFuture = executor.createFuture(retryCallable);
RetryingFuture<Void> retryingFuture = executor.createFuture(retryCallable, context);
retryCallable.setExternalFuture(retryingFuture);
retryCallable.call();

Expand Down
Loading

0 comments on commit 41a04ee

Please sign in to comment.