Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spanner: migrate all unary call methods to gapic and inject headers #3112

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions google-cloud-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,9 @@
<testing.version>0.43.1-alpha-SNAPSHOT</testing.version><!-- {x-version-update:google-cloud-testing:current} -->

<api-common.version>1.5.0</api-common.version>
<gax.version>1.23.0</gax.version>
<gax-grpc.version>1.23.0</gax-grpc.version>
<gax-httpjson.version>0.40.0</gax-httpjson.version>
<gax.version>1.24.0</gax.version>
<gax-grpc.version>1.24.0</gax-grpc.version>
<gax-httpjson.version>0.41.0</gax-httpjson.version>
<generated-proto-beta.version>0.8.0</generated-proto-beta.version>
<generated-proto-ga.version>1.7.0</generated-proto-ga.version>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ Session createSession(final DatabaseId db) throws SpannerException {
new Callable<com.google.spanner.v1.Session>() {
@Override
public com.google.spanner.v1.Session call() throws Exception {
return rawGrpcRpc.createSession(
return gapicRpc.createSession(
db.getName(), getOptions().getSessionLabels(), options);
}
});
Expand Down Expand Up @@ -806,7 +806,7 @@ public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerEx
new Callable<CommitResponse>() {
@Override
public CommitResponse call() throws Exception {
return rawGrpcRpc.commit(request, options);
return gapicRpc.commit(request, options);

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

}
});
Timestamp t = Timestamp.fromProto(response.getCommitTimestamp());
Expand Down Expand Up @@ -872,7 +872,7 @@ public void close() {
new Callable<Void>() {
@Override
public Void call() throws Exception {
rawGrpcRpc.deleteSession(name, options);
gapicRpc.deleteSession(name, options);
return null;
}
});
Expand All @@ -898,7 +898,7 @@ ByteString beginTransaction() {
new Callable<Transaction>() {
@Override
public Transaction call() throws Exception {
return rawGrpcRpc.beginTransaction(request, options);
return gapicRpc.beginTransaction(request, options);
}
});
if (txn.getId().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.GaxProperties;
import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.ApiClientHeaderProvider;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
Expand Down Expand Up @@ -68,6 +69,8 @@
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CommitResponse;
import com.google.spanner.v1.CreateSessionRequest;
import com.google.spanner.v1.DeleteSessionRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.PartitionQueryRequest;
import com.google.spanner.v1.PartitionReadRequest;
Expand Down Expand Up @@ -171,9 +174,10 @@ public Paginated<InstanceConfig> listInstanceConfigs(int pageSize, @Nullable Str
}
ListInstanceConfigsRequest request = requestBuilder.build();

// TODO: put projectName in metadata
GrpcCallContext context = GrpcCallContext.createDefault()
.withExtraHeaders(metadataProvider.newExtraHeaders(projectName, projectName));
ListInstanceConfigsResponse response =
get(instanceStub.listInstanceConfigsCallable().futureCall(request));
get(instanceStub.listInstanceConfigsCallable().futureCall(request, context));
return new Paginated<>(response.getInstanceConfigsList(), response.getNextPageToken());
}

Expand All @@ -182,8 +186,9 @@ public InstanceConfig getInstanceConfig(String instanceConfigName) throws Spanne
GetInstanceConfigRequest request =
GetInstanceConfigRequest.newBuilder().setName(instanceConfigName).build();

// TODO: put projectName in metadata
return get(instanceStub.getInstanceConfigCallable().futureCall(request));
GrpcCallContext context = GrpcCallContext.createDefault()
.withExtraHeaders(metadataProvider.newExtraHeaders(projectName, projectName));
return get(instanceStub.getInstanceConfigCallable().futureCall(request, context));
}

@Override
Expand All @@ -199,9 +204,10 @@ public Paginated<Instance> listInstances(
}
ListInstancesRequest request = requestBuilder.build();

// TODO: put projectName in metadata
GrpcCallContext context = GrpcCallContext.createDefault()
.withExtraHeaders(metadataProvider.newExtraHeaders(projectName, projectName));
ListInstancesResponse response =
get(instanceStub.listInstancesCallable().futureCall(request));
get(instanceStub.listInstancesCallable().futureCall(request, context));
return new Paginated<>(response.getInstancesList(), response.getNextPageToken());
}

