Skip to content

Commit

Permalink
Make integration tests work
Browse files Browse the repository at this point in the history
  • Loading branch information
yihanzhen committed Apr 13, 2018
1 parent f208590 commit 067a190
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.cloud.spanner.SpannerImpl.MultiUseReadOnlyTransaction;
import com.google.cloud.spanner.SpannerImpl.SessionImpl;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.spi.v1.GapicSpannerRpc;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.Struct;
Expand Down Expand Up @@ -68,7 +69,7 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa
super(
checkNotNull(session),
checkNotNull(bound),
checkNotNull(spanner).getOptions().getSpannerRpcV1(),
checkNotNull(spanner).getOptions().getGapicSpannerRpc(),
spanner.getOptions().getPrefetchChunks());
this.sessionName = session.getName();
this.options = session.getOptions();
Expand All @@ -81,7 +82,7 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa
checkNotNull(session),
checkNotNull(batchTransactionId).getTransactionId(),
batchTransactionId.getTimestamp(),
checkNotNull(spanner).getOptions().getSpannerRpcV1(),
checkNotNull(spanner).getOptions().getGapicSpannerRpc(),
spanner.getOptions().getPrefetchChunks());
this.sessionName = session.getName();
this.options = session.getOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
SpannerImpl(SpannerOptions options) {
this(
options.getSpannerRpcV1(),
GapicSpannerRpc.create(options),
options.getGapicSpannerRpc(),
options.getPrefetchChunks(),
options);
}
Expand Down Expand Up @@ -1063,7 +1063,9 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
null,
session.options));

// let resume fail for now
// Let resume fail for now. Gapic has its own resume, but in order not
// to introduce too much change at a time, we decide to plumb up
// ServerStream first and then figure out how to make resume work
}
};
return new GrpcResultSet(stream, this, queryMode);
Expand Down Expand Up @@ -1170,7 +1172,9 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
null,
session.options));

// let resume fail for now
// Let resume fail for now. Gapic has its own resume, but in order not
// to introduce too much change at a time, we decide to plumb up
// ServerStream first and then figure out how to make resume work
}
};
GrpcResultSet resultSet =
Expand Down Expand Up @@ -2288,17 +2292,32 @@ public CloseableServerStreamIterator(ServerStream<T> stream) {

@Override
public boolean hasNext() {
return iterator.hasNext();
try {
return iterator.hasNext();
}
catch (Exception e) {
throw SpannerExceptionFactory.newSpannerException(e);
}
}

@Override
public T next() {
return iterator.next();
try {
return iterator.next();
}
catch (Exception e) {
throw SpannerExceptionFactory.newSpannerException(e);
}
}

@Override
public void close(@Nullable String message) {
stream.cancel();
try {
stream.cancel();
}
catch (Exception e) {
throw SpannerExceptionFactory.newSpannerException(e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.cloud.ServiceOptions;
import com.google.cloud.ServiceRpc;
import com.google.cloud.TransportOptions;
import com.google.cloud.spanner.spi.v1.GapicSpannerRpc;
import com.google.cloud.spanner.spi.v1.GrpcSpannerRpc;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.spi.SpannerRpcFactory;
Expand Down Expand Up @@ -343,6 +344,10 @@ protected SpannerRpc getSpannerRpcV1() {
return (SpannerRpc) getRpc();
}

protected SpannerRpc getGapicSpannerRpc() {
return GapicSpannerRpc.create(this);
}

@SuppressWarnings("unchecked")
@Override
public Builder toBuilder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,7 @@ public ServerStream<PartialResultSet> read(
@Override
public ServerStream<PartialResultSet> executeQuery(
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
GrpcCallContext context = GrpcCallContext.createDefault()
.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue());
GrpcCallContext context = newCallContext(options, request.getSession());
return stub.executeStreamingSqlCallable().call(request, context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public final class BatchClientImplTest {
private static final ByteString TXN_ID = ByteString.copyFromUtf8("my-txn");
private static final String TIMESTAMP = "2017-11-15T10:54:20Z";

@Mock private SpannerRpc rpc;
@Mock private SpannerRpc rawGrpcRpc;
@Mock private SpannerRpc gapicRpc;
@Mock private SpannerOptions spannerOptions;
@Captor private ArgumentCaptor<Map<SpannerRpc.Option, Object>> optionsCaptor;
@Mock private BatchTransactionId txnID;
Expand All @@ -59,20 +60,20 @@ public final class BatchClientImplTest {
public void setUp() {
initMocks(this);
DatabaseId db = DatabaseId.of(DB_NAME);
SpannerImpl spanner = new SpannerImpl(rpc, rpc, 1, spannerOptions);
SpannerImpl spanner = new SpannerImpl(rawGrpcRpc, gapicRpc, 1, spannerOptions);
client = new BatchClientImpl(db, spanner);
}

@Test
public void testBatchReadOnlyTxnWithBound() throws Exception {
Session sessionProto = Session.newBuilder().setName(SESSION_NAME).build();
when(rpc.createSession(eq(DB_NAME), (Map<String, String>) anyMap(), optionsCaptor.capture()))
when(gapicRpc.createSession(eq(DB_NAME), (Map<String, String>) anyMap(), optionsCaptor.capture()))
.thenReturn(sessionProto);
com.google.protobuf.Timestamp timestamp = Timestamps.parse(TIMESTAMP);
Transaction txnMetadata =
Transaction.newBuilder().setId(TXN_ID).setReadTimestamp(timestamp).build();
when(spannerOptions.getSpannerRpcV1()).thenReturn(rpc);
when(rpc.beginTransaction(Mockito.<BeginTransactionRequest>any(), optionsCaptor.capture()))
when(spannerOptions.getGapicSpannerRpc()).thenReturn(gapicRpc);
when(gapicRpc.beginTransaction(Mockito.<BeginTransactionRequest>any(), optionsCaptor.capture()))
.thenReturn(txnMetadata);

BatchReadOnlyTransaction batchTxn = client.batchReadOnlyTransaction(TimestampBound.strong());
Expand Down

0 comments on commit 067a190

Please sign in to comment.