diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java index a8990112c769..fbab39630f5c 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java @@ -16,15 +16,15 @@ package com.google.cloud.spanner; -import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.ServiceDefaults; import com.google.cloud.ServiceOptions; import com.google.cloud.ServiceRpc; import com.google.cloud.TransportOptions; +import com.google.cloud.grpc.GrpcTransportOptions; +import com.google.cloud.spanner.spi.SpannerRpcFactory; import com.google.cloud.spanner.spi.v1.GapicSpannerRpc; import com.google.cloud.spanner.spi.v1.GrpcSpannerRpc; import com.google.cloud.spanner.spi.v1.SpannerRpc; -import com.google.cloud.spanner.spi.SpannerRpcFactory; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -218,6 +218,10 @@ public List getRpcChannels() { return rpcChannels; } + public int getNumChannels() { + return numChannels; + } + public SessionPoolOptions getSessionPoolOptions() { return sessionPoolOptions; } @@ -353,4 +357,15 @@ protected SpannerRpc getGapicSpannerRpc() { public Builder toBuilder() { return new Builder(this); } + + public String getEndpoint() { + URL url; + try { + url = new URL(getHost()); + } catch (MalformedURLException e) { + throw new IllegalArgumentException("Invalid host: " + getHost(), e); + } + return String.format( + "%s:%s", url.getHost(), url.getPort() < 0 ? url.getDefaultPort() : url.getPort()); + } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 366189e1039a..18ca176d0153 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -24,6 +24,7 @@ import com.google.api.gax.grpc.GaxGrpcProperties; import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.longrunning.OperationFuture; import com.google.api.gax.rpc.ApiClientHeaderProvider; import com.google.api.gax.rpc.FixedTransportChannelProvider; @@ -106,7 +107,8 @@ public class GapicSpannerRpc implements SpannerRpc { private static final PathTemplate PROJECT_NAME_TEMPLATE = PathTemplate.create("projects/{project}"); - + private static final int MAX_MESSAGE_SIZE = 100 * 1024 * 1024; + private final SpannerStub stub; private final InstanceAdminStub instanceStub; private final DatabaseAdminStub databaseStub; @@ -143,17 +145,19 @@ public GapicSpannerRpc(SpannerOptions options) throws IOException { mergedHeaderProvider.getHeaders(), internalHeaderProviderBuilder.getResourceHeaderKey()); - // TODO(pongad): make channel pool work - // TODO(pongad): make RPC logging work (formerly LoggingInterceptor) // TODO(pongad): add watchdog // TODO(pongad): make error augmentation work (formerly SpannerErrorInterceptor) + // TODO(hzyi): make this channelProvider configurable through SpannerOptions TransportChannelProvider channelProvider = - FixedTransportChannelProvider.create( - GrpcTransportChannel.newBuilder() - .setManagedChannel(options.getRpcChannels().get(0)) - .build()); + InstantiatingGrpcChannelProvider + .newBuilder() + .setEndpoint(options.getEndpoint()) + .setMaxInboundMessageSize(MAX_MESSAGE_SIZE) + .setPoolSize(options.getNumChannels()) + .build(); + CredentialsProvider credentialsProvider = GrpcTransportOptions.setUpCredentialsProvider(options);