Expand All @@ -214,34 +220,40 @@ public Operation createInstance(String parent, String instanceId, Instance insta
.setInstanceId(instanceId)
.setInstance(instance)
.build();
// TODO: put parent in metadata
return get(instanceStub.createInstanceCallable().futureCall(request));

GrpcCallContext context = GrpcCallContext.createDefault()
.withExtraHeaders(metadataProvider.newExtraHeaders(parent, projectName));
return get(instanceStub.createInstanceCallable().futureCall(request, context));
}

@Override
public Operation updateInstance(Instance instance, FieldMask fieldMask) throws SpannerException {
UpdateInstanceRequest request =
UpdateInstanceRequest.newBuilder().setInstance(instance).setFieldMask(fieldMask).build();
// TODO: put instance.getName() in metadata
return get(instanceStub.updateInstanceCallable().futureCall(request));

GrpcCallContext context = GrpcCallContext.createDefault()
.withExtraHeaders(metadataProvider.newExtraHeaders(instance.getName(), projectName));
return get(instanceStub.updateInstanceCallable().futureCall(request, context));
}

@Override
public Instance getInstance(String instanceName) throws SpannerException {
GetInstanceRequest request =
GetInstanceRequest.newBuilder().setName(instanceName).build();

// TODO: put instanceName in metadata
return get(instanceStub.getInstanceCallable().futureCall(request));
GrpcCallContext context = GrpcCallContext.createDefault()
.withExtraHeaders(metadataProvider.newExtraHeaders(instanceName, projectName));
return get(instanceStub.getInstanceCallable().futureCall(request, context));
}

@Override
public void deleteInstance(String instanceName) throws SpannerException {
DeleteInstanceRequest request =
DeleteInstanceRequest.newBuilder().setName(instanceName).build();

// TODO: put instanceName in metadata
get(instanceStub.deleteInstanceCallable().futureCall(request));
GrpcCallContext context = GrpcCallContext.createDefault()
.withExtraHeaders(metadataProvider.newExtraHeaders(instanceName, projectName));
get(instanceStub.deleteInstanceCallable().futureCall(request, context));
}

@Override
Expand All @@ -254,8 +266,10 @@ public Paginated<Database> listDatabases(
}
ListDatabasesRequest request = requestBuilder.build();

// TODO: put instanceName in metadata
ListDatabasesResponse response = get(databaseStub.listDatabasesCallable().futureCall(request));
GrpcCallContext context = GrpcCallContext.createDefault()
.withExtraHeaders(metadataProvider.newExtraHeaders(instanceName, projectName));
ListDatabasesResponse response = get(databaseStub.listDatabasesCallable()
.futureCall(request, context));
return new Paginated<>(response.getDatabasesList(), response.getNextPageToken());
}

Expand All @@ -268,8 +282,9 @@ public Operation createDatabase(String instanceName, String createDatabaseStatem
.setCreateStatement(createDatabaseStatement)
.addAllExtraStatements(additionalStatements)
.build();
// TODO: put instanceName in metadata
return get(databaseStub.createDatabaseCallable().futureCall(request));
GrpcCallContext context = GrpcCallContext.createDefault()
.withExtraHeaders(metadataProvider.newExtraHeaders(instanceName, projectName));
return get(databaseStub.createDatabaseCallable().futureCall(request, context));
}

@Override
Expand All @@ -281,17 +296,19 @@ public Operation updateDatabaseDdl(String databaseName, Iterable<String> updateD
.addAllStatements(updateDatabaseStatements)
.setOperationId(MoreObjects.firstNonNull(updateId, ""))
.build();
// TODO: put databaseName in metadata
return get(databaseStub.updateDatabaseDdlCallable().futureCall(request));
GrpcCallContext context = GrpcCallContext.createDefault()
.withExtraHeaders(metadataProvider.newExtraHeaders(databaseName, projectName));
return get(databaseStub.updateDatabaseDdlCallable().futureCall(request, context));
}

