diff --git a/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/TimeoutTest.java b/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/TimeoutTest.java index 3b0d8ebc5f..9de95c1752 100644 --- a/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/TimeoutTest.java +++ b/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/TimeoutTest.java @@ -242,7 +242,12 @@ public void testNonRetryServerStreamingSettingsContextWithRetry() { .build(); Duration newTimeout = Duration.ofSeconds(5); RetrySettings contextRetrySettings = - retrySettings.toBuilder().setTotalTimeout(newTimeout).setMaxAttempts(3).build(); + retrySettings + .toBuilder() + .setInitialRpcTimeout(newTimeout) + .setMaxRpcTimeout(newTimeout) + .setMaxAttempts(3) + .build(); GrpcCallContext retryingContext = defaultCallContext .withRetrySettings(contextRetrySettings) diff --git a/gax-java/gax/src/main/java/com/google/api/gax/rpc/Callables.java b/gax-java/gax/src/main/java/com/google/api/gax/rpc/Callables.java index 413a11649c..28a76fe721 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/rpc/Callables.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/rpc/Callables.java @@ -115,7 +115,8 @@ public static ServerStreamingCallable callable.withDefaultCallContext( clientContext .getDefaultCallContext() - .withStreamIdleTimeout(callSettings.getIdleTimeout())); + .withStreamIdleTimeout(callSettings.getIdleTimeout()) + .withStreamWaitTimeout(callSettings.getWaitTimeout())); return callable; } diff --git a/gax-java/gax/src/main/java/com/google/api/gax/rpc/ServerStreamingAttemptCallable.java b/gax-java/gax/src/main/java/com/google/api/gax/rpc/ServerStreamingAttemptCallable.java index 0c71d12420..70bf4eeda0 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/rpc/ServerStreamingAttemptCallable.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/rpc/ServerStreamingAttemptCallable.java @@ -37,7 +37,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import javax.annotation.concurrent.GuardedBy; -import org.threeten.bp.Duration; /** * A callable that generates Server Streaming attempts. At any one time, it is responsible for at @@ -181,15 +180,6 @@ public void cancel() { } isStarted = true; - // Propagate the totalTimeout as the overall stream deadline, so long as the user - // has not provided a timeout via the ApiCallContext. If they have, retain it. - Duration totalTimeout = - outerRetryingFuture.getAttemptSettings().getGlobalSettings().getTotalTimeout(); - - if (totalTimeout != null && context != null && context.getTimeout() == null) { - context = context.withTimeout(totalTimeout); - } - // Call the inner callable call(); } @@ -218,13 +208,10 @@ public Void call() { ApiCallContext attemptContext = context; - // Set the streamWaitTimeout to the attempt RPC Timeout, only if the context - // does not already have a timeout set by a user via withStreamWaitTimeout. if (!outerRetryingFuture.getAttemptSettings().getRpcTimeout().isZero() - && attemptContext.getStreamWaitTimeout() == null) { + && attemptContext.getTimeout() == null) { attemptContext = - attemptContext.withStreamWaitTimeout( - outerRetryingFuture.getAttemptSettings().getRpcTimeout()); + attemptContext.withTimeout(outerRetryingFuture.getAttemptSettings().getRpcTimeout()); } attemptContext diff --git a/gax-java/gax/src/main/java/com/google/api/gax/rpc/ServerStreamingCallSettings.java b/gax-java/gax/src/main/java/com/google/api/gax/rpc/ServerStreamingCallSettings.java index eb9373e0c3..da40b657b7 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/rpc/ServerStreamingCallSettings.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/rpc/ServerStreamingCallSettings.java @@ -47,9 +47,11 @@ *

This class includes settings that are applicable to all server streaming calls, which * currently just includes retries and watchdog timers. * - *

The watchdog timer is configured via {@code idleTimeout}. The watchdog will terminate any - * stream that has not has seen any demand (via {@link StreamController#request(int)}) in the - * configured interval. To turn off idle checks, set the interval to {@link Duration#ZERO}. + *

The watchdog timer is configured via {@code idleTimeout} and {@code waitTimeout}. The watchdog + * will terminate any stream that has not has seen any demand (via {@link + * StreamController#request(int)}) in the configured interval or has not seen a message from the + * server in {@code waitTimeout}. To turn off idle checks, set the interval to {@link + * Duration#ZERO}. * *

Retry configuration allows for the stream to be restarted and resumed. It is composed of 3 * parts: the retryable codes, the retry settings and the stream resumption strategy. The retryable @@ -79,12 +81,14 @@ public final class ServerStreamingCallSettings @Nonnull private final StreamResumptionStrategy resumptionStrategy; @Nonnull private final Duration idleTimeout; + @Nonnull private final Duration waitTimeout; private ServerStreamingCallSettings(Builder builder) { this.retryableCodes = ImmutableSet.copyOf(builder.retryableCodes); this.retrySettings = builder.retrySettingsBuilder.build(); this.resumptionStrategy = builder.resumptionStrategy; this.idleTimeout = builder.idleTimeout; + this.waitTimeout = builder.waitTimeout; } /** @@ -123,6 +127,15 @@ public Duration getIdleTimeout() { return idleTimeout; } + /** + * See the class documentation of {@link ServerStreamingCallSettings} for a description of what + * the {@link #waitTimeout} does. + */ + @Nonnull + public Duration getWaitTimeout() { + return waitTimeout; + } + public Builder toBuilder() { return new Builder<>(this); } @@ -135,6 +148,7 @@ public static Builder newBuilder() { public String toString() { return MoreObjects.toStringHelper(this) .add("idleTimeout", idleTimeout) + .add("waitTimeout", waitTimeout) .add("retryableCodes", retryableCodes) .add("retrySettings", retrySettings) .toString(); @@ -148,6 +162,8 @@ public static class Builder @Nonnull private Duration idleTimeout; + @Nonnull private Duration waitTimeout; + /** Initialize the builder with default settings */ private Builder() { this.retryableCodes = ImmutableSet.of(); @@ -155,6 +171,7 @@ private Builder() { this.resumptionStrategy = new SimpleStreamResumptionStrategy<>(); this.idleTimeout = Duration.ZERO; + this.waitTimeout = Duration.ZERO; } private Builder(ServerStreamingCallSettings settings) { @@ -164,6 +181,7 @@ private Builder(ServerStreamingCallSettings settings) { this.resumptionStrategy = settings.resumptionStrategy; this.idleTimeout = settings.idleTimeout; + this.waitTimeout = settings.waitTimeout; } /** @@ -233,9 +251,9 @@ public Builder setSimpleTimeoutNoRetries(@Nonnull Duration .setInitialRetryDelay(Duration.ZERO) .setRetryDelayMultiplier(1) .setMaxRetryDelay(Duration.ZERO) - .setInitialRpcTimeout(Duration.ZERO) + .setInitialRpcTimeout(timeout) .setRpcTimeoutMultiplier(1) - .setMaxRpcTimeout(Duration.ZERO) + .setMaxRpcTimeout(timeout) .setMaxAttempts(1) .build()); @@ -264,14 +282,27 @@ public Duration getIdleTimeout() { } /** - * See the class documentation of {@link ServerStreamingCallSettings} for a description of what - * the {@link #idleTimeout} does. {@link Duration#ZERO} disables the watchdog. + * Set how long to wait before considering the stream orphaned by the user and closing it. + * {@link Duration#ZERO} disables the check for abandoned streams. */ public Builder setIdleTimeout(@Nonnull Duration idleTimeout) { this.idleTimeout = Preconditions.checkNotNull(idleTimeout); return this; } + @Nonnull + public Duration getWaitTimeout() { + return waitTimeout; + } + + /** + * Set the maximum amount of time to wait for the next message from the server. {@link + * Duration#ZERO} disables the check for abandoned streams. + */ + public void setWaitTimeout(@Nonnull Duration waitTimeout) { + this.waitTimeout = waitTimeout; + } + @Override public ServerStreamingCallSettings build() { return new ServerStreamingCallSettings<>(this); diff --git a/gax-java/gax/src/test/java/com/google/api/gax/rpc/CallableTest.java b/gax-java/gax/src/test/java/com/google/api/gax/rpc/CallableTest.java index eaa0be04c7..04e025fecb 100644 --- a/gax-java/gax/src/test/java/com/google/api/gax/rpc/CallableTest.java +++ b/gax-java/gax/src/test/java/com/google/api/gax/rpc/CallableTest.java @@ -137,7 +137,7 @@ public void testNonRetriedServerStreamingCallableWithRetrySettings() throws Exce ServerStreamingCallable callable = Callables.retrying(innerServerStreamingCallable, callSettings, clientContext); - Duration timeout = retrySettings.getTotalTimeout(); + Duration timeout = retrySettings.getInitialRpcTimeout(); callable.call("Is your refrigerator running?", callContextWithRetrySettings); diff --git a/gax-java/gax/src/test/java/com/google/api/gax/rpc/ServerStreamingAttemptCallableTest.java b/gax-java/gax/src/test/java/com/google/api/gax/rpc/ServerStreamingAttemptCallableTest.java index a0c8b833f3..acc70467bf 100644 --- a/gax-java/gax/src/test/java/com/google/api/gax/rpc/ServerStreamingAttemptCallableTest.java +++ b/gax-java/gax/src/test/java/com/google/api/gax/rpc/ServerStreamingAttemptCallableTest.java @@ -62,6 +62,7 @@ public class ServerStreamingAttemptCallableTest { private FakeRetryingFuture fakeRetryingFuture; private StreamResumptionStrategy resumptionStrategy; private static Duration totalTimeout = Duration.ofHours(1); + private static final Duration attemptTimeout = Duration.ofMinutes(1); private FakeCallContext mockedCallContext; @Before @@ -100,7 +101,6 @@ public void testUserProvidedContextTimeout() { // Ensure that the callable did not overwrite the user provided timeouts Mockito.verify(mockedCallContext, Mockito.times(1)).getTimeout(); Mockito.verify(mockedCallContext, Mockito.never()).withTimeout(totalTimeout); - Mockito.verify(mockedCallContext, Mockito.times(1)).getStreamWaitTimeout(); Mockito.verify(mockedCallContext, Mockito.never()) .withStreamWaitTimeout(Mockito.any(Duration.class)); @@ -128,7 +128,7 @@ public void testNoUserProvidedContextTimeout() { Mockito.doReturn(BaseApiTracer.getInstance()).when(mockedCallContext).getTracer(); Mockito.doReturn(null).when(mockedCallContext).getTimeout(); Mockito.doReturn(null).when(mockedCallContext).getStreamWaitTimeout(); - Mockito.doReturn(mockedCallContext).when(mockedCallContext).withTimeout(totalTimeout); + Mockito.doReturn(mockedCallContext).when(mockedCallContext).withTimeout(attemptTimeout); Mockito.doReturn(mockedCallContext) .when(mockedCallContext) .withStreamWaitTimeout(Mockito.any(Duration.class)); @@ -139,10 +139,7 @@ public void testNoUserProvidedContextTimeout() { // Ensure that the callable configured the timeouts via the Settings in the // absence of user-defined timeouts. Mockito.verify(mockedCallContext, Mockito.times(1)).getTimeout(); - Mockito.verify(mockedCallContext, Mockito.times(1)).withTimeout(totalTimeout); - Mockito.verify(mockedCallContext, Mockito.times(1)).getStreamWaitTimeout(); - Mockito.verify(mockedCallContext, Mockito.times(1)) - .withStreamWaitTimeout(Mockito.any(Duration.class)); + Mockito.verify(mockedCallContext, Mockito.times(1)).withTimeout(attemptTimeout); // Should notify outer observer Truth.assertThat(observer.controller).isNotNull(); diff --git a/gax-java/gax/src/test/java/com/google/api/gax/rpc/ServerStreamingCallSettingsTest.java b/gax-java/gax/src/test/java/com/google/api/gax/rpc/ServerStreamingCallSettingsTest.java index a14268f8d5..bcd5a9272e 100644 --- a/gax-java/gax/src/test/java/com/google/api/gax/rpc/ServerStreamingCallSettingsTest.java +++ b/gax-java/gax/src/test/java/com/google/api/gax/rpc/ServerStreamingCallSettingsTest.java @@ -98,6 +98,20 @@ public void idleTimeoutIsNotLost() { assertThat(builder.build().toBuilder().getIdleTimeout()).isEqualTo(idleTimeout); } + @Test + public void waitTimeoutIsNotLost() { + Duration waitTimeout = Duration.ofSeconds(5); + + ServerStreamingCallSettings.Builder builder = + ServerStreamingCallSettings.newBuilder(); + + builder.setWaitTimeout(waitTimeout); + + assertThat(builder.getWaitTimeout()).isEqualTo(waitTimeout); + assertThat(builder.build().getWaitTimeout()).isEqualTo(waitTimeout); + assertThat(builder.build().toBuilder().getWaitTimeout()).isEqualTo(waitTimeout); + } + @Test public void testRetrySettingsBuilder() { RetrySettings initialSettings = diff --git a/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java b/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java index e8d22c2756..da78f07afa 100644 --- a/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java +++ b/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java @@ -19,22 +19,28 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertThrows; +import com.google.api.gax.core.NoCredentialsProvider; import com.google.api.gax.rpc.CancelledException; import com.google.api.gax.rpc.ServerStream; import com.google.api.gax.rpc.StatusCode; +import com.google.api.gax.rpc.WatchdogTimeoutException; import com.google.common.collect.ImmutableList; import com.google.rpc.Status; import com.google.showcase.v1beta1.EchoClient; import com.google.showcase.v1beta1.EchoResponse; +import com.google.showcase.v1beta1.EchoSettings; import com.google.showcase.v1beta1.ExpandRequest; import com.google.showcase.v1beta1.it.util.TestClientInitializer; +import io.grpc.ManagedChannelBuilder; import java.util.ArrayList; import java.util.Iterator; import java.util.stream.Collectors; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.threeten.bp.Duration; public class ITServerSideStreaming { @@ -104,6 +110,49 @@ public void testGrpc_serverError_receiveErrorAfterLastWordInStream() { assertThat(cancelledException.getStatusCode().getCode()).isEqualTo(StatusCode.Code.CANCELLED); } + @Test + public void testGrpc_serverWaitTimeout_watchdogCancelsStream() throws Exception { + EchoSettings.Builder settings = + EchoSettings.newBuilder() + .setCredentialsProvider(NoCredentialsProvider.create()) + .setTransportChannelProvider( + EchoSettings.defaultGrpcTransportProviderBuilder() + .setChannelConfigurator(ManagedChannelBuilder::usePlaintext) + .build()); + + settings + .expandSettings() + .setIdleTimeout(Duration.ofMillis(100)) + .setWaitTimeout(Duration.ofMillis(100)); + + settings.getStubSettingsBuilder().setStreamWatchdogCheckInterval(Duration.ofMillis(50)); + + EchoClient echoClient = EchoClient.create(settings.build()); + + String content = "The rain in Spain stays mainly on the plain!"; + ServerStream responseStream = + echoClient + .expandCallable() + .call( + ExpandRequest.newBuilder() + .setContent(content) + // Configure server interval for returning the next response + .setStreamWaitTime( + com.google.protobuf.Duration.newBuilder().setSeconds(1).build()) + .build()); + ArrayList responses = new ArrayList<>(); + try { + for (EchoResponse response : responseStream) { + responses.add(response.getContent()); + } + Assert.fail("No exception was thrown"); + } catch (WatchdogTimeoutException e) { + assertThat(e).hasMessageThat().contains("Canceled due to timeout waiting for next response"); + } finally { + echoClient.close(); + } + } + @Test public void testHttpJson_receiveStreamedContent() { String content = "The rain in Spain stays mainly on the plain!";