From f539045d0dae43378ec7bef3b8b1b541fb6578c7 Mon Sep 17 00:00:00 2001 From: Hanzhen Yi Date: Fri, 30 Mar 2018 16:46:08 -0700 Subject: [PATCH 1/5] WIP: inject headers to grpcCallContext before making calls --- google-cloud-bom/pom.xml | 6 +- .../com/google/cloud/spanner/SpannerImpl.java | 8 +- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 129 ++++++++++++------ .../spi/v1/SpannerMetadataProvider.java | 18 ++- pom.xml | 2 +- 5 files changed, 114 insertions(+), 49 deletions(-) diff --git a/google-cloud-bom/pom.xml b/google-cloud-bom/pom.xml index b6672db43f90..ede55bdc89e7 100644 --- a/google-cloud-bom/pom.xml +++ b/google-cloud-bom/pom.xml @@ -170,9 +170,9 @@ 0.43.1-alpha-SNAPSHOT 1.5.0 - 1.23.0 - 1.23.0 - 0.40.0 + 1.24.0 + 1.24.0 + 0.41.0 0.8.0 1.7.0 diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index b645b75e056a..7558b9cb36c3 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -266,7 +266,7 @@ Session createSession(final DatabaseId db) throws SpannerException { new Callable() { @Override public com.google.spanner.v1.Session call() throws Exception { - return rawGrpcRpc.createSession( + return gapicRpc.createSession( db.getName(), getOptions().getSessionLabels(), options); } }); @@ -806,7 +806,7 @@ public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerEx new Callable() { @Override public CommitResponse call() throws Exception { - return rawGrpcRpc.commit(request, options); + return gapicRpc.commit(request, options); } }); Timestamp t = Timestamp.fromProto(response.getCommitTimestamp()); @@ -872,7 +872,7 @@ public void close() { new Callable() { @Override public Void call() throws Exception { - rawGrpcRpc.deleteSession(name, options); + gapicRpc.deleteSession(name, options); return null; } }); @@ -898,7 +898,7 @@ ByteString beginTransaction() { new Callable() { @Override public Transaction call() throws Exception { - return rawGrpcRpc.beginTransaction(request, options); + return gapicRpc.beginTransaction(request, options); } }); if (txn.getId().isEmpty()) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 0872a84f08d9..fe484a3f95c1 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -21,6 +21,7 @@ import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.GaxProperties; import com.google.api.gax.grpc.GaxGrpcProperties; +import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.grpc.GrpcTransportChannel; import com.google.api.gax.rpc.ApiClientHeaderProvider; import com.google.api.gax.rpc.FixedTransportChannelProvider; @@ -68,6 +69,8 @@ import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.CommitResponse; +import com.google.spanner.v1.CreateSessionRequest; +import com.google.spanner.v1.DeleteSessionRequest; import com.google.spanner.v1.ExecuteSqlRequest; import com.google.spanner.v1.PartitionQueryRequest; import com.google.spanner.v1.PartitionReadRequest; @@ -171,9 +174,10 @@ public Paginated listInstanceConfigs(int pageSize, @Nullable Str } ListInstanceConfigsRequest request = requestBuilder.build(); - // TODO: put projectName in metadata + GrpcCallContext context = GrpcCallContext.createDefault() + .withExtraHeaders(metadataProvider.newExtraHeaders(projectName, projectName)); ListInstanceConfigsResponse response = - get(instanceStub.listInstanceConfigsCallable().futureCall(request)); + get(instanceStub.listInstanceConfigsCallable().futureCall(request, context)); return new Paginated<>(response.getInstanceConfigsList(), response.getNextPageToken()); } @@ -182,8 +186,9 @@ public InstanceConfig getInstanceConfig(String instanceConfigName) throws Spanne GetInstanceConfigRequest request = GetInstanceConfigRequest.newBuilder().setName(instanceConfigName).build(); - // TODO: put projectName in metadata - return get(instanceStub.getInstanceConfigCallable().futureCall(request)); + GrpcCallContext context = GrpcCallContext.createDefault() + .withExtraHeaders(metadataProvider.newExtraHeaders(projectName, projectName)); + return get(instanceStub.getInstanceConfigCallable().futureCall(request, context)); } @Override @@ -199,9 +204,10 @@ public Paginated listInstances( } ListInstancesRequest request = requestBuilder.build(); - // TODO: put projectName in metadata + GrpcCallContext context = GrpcCallContext.createDefault() + .withExtraHeaders(metadataProvider.newExtraHeaders(projectName, projectName)); ListInstancesResponse response = - get(instanceStub.listInstancesCallable().futureCall(request)); + get(instanceStub.listInstancesCallable().futureCall(request, context)); return new Paginated<>(response.getInstancesList(), response.getNextPageToken()); } @@ -214,16 +220,20 @@ public Operation createInstance(String parent, String instanceId, Instance insta .setInstanceId(instanceId) .setInstance(instance) .build(); - // TODO: put parent in metadata - return get(instanceStub.createInstanceCallable().futureCall(request)); + + GrpcCallContext context = GrpcCallContext.createDefault() + .withExtraHeaders(metadataProvider.newExtraHeaders(parent, projectName)); + return get(instanceStub.createInstanceCallable().futureCall(request, context)); } @Override public Operation updateInstance(Instance instance, FieldMask fieldMask) throws SpannerException { UpdateInstanceRequest request = UpdateInstanceRequest.newBuilder().setInstance(instance).setFieldMask(fieldMask).build(); - // TODO: put instance.getName() in metadata - return get(instanceStub.updateInstanceCallable().futureCall(request)); + + GrpcCallContext context = GrpcCallContext.createDefault() + .withExtraHeaders(metadataProvider.newExtraHeaders(instance.getName(), projectName)); + return get(instanceStub.updateInstanceCallable().futureCall(request, context)); } @Override @@ -231,8 +241,9 @@ public Instance getInstance(String instanceName) throws SpannerException { GetInstanceRequest request = GetInstanceRequest.newBuilder().setName(instanceName).build(); - // TODO: put instanceName in metadata - return get(instanceStub.getInstanceCallable().futureCall(request)); + GrpcCallContext context = GrpcCallContext.createDefault() + .withExtraHeaders(metadataProvider.newExtraHeaders(instanceName, projectName)); + return get(instanceStub.getInstanceCallable().futureCall(request, context)); } @Override @@ -240,8 +251,9 @@ public void deleteInstance(String instanceName) throws SpannerException { DeleteInstanceRequest request = DeleteInstanceRequest.newBuilder().setName(instanceName).build(); - // TODO: put instanceName in metadata - get(instanceStub.deleteInstanceCallable().futureCall(request)); + GrpcCallContext context = GrpcCallContext.createDefault() + .withExtraHeaders(metadataProvider.newExtraHeaders(instanceName, projectName)); + get(instanceStub.deleteInstanceCallable().futureCall(request, context)); } @Override @@ -254,8 +266,10 @@ public Paginated listDatabases( } ListDatabasesRequest request = requestBuilder.build(); - // TODO: put instanceName in metadata - ListDatabasesResponse response = get(databaseStub.listDatabasesCallable().futureCall(request)); + GrpcCallContext context = GrpcCallContext.createDefault() + .withExtraHeaders(metadataProvider.newExtraHeaders(instanceName, projectName)); + ListDatabasesResponse response = get(databaseStub.listDatabasesCallable() + .futureCall(request, context)); return new Paginated<>(response.getDatabasesList(), response.getNextPageToken()); } @@ -268,8 +282,9 @@ public Operation createDatabase(String instanceName, String createDatabaseStatem .setCreateStatement(createDatabaseStatement) .addAllExtraStatements(additionalStatements) .build(); - // TODO: put instanceName in metadata - return get(databaseStub.createDatabaseCallable().futureCall(request)); + GrpcCallContext context = GrpcCallContext.createDefault() + .withExtraHeaders(metadataProvider.newExtraHeaders(instanceName, projectName)); + return get(databaseStub.createDatabaseCallable().futureCall(request, context)); } @Override @@ -281,8 +296,9 @@ public Operation updateDatabaseDdl(String databaseName, Iterable updateD .addAllStatements(updateDatabaseStatements) .setOperationId(MoreObjects.firstNonNull(updateId, "")) .build(); - // TODO: put databaseName in metadata - return get(databaseStub.updateDatabaseDdlCallable().futureCall(request)); + GrpcCallContext context = GrpcCallContext.createDefault() + .withExtraHeaders(metadataProvider.newExtraHeaders(databaseName, projectName)); + return get(databaseStub.updateDatabaseDdlCallable().futureCall(request, context)); } @Override @@ -290,8 +306,9 @@ public void dropDatabase(String databaseName) throws SpannerException { DropDatabaseRequest request = DropDatabaseRequest.newBuilder().setDatabase(databaseName).build(); - // TODO: put databaseName in metadata - get(databaseStub.dropDatabaseCallable().futureCall(request)); + GrpcCallContext context = GrpcCallContext.createDefault() + .withExtraHeaders(metadataProvider.newExtraHeaders(databaseName, projectName)); + get(databaseStub.dropDatabaseCallable().futureCall(request, context)); } @Override @@ -301,8 +318,9 @@ public Database getDatabase(String databaseName) throws SpannerException { .setName(databaseName) .build(); - // TODO: put databaseName in metadata - return get(databaseStub.getDatabaseCallable().futureCall(request)); + GrpcCallContext context = GrpcCallContext.createDefault() + .withExtraHeaders(metadataProvider.newExtraHeaders(databaseName, projectName)); + return get(databaseStub.getDatabaseCallable().futureCall(request, context)); } @Override @@ -310,34 +328,55 @@ public List getDatabaseDdl(String databaseName) throws SpannerException GetDatabaseDdlRequest request = GetDatabaseDdlRequest.newBuilder().setDatabase(databaseName).build(); - // TODO: put databaseName in metadata - return get(databaseStub.getDatabaseDdlCallable().futureCall(request)) + GrpcCallContext context = GrpcCallContext.createDefault() + .withExtraHeaders(metadataProvider.newExtraHeaders(databaseName, projectName)); + return get(databaseStub.getDatabaseDdlCallable().futureCall(request, context)) .getStatementsList(); } @Override public Operation getOperation(String name) throws SpannerException { GetOperationRequest request = GetOperationRequest.newBuilder().setName(name).build(); - // TODO: put name in metadata - return get(databaseStub.getOperationsStub().getOperationCallable().futureCall(request)); + GrpcCallContext context = GrpcCallContext.createDefault() + .withExtraHeaders(metadataProvider.newExtraHeaders(name, projectName)); + return get(databaseStub.getOperationsStub().getOperationCallable() + .futureCall(request, context)); } @Override public Session createSession(String databaseName, @Nullable Map labels, @Nullable Map options) throws SpannerException { - throw new UnsupportedOperationException("Not implemented yet."); + CreateSessionRequest.Builder requestBuilder = + CreateSessionRequest.newBuilder().setDatabase(databaseName); + if (labels != null && !labels.isEmpty()) { + Session.Builder session = Session.newBuilder().putAllLabels(labels); + requestBuilder.setSession(session); + } + CreateSessionRequest request = requestBuilder.build(); + GrpcCallContext context = GrpcCallContext.createDefault() + .withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()) + .withExtraHeaders(metadataProvider.newExtraHeaders(databaseName, projectName)); + return get(stub.createSessionCallable().futureCall(request, context)); } @Override public void deleteSession(String sessionName, @Nullable Map options) throws SpannerException { - throw new UnsupportedOperationException("Not implemented yet."); + DeleteSessionRequest request = + DeleteSessionRequest.newBuilder().setName(sessionName).build(); + GrpcCallContext context = GrpcCallContext.createDefault() + .withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()) + .withExtraHeaders(metadataProvider.newExtraHeaders(sessionName, projectName)); + get(stub.deleteSessionCallable().futureCall(request, context)); } @Override public StreamingCall read( ReadRequest request, ResultStreamConsumer consumer, @Nullable Map options) { - throw new UnsupportedOperationException("Not implemented yet."); + GrpcCallContext context = GrpcCallContext.createDefault() + .withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()) + .withExtraHeaders(metadataProvider.newExtraHeaders(request.getSession(), projectName)); + throw new UnsupportedOperationException("not implemented yet"); } @Override @@ -349,33 +388,47 @@ public StreamingCall executeQuery( @Override public Transaction beginTransaction( BeginTransactionRequest request, @Nullable Map options) throws SpannerException { - throw new UnsupportedOperationException("Not implemented yet."); + GrpcCallContext context = GrpcCallContext.createDefault() + .withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()) + .withExtraHeaders(metadataProvider.newExtraHeaders(request.getSession(), projectName)); + return get(stub.beginTransactionCallable().futureCall(request, context)); } @Override public CommitResponse commit(CommitRequest commitRequest, @Nullable Map options) throws SpannerException { - throw new UnsupportedOperationException("Not implemented yet."); + GrpcCallContext context = GrpcCallContext.createDefault() + .withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()) + .withExtraHeaders( + metadataProvider.newExtraHeaders(commitRequest.getSession(), projectName)); + return get(stub.commitCallable().futureCall(commitRequest, context)); } @Override public void rollback(RollbackRequest request, @Nullable Map options) throws SpannerException { - throw new UnsupportedOperationException("Not implemented yet."); + GrpcCallContext context = GrpcCallContext.createDefault() + .withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()) + .withExtraHeaders(metadataProvider.newExtraHeaders(request.getSession(), projectName)); + get(stub.rollbackCallable().futureCall(request, context)); } @Override public PartitionResponse partitionQuery( PartitionQueryRequest request, @Nullable Map options) throws SpannerException { - throw new UnsupportedOperationException("Not implemented yet."); + GrpcCallContext context = GrpcCallContext.createDefault() + .withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()) + .withExtraHeaders(metadataProvider.newExtraHeaders(request.getSession(), projectName)); + return get(stub.partitionQueryCallable().futureCall(request, context)); } @Override public PartitionResponse partitionRead( PartitionReadRequest request, @Nullable Map options) throws SpannerException { - // TODO(pongad): Figure out metadata - // TODO(pongad): Figure out channel affinity - return get(stub.partitionReadCallable().futureCall(request)); + GrpcCallContext context = GrpcCallContext.createDefault() + .withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()) + .withExtraHeaders(metadataProvider.newExtraHeaders(request.getSession(), projectName)); + return get(stub.partitionReadCallable().futureCall(request, context)); } /** Gets the result of an async RPC call, handling any exceptions encountered. */ diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProvider.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProvider.java index f80eb90b12c1..830b5a89abd8 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProvider.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProvider.java @@ -18,6 +18,8 @@ import com.google.common.collect.ImmutableMap; import io.grpc.Metadata; import io.grpc.Metadata.Key; +import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -27,7 +29,7 @@ */ class SpannerMetadataProvider { private final Map, String> headers; - private final Key resourceHeaderKey; + private final String resourceHeaderKey; private static final Pattern[] RESOURCE_TOKEN_PATTERNS = { Pattern.compile("^(?projects/[^/]*/instances/[^/]*/databases/[^/]*)(.*)?"), @@ -35,7 +37,7 @@ class SpannerMetadataProvider { }; private SpannerMetadataProvider(Map headers, String resourceHeaderKey) { - this.resourceHeaderKey = Key.of(resourceHeaderKey, Metadata.ASCII_STRING_MARSHALLER); + this.resourceHeaderKey = resourceHeaderKey; this.headers = constructHeadersAsMetadata(headers); } @@ -50,11 +52,21 @@ Metadata newMetadata(String resourceTokenTemplate, String defaultResourceToken) } metadata.put( - resourceHeaderKey, getResourceHeaderValue(resourceTokenTemplate, defaultResourceToken)); + Key.of(resourceHeaderKey, Metadata.ASCII_STRING_MARSHALLER), + getResourceHeaderValue(resourceTokenTemplate, defaultResourceToken)); return metadata; } + Map> newExtraHeaders(String resourceTokenTemplate, String defaultResourceToken) { + return ImmutableMap.>builder() + .put( + resourceHeaderKey, + Arrays.asList(getResourceHeaderValue(resourceTokenTemplate, defaultResourceToken))) + .build(); + + } + private Map, String> constructHeadersAsMetadata( Map headers) { ImmutableMap.Builder, String> headersAsMetadataBuilder = diff --git a/pom.xml b/pom.xml index 232a4d6af40b..2782199094b1 100644 --- a/pom.xml +++ b/pom.xml @@ -138,7 +138,7 @@ google-cloud 0.43.1-alpha-SNAPSHOT 1.23.0 - 1.23.0 + 1.24.0 0.9.0 1.10.1 2.0.7.Final From aecf9567531cf0b8b6fbc5c6833d7b4fd92a08b5 Mon Sep 17 00:00:00 2001 From: Hanzhen Yi Date: Thu, 12 Apr 2018 13:45:19 -0700 Subject: [PATCH 2/5] Add helper method to create context --- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 85 +++++++------------ 1 file changed, 32 insertions(+), 53 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index fe484a3f95c1..7bc8d4c5e7e7 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -174,8 +174,7 @@ public Paginated listInstanceConfigs(int pageSize, @Nullable Str } ListInstanceConfigsRequest request = requestBuilder.build(); - GrpcCallContext context = GrpcCallContext.createDefault() - .withExtraHeaders(metadataProvider.newExtraHeaders(projectName, projectName)); + GrpcCallContext context = newCallContext(null, projectName); ListInstanceConfigsResponse response = get(instanceStub.listInstanceConfigsCallable().futureCall(request, context)); return new Paginated<>(response.getInstanceConfigsList(), response.getNextPageToken()); @@ -186,8 +185,7 @@ public InstanceConfig getInstanceConfig(String instanceConfigName) throws Spanne GetInstanceConfigRequest request = GetInstanceConfigRequest.newBuilder().setName(instanceConfigName).build(); - GrpcCallContext context = GrpcCallContext.createDefault() - .withExtraHeaders(metadataProvider.newExtraHeaders(projectName, projectName)); + GrpcCallContext context = newCallContext(null, projectName); return get(instanceStub.getInstanceConfigCallable().futureCall(request, context)); } @@ -204,8 +202,7 @@ public Paginated listInstances( } ListInstancesRequest request = requestBuilder.build(); - GrpcCallContext context = GrpcCallContext.createDefault() - .withExtraHeaders(metadataProvider.newExtraHeaders(projectName, projectName)); + GrpcCallContext context = newCallContext(null, projectName); ListInstancesResponse response = get(instanceStub.listInstancesCallable().futureCall(request, context)); return new Paginated<>(response.getInstancesList(), response.getNextPageToken()); @@ -221,8 +218,7 @@ public Operation createInstance(String parent, String instanceId, Instance insta .setInstance(instance) .build(); - GrpcCallContext context = GrpcCallContext.createDefault() - .withExtraHeaders(metadataProvider.newExtraHeaders(parent, projectName)); + GrpcCallContext context = newCallContext(null, parent); return get(instanceStub.createInstanceCallable().futureCall(request, context)); } @@ -231,8 +227,7 @@ public Operation updateInstance(Instance instance, FieldMask fieldMask) throws S UpdateInstanceRequest request = UpdateInstanceRequest.newBuilder().setInstance(instance).setFieldMask(fieldMask).build(); - GrpcCallContext context = GrpcCallContext.createDefault() - .withExtraHeaders(metadataProvider.newExtraHeaders(instance.getName(), projectName)); + GrpcCallContext context = newCallContext(null, instance.getName()); return get(instanceStub.updateInstanceCallable().futureCall(request, context)); } @@ -241,8 +236,7 @@ public Instance getInstance(String instanceName) throws SpannerException { GetInstanceRequest request = GetInstanceRequest.newBuilder().setName(instanceName).build(); - GrpcCallContext context = GrpcCallContext.createDefault() - .withExtraHeaders(metadataProvider.newExtraHeaders(instanceName, projectName)); + GrpcCallContext context = newCallContext(null, instanceName); return get(instanceStub.getInstanceCallable().futureCall(request, context)); } @@ -251,8 +245,7 @@ public void deleteInstance(String instanceName) throws SpannerException { DeleteInstanceRequest request = DeleteInstanceRequest.newBuilder().setName(instanceName).build(); - GrpcCallContext context = GrpcCallContext.createDefault() - .withExtraHeaders(metadataProvider.newExtraHeaders(instanceName, projectName)); + GrpcCallContext context = newCallContext(null, instanceName); get(instanceStub.deleteInstanceCallable().futureCall(request, context)); } @@ -266,8 +259,7 @@ public Paginated listDatabases( } ListDatabasesRequest request = requestBuilder.build(); - GrpcCallContext context = GrpcCallContext.createDefault() - .withExtraHeaders(metadataProvider.newExtraHeaders(instanceName, projectName)); + GrpcCallContext context = newCallContext(null, instanceName); ListDatabasesResponse response = get(databaseStub.listDatabasesCallable() .futureCall(request, context)); return new Paginated<>(response.getDatabasesList(), response.getNextPageToken()); @@ -282,8 +274,7 @@ public Operation createDatabase(String instanceName, String createDatabaseStatem .setCreateStatement(createDatabaseStatement) .addAllExtraStatements(additionalStatements) .build(); - GrpcCallContext context = GrpcCallContext.createDefault() - .withExtraHeaders(metadataProvider.newExtraHeaders(instanceName, projectName)); + GrpcCallContext context = newCallContext(null, instanceName); return get(databaseStub.createDatabaseCallable().futureCall(request, context)); } @@ -296,8 +287,7 @@ public Operation updateDatabaseDdl(String databaseName, Iterable updateD .addAllStatements(updateDatabaseStatements) .setOperationId(MoreObjects.firstNonNull(updateId, "")) .build(); - GrpcCallContext context = GrpcCallContext.createDefault() - .withExtraHeaders(metadataProvider.newExtraHeaders(databaseName, projectName)); + GrpcCallContext context = newCallContext(null, databaseName); return get(databaseStub.updateDatabaseDdlCallable().futureCall(request, context)); } @@ -306,8 +296,7 @@ public void dropDatabase(String databaseName) throws SpannerException { DropDatabaseRequest request = DropDatabaseRequest.newBuilder().setDatabase(databaseName).build(); - GrpcCallContext context = GrpcCallContext.createDefault() - .withExtraHeaders(metadataProvider.newExtraHeaders(databaseName, projectName)); + GrpcCallContext context = newCallContext(null, databaseName); get(databaseStub.dropDatabaseCallable().futureCall(request, context)); } @@ -318,8 +307,7 @@ public Database getDatabase(String databaseName) throws SpannerException { .setName(databaseName) .build(); - GrpcCallContext context = GrpcCallContext.createDefault() - .withExtraHeaders(metadataProvider.newExtraHeaders(databaseName, projectName)); + GrpcCallContext context = newCallContext(null, databaseName); return get(databaseStub.getDatabaseCallable().futureCall(request, context)); } @@ -328,8 +316,7 @@ public List getDatabaseDdl(String databaseName) throws SpannerException GetDatabaseDdlRequest request = GetDatabaseDdlRequest.newBuilder().setDatabase(databaseName).build(); - GrpcCallContext context = GrpcCallContext.createDefault() - .withExtraHeaders(metadataProvider.newExtraHeaders(databaseName, projectName)); + GrpcCallContext context = newCallContext(null, databaseName); return get(databaseStub.getDatabaseDdlCallable().futureCall(request, context)) .getStatementsList(); } @@ -337,8 +324,7 @@ public List getDatabaseDdl(String databaseName) throws SpannerException @Override public Operation getOperation(String name) throws SpannerException { GetOperationRequest request = GetOperationRequest.newBuilder().setName(name).build(); - GrpcCallContext context = GrpcCallContext.createDefault() - .withExtraHeaders(metadataProvider.newExtraHeaders(name, projectName)); + GrpcCallContext context = newCallContext(null, name); return get(databaseStub.getOperationsStub().getOperationCallable() .futureCall(request, context)); } @@ -353,9 +339,7 @@ public Session createSession(String databaseName, @Nullable Map requestBuilder.setSession(session); } CreateSessionRequest request = requestBuilder.build(); - GrpcCallContext context = GrpcCallContext.createDefault() - .withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()) - .withExtraHeaders(metadataProvider.newExtraHeaders(databaseName, projectName)); + GrpcCallContext context = newCallContext(options, databaseName); return get(stub.createSessionCallable().futureCall(request, context)); } @@ -364,18 +348,14 @@ public void deleteSession(String sessionName, @Nullable Map options) throws SpannerException { DeleteSessionRequest request = DeleteSessionRequest.newBuilder().setName(sessionName).build(); - GrpcCallContext context = GrpcCallContext.createDefault() - .withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()) - .withExtraHeaders(metadataProvider.newExtraHeaders(sessionName, projectName)); + GrpcCallContext context = newCallContext(options, sessionName); get(stub.deleteSessionCallable().futureCall(request, context)); } @Override public StreamingCall read( ReadRequest request, ResultStreamConsumer consumer, @Nullable Map options) { - GrpcCallContext context = GrpcCallContext.createDefault() - .withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()) - .withExtraHeaders(metadataProvider.newExtraHeaders(request.getSession(), projectName)); + GrpcCallContext context = newCallContext(options, request.getSession()); throw new UnsupportedOperationException("not implemented yet"); } @@ -388,46 +368,35 @@ public StreamingCall executeQuery( @Override public Transaction beginTransaction( BeginTransactionRequest request, @Nullable Map options) throws SpannerException { - GrpcCallContext context = GrpcCallContext.createDefault() - .withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()) - .withExtraHeaders(metadataProvider.newExtraHeaders(request.getSession(), projectName)); + GrpcCallContext context = newCallContext(options, request.getSession()); return get(stub.beginTransactionCallable().futureCall(request, context)); } @Override public CommitResponse commit(CommitRequest commitRequest, @Nullable Map options) throws SpannerException { - GrpcCallContext context = GrpcCallContext.createDefault() - .withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()) - .withExtraHeaders( - metadataProvider.newExtraHeaders(commitRequest.getSession(), projectName)); + GrpcCallContext context = newCallContext(options, commitRequest.getSession()); return get(stub.commitCallable().futureCall(commitRequest, context)); } @Override public void rollback(RollbackRequest request, @Nullable Map options) throws SpannerException { - GrpcCallContext context = GrpcCallContext.createDefault() - .withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()) - .withExtraHeaders(metadataProvider.newExtraHeaders(request.getSession(), projectName)); + GrpcCallContext context = newCallContext(options, request.getSession()); get(stub.rollbackCallable().futureCall(request, context)); } @Override public PartitionResponse partitionQuery( PartitionQueryRequest request, @Nullable Map options) throws SpannerException { - GrpcCallContext context = GrpcCallContext.createDefault() - .withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()) - .withExtraHeaders(metadataProvider.newExtraHeaders(request.getSession(), projectName)); + GrpcCallContext context = newCallContext(options, request.getSession()); return get(stub.partitionQueryCallable().futureCall(request, context)); } @Override public PartitionResponse partitionRead( PartitionReadRequest request, @Nullable Map options) throws SpannerException { - GrpcCallContext context = GrpcCallContext.createDefault() - .withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()) - .withExtraHeaders(metadataProvider.newExtraHeaders(request.getSession(), projectName)); + GrpcCallContext context = newCallContext(options, request.getSession()); return get(stub.partitionReadCallable().futureCall(request, context)); } @@ -444,4 +413,14 @@ private static T get(final Future future) throws SpannerException { throw newSpannerException(context, e); } } + + private GrpcCallContext newCallContext(@Nullable Map options, String resource) { + GrpcCallContext context = GrpcCallContext.createDefault(); + if (options != null) { + context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()); + } + context = context.withExtraHeaders( + metadataProvider.newExtraHeaders(resource, projectName)); + return context; + } } From 2a78337fbb257ea5647aaa6c30c6829f0ae2d705 Mon Sep 17 00:00:00 2001 From: Hanzhen Yi Date: Thu, 12 Apr 2018 17:42:40 -0700 Subject: [PATCH 3/5] Suppress retry in gapic --- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 36 +++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 7bc8d4c5e7e7..7fdd2f4b2e62 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -18,6 +18,7 @@ import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException; +import com.google.api.core.ApiFunction; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.GaxProperties; import com.google.api.gax.grpc.GaxGrpcProperties; @@ -26,7 +27,9 @@ import com.google.api.gax.rpc.ApiClientHeaderProvider; import com.google.api.gax.rpc.FixedTransportChannelProvider; import com.google.api.gax.rpc.HeaderProvider; +import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.api.gax.rpc.UnaryCallSettings; import com.google.api.pathtemplate.PathTemplate; import com.google.cloud.ServiceOptions; import com.google.cloud.grpc.GrpcTransportOptions; @@ -44,6 +47,7 @@ import com.google.cloud.spanner.v1.stub.SpannerStub; import com.google.cloud.spanner.v1.stub.SpannerStubSettings; import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableSet; import com.google.longrunning.GetOperationRequest; import com.google.longrunning.Operation; import com.google.protobuf.FieldMask; @@ -143,25 +147,53 @@ public GapicSpannerRpc(SpannerOptions options) throws IOException { .build()); CredentialsProvider credentialsProvider = GrpcTransportOptions.setUpCredentialsProvider(options); - - this.stub = + try { + this.stub = GrpcSpannerStub.create( SpannerStubSettings.newBuilder() .setTransportChannelProvider(channelProvider) .setCredentialsProvider(credentialsProvider) + .applyToAllUnaryMethods( + new ApiFunction, Void>() { + @Override + public Void apply(UnaryCallSettings.Builder builder) { + builder.setRetryableCodes(ImmutableSet.of()); + return null; + } + }) .build()); + this.instanceStub = GrpcInstanceAdminStub.create( InstanceAdminStubSettings.newBuilder() .setTransportChannelProvider(channelProvider) .setCredentialsProvider(credentialsProvider) + .applyToAllUnaryMethods( + new ApiFunction, Void>() { + @Override + public Void apply(UnaryCallSettings.Builder builder) { + builder.setRetryableCodes(ImmutableSet.of()); + return null; + } + }) .build()); this.databaseStub = GrpcDatabaseAdminStub.create( DatabaseAdminStubSettings.newBuilder() .setTransportChannelProvider(channelProvider) .setCredentialsProvider(credentialsProvider) + .applyToAllUnaryMethods( + new ApiFunction, Void>() { + @Override + public Void apply(UnaryCallSettings.Builder builder) { + builder.setRetryableCodes(ImmutableSet.of()); + return null; + } + }) .build()); + } catch (Exception e) { + throw SpannerExceptionFactory.newSpannerException(e); + } } @Override From 184584b2812a053bb0cea5c4d51c431a37c67740 Mon Sep 17 00:00:00 2001 From: Hanzhen Yi Date: Thu, 12 Apr 2018 22:32:55 -0700 Subject: [PATCH 4/5] Add comment --- .../java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 7fdd2f4b2e62..a7cf44f3323b 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -148,6 +148,8 @@ public GapicSpannerRpc(SpannerOptions options) throws IOException { CredentialsProvider credentialsProvider = GrpcTransportOptions.setUpCredentialsProvider(options); try { + // TODO: bump the version of gax and remove this try-catch block + // applyToAllUnaryMethods does not throw exception in the latest version this.stub = GrpcSpannerStub.create( SpannerStubSettings.newBuilder() From 1c1593471f8032c43d4422669b3d1698dc4fe0b9 Mon Sep 17 00:00:00 2001 From: Hanzhen Yi Date: Fri, 13 Apr 2018 11:20:26 -0700 Subject: [PATCH 5/5] Add notes about disabling retry in GapicSpannerRpc --- .../java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index a7cf44f3323b..04c05b64e52a 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -147,6 +147,10 @@ public GapicSpannerRpc(SpannerOptions options) throws IOException { .build()); CredentialsProvider credentialsProvider = GrpcTransportOptions.setUpCredentialsProvider(options); + + // Disabling retry for now because spanner handles retry in SpannerImpl. + // We will finally want to improve gax but for smooth transitioning we + // preserve the retry in SpannerImpl try { // TODO: bump the version of gax and remove this try-catch block // applyToAllUnaryMethods does not throw exception in the latest version