@Override
public void dropDatabase(String databaseName) throws SpannerException {
DropDatabaseRequest request =
DropDatabaseRequest.newBuilder().setDatabase(databaseName).build();

// TODO: put databaseName in metadata
get(databaseStub.dropDatabaseCallable().futureCall(request));
GrpcCallContext context = GrpcCallContext.createDefault()
.withExtraHeaders(metadataProvider.newExtraHeaders(databaseName, projectName));
get(databaseStub.dropDatabaseCallable().futureCall(request, context));
}

@Override
Expand All @@ -301,43 +318,65 @@ public Database getDatabase(String databaseName) throws SpannerException {
.setName(databaseName)
.build();

// TODO: put databaseName in metadata
return get(databaseStub.getDatabaseCallable().futureCall(request));
GrpcCallContext context = GrpcCallContext.createDefault()
.withExtraHeaders(metadataProvider.newExtraHeaders(databaseName, projectName));
return get(databaseStub.getDatabaseCallable().futureCall(request, context));
}

@Override
public List<String> getDatabaseDdl(String databaseName) throws SpannerException {
GetDatabaseDdlRequest request =
GetDatabaseDdlRequest.newBuilder().setDatabase(databaseName).build();

// TODO: put databaseName in metadata
return get(databaseStub.getDatabaseDdlCallable().futureCall(request))
GrpcCallContext context = GrpcCallContext.createDefault()
.withExtraHeaders(metadataProvider.newExtraHeaders(databaseName, projectName));
return get(databaseStub.getDatabaseDdlCallable().futureCall(request, context))
.getStatementsList();
}

@Override
public Operation getOperation(String name) throws SpannerException {
GetOperationRequest request = GetOperationRequest.newBuilder().setName(name).build();
// TODO: put name in metadata
return get(databaseStub.getOperationsStub().getOperationCallable().futureCall(request));
GrpcCallContext context = GrpcCallContext.createDefault()
.withExtraHeaders(metadataProvider.newExtraHeaders(name, projectName));
return get(databaseStub.getOperationsStub().getOperationCallable()
.futureCall(request, context));
}

@Override
public Session createSession(String databaseName, @Nullable Map<String, String> labels,
@Nullable Map<Option, ?> options) throws SpannerException {
throw new UnsupportedOperationException("Not implemented yet.");
CreateSessionRequest.Builder requestBuilder =
CreateSessionRequest.newBuilder().setDatabase(databaseName);
if (labels != null && !labels.isEmpty()) {
Session.Builder session = Session.newBuilder().putAllLabels(labels);
requestBuilder.setSession(session);
}
CreateSessionRequest request = requestBuilder.build();
GrpcCallContext context = GrpcCallContext.createDefault()
.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue())
.withExtraHeaders(metadataProvider.newExtraHeaders(databaseName, projectName));
return get(stub.createSessionCallable().futureCall(request, context));
}

@Override
public void deleteSession(String sessionName, @Nullable Map<Option, ?> options)
throws SpannerException {
throw new UnsupportedOperationException("Not implemented yet.");
DeleteSessionRequest request =
DeleteSessionRequest.newBuilder().setName(sessionName).build();
GrpcCallContext context = GrpcCallContext.createDefault()
.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue())
.withExtraHeaders(metadataProvider.newExtraHeaders(sessionName, projectName));
get(stub.deleteSessionCallable().futureCall(request, context));
}

@Override
public StreamingCall read(
ReadRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
throw new UnsupportedOperationException("Not implemented yet.");
GrpcCallContext context = GrpcCallContext.createDefault()
.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue())
.withExtraHeaders(metadataProvider.newExtraHeaders(request.getSession(), projectName));
throw new UnsupportedOperationException("not implemented yet");
}

@Override
Expand All @@ -349,33 +388,47 @@ public StreamingCall executeQuery(
@Override
public Transaction beginTransaction(
BeginTransactionRequest request, @Nullable Map<Option, ?> options) throws SpannerException {
throw new UnsupportedOperationException("Not implemented yet.");
GrpcCallContext context = GrpcCallContext.createDefault()
.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue())
.withExtraHeaders(metadataProvider.newExtraHeaders(request.getSession(), projectName));

This comment was marked as spam.

return get(stub.beginTransactionCallable().futureCall(request, context));
}

