Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spanner gapic migration error augmentation with interceptor #3304

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@
import com.google.api.gax.core.GaxProperties;
import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.rpc.ApiClientHeaderProvider;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.StatusCode;
Expand All @@ -52,6 +50,7 @@
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableSet;
import com.google.longrunning.GetOperationRequest;
import com.google.longrunning.Operation;
import com.google.protobuf.Empty;
import com.google.protobuf.FieldMask;
import com.google.spanner.admin.database.v1.CreateDatabaseMetadata;
Expand Down Expand Up @@ -83,10 +82,10 @@
import com.google.spanner.v1.CreateSessionRequest;
import com.google.spanner.v1.DeleteSessionRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.PartitionQueryRequest;
import com.google.spanner.v1.PartitionReadRequest;
import com.google.spanner.v1.PartitionResponse;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ReadRequest;
import com.google.spanner.v1.RollbackRequest;
import com.google.spanner.v1.Session;
Expand All @@ -100,8 +99,6 @@
import java.util.concurrent.Future;
import javax.annotation.Nullable;

import com.google.longrunning.Operation;

/** Implementation of Cloud Spanner remote calls using Gapic libraries. */
public class GapicSpannerRpc implements SpannerRpc {

Expand Down Expand Up @@ -145,9 +142,7 @@ public GapicSpannerRpc(SpannerOptions options) throws IOException {
mergedHeaderProvider.getHeaders(),
internalHeaderProviderBuilder.getResourceHeaderKey());

// TODO(pongad): make RPC logging work (formerly LoggingInterceptor)
// TODO(pongad): add watchdog
// TODO(pongad): make error augmentation work (formerly SpannerErrorInterceptor)

// TODO(hzyi): make this channelProvider configurable through SpannerOptions
TransportChannelProvider channelProvider =
Expand All @@ -156,60 +151,61 @@ public GapicSpannerRpc(SpannerOptions options) throws IOException {
.setEndpoint(options.getEndpoint())
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
.setPoolSize(options.getNumChannels())
.setInterceptorProvider(new SpannerInterceptorProvider())
.build();

CredentialsProvider credentialsProvider =
GrpcTransportOptions.setUpCredentialsProvider(options);

// Disabling retry for now because spanner handles retry in SpannerImpl.
// We will finally want to improve gax but for smooth transitioning we
// preserve the retry in SpannerImpl
try {
// TODO: bump the version of gax and remove this try-catch block
// applyToAllUnaryMethods does not throw exception in the latest version
this.stub =
GrpcSpannerStub.create(
SpannerStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.applyToAllUnaryMethods(
new ApiFunction<UnaryCallSettings.Builder<?, ?>, Void>() {
@Override
public Void apply(UnaryCallSettings.Builder<?,?> builder) {
builder.setRetryableCodes(ImmutableSet.<StatusCode.Code>of());
return null;
}
})
.build());
this.instanceStub =
GrpcInstanceAdminStub.create(
InstanceAdminStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.applyToAllUnaryMethods(
new ApiFunction<UnaryCallSettings.Builder<?, ?>, Void>() {
@Override
public Void apply(UnaryCallSettings.Builder<?,?> builder) {
builder.setRetryableCodes(ImmutableSet.<StatusCode.Code>of());
return null;
}
})
.build());
this.databaseStub =
GrpcDatabaseAdminStub.create(
DatabaseAdminStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.applyToAllUnaryMethods(
new ApiFunction<UnaryCallSettings.Builder<?, ?>, Void>() {
@Override
public Void apply(UnaryCallSettings.Builder<?,?> builder) {
builder.setRetryableCodes(ImmutableSet.<StatusCode.Code>of());
return null;
}
})
.build());
GrpcSpannerStub.create(
SpannerStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.applyToAllUnaryMethods(
new ApiFunction<UnaryCallSettings.Builder<?, ?>, Void>() {
@Override
public Void apply(UnaryCallSettings.Builder<?, ?> builder) {
builder.setRetryableCodes(ImmutableSet.<StatusCode.Code>of());
return null;
}
})
.build());

this.instanceStub =
GrpcInstanceAdminStub.create(
InstanceAdminStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.applyToAllUnaryMethods(
new ApiFunction<UnaryCallSettings.Builder<?, ?>, Void>() {
@Override
public Void apply(UnaryCallSettings.Builder<?, ?> builder) {
builder.setRetryableCodes(ImmutableSet.<StatusCode.Code>of());
return null;
}
})
.build());
this.databaseStub =
GrpcDatabaseAdminStub.create(
DatabaseAdminStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.applyToAllUnaryMethods(
new ApiFunction<UnaryCallSettings.Builder<?, ?>, Void>() {
@Override
public Void apply(UnaryCallSettings.Builder<?, ?> builder) {
builder.setRetryableCodes(ImmutableSet.<StatusCode.Code>of());
return null;
}
})
.build());
} catch (Exception e) {
throw SpannerExceptionFactory.newSpannerException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.spanner.spi.v1;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;

/** Adds logging to rpc calls */
class LoggingInterceptor implements ClientInterceptor {

private final Logger logger;
private final Level level;

LoggingInterceptor(Logger logger, Level level) {
this.logger = logger;
this.level = level;
}

private class CallLogger {

private final MethodDescriptor<?, ?> method;

CallLogger(MethodDescriptor<?, ?> method) {
this.method = method;
}

void log(String message) {
logger.log(
level,
"{0}[{1}]: {2}",
new Object[] {
method.getFullMethodName(), Integer.toHexString(System.identityHashCode(this)), message
});
}

void logfmt(String message, Object... params) {
log(String.format(message, params));
}
}

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
if (!logger.isLoggable(level)) {
return next.newCall(method, callOptions);
}

final CallLogger callLogger = new CallLogger(method);
callLogger.log("Start");
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
super.start(
new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
responseListener) {
@Override
public void onMessage(RespT message) {
callLogger.logfmt("Received:\n%s", message);
super.onMessage(message);
}

@Override
public void onClose(Status status, Metadata trailers) {
callLogger.logfmt("Closed with status %s and trailers %s", status, trailers);
super.onClose(status, trailers);
}
},
headers);
}

@Override
public void sendMessage(ReqT message) {
callLogger.logfmt("Send:\n%s", message);
super.sendMessage(message);
}

@Override
public void cancel(@Nullable String message, @Nullable Throwable cause) {
callLogger.logfmt("Cancelled with message %s", message);
super.cancel(message, cause);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.spanner.spi.v1;

import com.google.api.gax.grpc.GrpcInterceptorProvider;
import com.google.common.collect.ImmutableList;
import io.grpc.ClientInterceptor;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* For internal use only.
* An interceptor provider that provides a list of grpc interceptors for {@code GapicSpannerRpc}
* to handle logging and error augmentation by intercepting grpc calls.
*/
class SpannerInterceptorProvider implements GrpcInterceptorProvider {

private static final List<ClientInterceptor> clientInterceptors =
ImmutableList.of(
new SpannerErrorInterceptor(),
new LoggingInterceptor(Logger.getLogger(GrpcSpannerRpc.class.getName()), Level.FINER));

@Override
public List<ClientInterceptor> getInterceptors() {
return clientInterceptors;
}

}