@Override
public CommitResponse commit(CommitRequest commitRequest, @Nullable Map<Option, ?> options)
throws SpannerException {
throw new UnsupportedOperationException("Not implemented yet.");
GrpcCallContext context = GrpcCallContext.createDefault()
.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue())
.withExtraHeaders(
metadataProvider.newExtraHeaders(commitRequest.getSession(), projectName));
return get(stub.commitCallable().futureCall(commitRequest, context));
}

@Override
public void rollback(RollbackRequest request, @Nullable Map<Option, ?> options)
throws SpannerException {
throw new UnsupportedOperationException("Not implemented yet.");
GrpcCallContext context = GrpcCallContext.createDefault()
.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue())
.withExtraHeaders(metadataProvider.newExtraHeaders(request.getSession(), projectName));
get(stub.rollbackCallable().futureCall(request, context));
}

@Override
public PartitionResponse partitionQuery(
PartitionQueryRequest request, @Nullable Map<Option, ?> options) throws SpannerException {
throw new UnsupportedOperationException("Not implemented yet.");
GrpcCallContext context = GrpcCallContext.createDefault()
.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue())
.withExtraHeaders(metadataProvider.newExtraHeaders(request.getSession(), projectName));
return get(stub.partitionQueryCallable().futureCall(request, context));
}

@Override
public PartitionResponse partitionRead(
PartitionReadRequest request, @Nullable Map<Option, ?> options) throws SpannerException {
// TODO(pongad): Figure out metadata
// TODO(pongad): Figure out channel affinity
return get(stub.partitionReadCallable().futureCall(request));
GrpcCallContext context = GrpcCallContext.createDefault()
.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue())
.withExtraHeaders(metadataProvider.newExtraHeaders(request.getSession(), projectName));
return get(stub.partitionReadCallable().futureCall(request, context));
}

/** Gets the result of an async RPC call, handling any exceptions encountered. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import com.google.common.collect.ImmutableMap;
import io.grpc.Metadata;
import io.grpc.Metadata.Key;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand All @@ -27,15 +29,15 @@
*/
class SpannerMetadataProvider {
private final Map<Metadata.Key<String>, String> headers;
private final Key<String> resourceHeaderKey;
private final String resourceHeaderKey;

private static final Pattern[] RESOURCE_TOKEN_PATTERNS = {
Pattern.compile("^(?<headerValue>projects/[^/]*/instances/[^/]*/databases/[^/]*)(.*)?"),
Pattern.compile("^(?<headerValue>projects/[^/]*/instances/[^/]*)(.*)?")
};

private SpannerMetadataProvider(Map<String, String> headers, String resourceHeaderKey) {
this.resourceHeaderKey = Key.of(resourceHeaderKey, Metadata.ASCII_STRING_MARSHALLER);
this.resourceHeaderKey = resourceHeaderKey;
this.headers = constructHeadersAsMetadata(headers);
}

Expand All @@ -50,11 +52,21 @@ Metadata newMetadata(String resourceTokenTemplate, String defaultResourceToken)
}

metadata.put(
resourceHeaderKey, getResourceHeaderValue(resourceTokenTemplate, defaultResourceToken));
Key.of(resourceHeaderKey, Metadata.ASCII_STRING_MARSHALLER),
getResourceHeaderValue(resourceTokenTemplate, defaultResourceToken));

return metadata;
}

Map<String, List<String>> newExtraHeaders(String resourceTokenTemplate, String defaultResourceToken) {
return ImmutableMap.<String, List<String>>builder()
.put(
resourceHeaderKey,
Arrays.asList(getResourceHeaderValue(resourceTokenTemplate, defaultResourceToken)))
.build();

}

private Map<Metadata.Key<String>, String> constructHeadersAsMetadata(
Map<String, String> headers) {
ImmutableMap.Builder<Metadata.Key<String>, String> headersAsMetadataBuilder =
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@
<site.installationModule>google-cloud</site.installationModule>
<bom.version>0.43.1-alpha-SNAPSHOT</bom.version><!-- {x-version-update:google-cloud-pom:current} -->
<api-client.version>1.23.0</api-client.version>
<gax.version>1.23.0</gax.version>
<gax.version>1.24.0</gax.version>
<google.auth.version>0.9.0</google.auth.version>
<grpc.version>1.10.1</grpc.version>
<nettyssl.version>2.0.7.Final</nettyssl.version>
Expand Down