Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make stream wait timeout a first class citizen #1473

Merged
merged 9 commits into from
May 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,12 @@ public void testNonRetryServerStreamingSettingsContextWithRetry() {
.build();
Duration newTimeout = Duration.ofSeconds(5);
RetrySettings contextRetrySettings =
retrySettings.toBuilder().setTotalTimeout(newTimeout).setMaxAttempts(3).build();
retrySettings
.toBuilder()
.setInitialRpcTimeout(newTimeout)
.setMaxRpcTimeout(newTimeout)
mutianf marked this conversation as resolved.
Show resolved Hide resolved
.setMaxAttempts(3)
.build();
GrpcCallContext retryingContext =
defaultCallContext
.withRetrySettings(contextRetrySettings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ public static <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT>
callable.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withStreamIdleTimeout(callSettings.getIdleTimeout()));
.withStreamIdleTimeout(callSettings.getIdleTimeout())
.withStreamWaitTimeout(callSettings.getWaitTimeout()));
mutianf marked this conversation as resolved.
Show resolved Hide resolved
mutianf marked this conversation as resolved.
Show resolved Hide resolved

return callable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import javax.annotation.concurrent.GuardedBy;
import org.threeten.bp.Duration;

/**
* A callable that generates Server Streaming attempts. At any one time, it is responsible for at
Expand Down Expand Up @@ -181,15 +180,6 @@ public void cancel() {
}
isStarted = true;

// Propagate the totalTimeout as the overall stream deadline, so long as the user
// has not provided a timeout via the ApiCallContext. If they have, retain it.
Duration totalTimeout =
outerRetryingFuture.getAttemptSettings().getGlobalSettings().getTotalTimeout();

if (totalTimeout != null && context != null && context.getTimeout() == null) {
context = context.withTimeout(totalTimeout);
}

// Call the inner callable
call();
}
Expand Down Expand Up @@ -218,13 +208,10 @@ public Void call() {

ApiCallContext attemptContext = context;

// Set the streamWaitTimeout to the attempt RPC Timeout, only if the context
// does not already have a timeout set by a user via withStreamWaitTimeout.
if (!outerRetryingFuture.getAttemptSettings().getRpcTimeout().isZero()
&& attemptContext.getStreamWaitTimeout() == null) {
&& attemptContext.getTimeout() == null) {
attemptContext =
attemptContext.withStreamWaitTimeout(
outerRetryingFuture.getAttemptSettings().getRpcTimeout());
attemptContext.withTimeout(outerRetryingFuture.getAttemptSettings().getRpcTimeout());
}

attemptContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@
* <p>This class includes settings that are applicable to all server streaming calls, which
* currently just includes retries and watchdog timers.
*
* <p>The watchdog timer is configured via {@code idleTimeout}. The watchdog will terminate any
* stream that has not has seen any demand (via {@link StreamController#request(int)}) in the
* configured interval. To turn off idle checks, set the interval to {@link Duration#ZERO}.
* <p>The watchdog timer is configured via {@code idleTimeout} and {@code waitTimeout}. The watchdog
* will terminate any stream that has not has seen any demand (via {@link
* StreamController#request(int)}) in the configured interval or has not seen a message from the
* server in {@code waitTimeout}. To turn off idle checks, set the interval to {@link
* Duration#ZERO}.
*
* <p>Retry configuration allows for the stream to be restarted and resumed. It is composed of 3
* parts: the retryable codes, the retry settings and the stream resumption strategy. The retryable
Expand Down Expand Up @@ -79,12 +81,14 @@ public final class ServerStreamingCallSettings<RequestT, ResponseT>
@Nonnull private final StreamResumptionStrategy<RequestT, ResponseT> resumptionStrategy;

@Nonnull private final Duration idleTimeout;
@Nonnull private final Duration waitTimeout;
mutianf marked this conversation as resolved.
Show resolved Hide resolved

private ServerStreamingCallSettings(Builder<RequestT, ResponseT> builder) {
this.retryableCodes = ImmutableSet.copyOf(builder.retryableCodes);
this.retrySettings = builder.retrySettingsBuilder.build();
this.resumptionStrategy = builder.resumptionStrategy;
this.idleTimeout = builder.idleTimeout;
this.waitTimeout = builder.waitTimeout;
}

/**
Expand Down Expand Up @@ -123,6 +127,15 @@ public Duration getIdleTimeout() {
return idleTimeout;
}

/**
* See the class documentation of {@link ServerStreamingCallSettings} for a description of what
* the {@link #waitTimeout} does.
*/
@Nonnull
public Duration getWaitTimeout() {
return waitTimeout;
}

public Builder<RequestT, ResponseT> toBuilder() {
return new Builder<>(this);
}
Expand All @@ -135,6 +148,7 @@ public static <RequestT, ResponseT> Builder<RequestT, ResponseT> newBuilder() {
public String toString() {
return MoreObjects.toStringHelper(this)
.add("idleTimeout", idleTimeout)
.add("waitTimeout", waitTimeout)
.add("retryableCodes", retryableCodes)
.add("retrySettings", retrySettings)
.toString();
Expand All @@ -148,13 +162,16 @@ public static class Builder<RequestT, ResponseT>

@Nonnull private Duration idleTimeout;

@Nonnull private Duration waitTimeout;

/** Initialize the builder with default settings */
private Builder() {
this.retryableCodes = ImmutableSet.of();
this.retrySettingsBuilder = RetrySettings.newBuilder();
this.resumptionStrategy = new SimpleStreamResumptionStrategy<>();

this.idleTimeout = Duration.ZERO;
this.waitTimeout = Duration.ZERO;
}

private Builder(ServerStreamingCallSettings<RequestT, ResponseT> settings) {
Expand All @@ -164,6 +181,7 @@ private Builder(ServerStreamingCallSettings<RequestT, ResponseT> settings) {
this.resumptionStrategy = settings.resumptionStrategy;

this.idleTimeout = settings.idleTimeout;
this.waitTimeout = settings.waitTimeout;
}

/**
Expand Down Expand Up @@ -233,9 +251,9 @@ public Builder<RequestT, ResponseT> setSimpleTimeoutNoRetries(@Nonnull Duration
.setInitialRetryDelay(Duration.ZERO)
.setRetryDelayMultiplier(1)
.setMaxRetryDelay(Duration.ZERO)
.setInitialRpcTimeout(Duration.ZERO)
.setInitialRpcTimeout(timeout)
.setRpcTimeoutMultiplier(1)
.setMaxRpcTimeout(Duration.ZERO)
.setMaxRpcTimeout(timeout)
lqiu96 marked this conversation as resolved.
Show resolved Hide resolved
.setMaxAttempts(1)
.build());

Expand Down Expand Up @@ -264,14 +282,27 @@ public Duration getIdleTimeout() {
}

/**
* See the class documentation of {@link ServerStreamingCallSettings} for a description of what
* the {@link #idleTimeout} does. {@link Duration#ZERO} disables the watchdog.
* Set how long to wait before considering the stream orphaned by the user and closing it.
* {@link Duration#ZERO} disables the check for abandoned streams.
*/
public Builder<RequestT, ResponseT> setIdleTimeout(@Nonnull Duration idleTimeout) {
this.idleTimeout = Preconditions.checkNotNull(idleTimeout);
return this;
}

@Nonnull
public Duration getWaitTimeout() {
return waitTimeout;
}

/**
* Set the maximum amount of time to wait for the next message from the server. {@link
* Duration#ZERO} disables the check for abandoned streams.
*/
public void setWaitTimeout(@Nonnull Duration waitTimeout) {
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
this.waitTimeout = waitTimeout;
}

@Override
public ServerStreamingCallSettings<RequestT, ResponseT> build() {
return new ServerStreamingCallSettings<>(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void testNonRetriedServerStreamingCallableWithRetrySettings() throws Exce
ServerStreamingCallable<Object, Object> callable =
Callables.retrying(innerServerStreamingCallable, callSettings, clientContext);

Duration timeout = retrySettings.getTotalTimeout();
Duration timeout = retrySettings.getInitialRpcTimeout();

callable.call("Is your refrigerator running?", callContextWithRetrySettings);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class ServerStreamingAttemptCallableTest {
private FakeRetryingFuture fakeRetryingFuture;
private StreamResumptionStrategy<String, String> resumptionStrategy;
private static Duration totalTimeout = Duration.ofHours(1);
private static final Duration attemptTimeout = Duration.ofMinutes(1);
private FakeCallContext mockedCallContext;

@Before
Expand Down Expand Up @@ -100,7 +101,6 @@ public void testUserProvidedContextTimeout() {
// Ensure that the callable did not overwrite the user provided timeouts
Mockito.verify(mockedCallContext, Mockito.times(1)).getTimeout();
Mockito.verify(mockedCallContext, Mockito.never()).withTimeout(totalTimeout);
Mockito.verify(mockedCallContext, Mockito.times(1)).getStreamWaitTimeout();
Mockito.verify(mockedCallContext, Mockito.never())
.withStreamWaitTimeout(Mockito.any(Duration.class));

Expand Down Expand Up @@ -128,7 +128,7 @@ public void testNoUserProvidedContextTimeout() {
Mockito.doReturn(BaseApiTracer.getInstance()).when(mockedCallContext).getTracer();
Mockito.doReturn(null).when(mockedCallContext).getTimeout();
Mockito.doReturn(null).when(mockedCallContext).getStreamWaitTimeout();
Mockito.doReturn(mockedCallContext).when(mockedCallContext).withTimeout(totalTimeout);
Mockito.doReturn(mockedCallContext).when(mockedCallContext).withTimeout(attemptTimeout);
Mockito.doReturn(mockedCallContext)
.when(mockedCallContext)
.withStreamWaitTimeout(Mockito.any(Duration.class));
Expand All @@ -139,10 +139,7 @@ public void testNoUserProvidedContextTimeout() {
// Ensure that the callable configured the timeouts via the Settings in the
// absence of user-defined timeouts.
Mockito.verify(mockedCallContext, Mockito.times(1)).getTimeout();
Mockito.verify(mockedCallContext, Mockito.times(1)).withTimeout(totalTimeout);
Mockito.verify(mockedCallContext, Mockito.times(1)).getStreamWaitTimeout();
Mockito.verify(mockedCallContext, Mockito.times(1))
.withStreamWaitTimeout(Mockito.any(Duration.class));
Mockito.verify(mockedCallContext, Mockito.times(1)).withTimeout(attemptTimeout);

// Should notify outer observer
Truth.assertThat(observer.controller).isNotNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,20 @@ public void idleTimeoutIsNotLost() {
assertThat(builder.build().toBuilder().getIdleTimeout()).isEqualTo(idleTimeout);
}

@Test
public void waitTimeoutIsNotLost() {
Duration waitTimeout = Duration.ofSeconds(5);

ServerStreamingCallSettings.Builder<Object, Object> builder =
ServerStreamingCallSettings.newBuilder();

builder.setWaitTimeout(waitTimeout);

assertThat(builder.getWaitTimeout()).isEqualTo(waitTimeout);
assertThat(builder.build().getWaitTimeout()).isEqualTo(waitTimeout);
assertThat(builder.build().toBuilder().getWaitTimeout()).isEqualTo(waitTimeout);
}

@Test
public void testRetrySettingsBuilder() {
RetrySettings initialSettings =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,28 @@
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;

import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.rpc.CancelledException;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.WatchdogTimeoutException;
import com.google.common.collect.ImmutableList;
import com.google.rpc.Status;
import com.google.showcase.v1beta1.EchoClient;
import com.google.showcase.v1beta1.EchoResponse;
import com.google.showcase.v1beta1.EchoSettings;
import com.google.showcase.v1beta1.ExpandRequest;
import com.google.showcase.v1beta1.it.util.TestClientInitializer;
import io.grpc.ManagedChannelBuilder;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.stream.Collectors;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.threeten.bp.Duration;

public class ITServerSideStreaming {

Expand Down Expand Up @@ -104,6 +110,49 @@ public void testGrpc_serverError_receiveErrorAfterLastWordInStream() {
assertThat(cancelledException.getStatusCode().getCode()).isEqualTo(StatusCode.Code.CANCELLED);
}

@Test
public void testGrpc_serverWaitTimeout_watchdogCancelsStream() throws Exception {
EchoSettings.Builder settings =
EchoSettings.newBuilder()
.setCredentialsProvider(NoCredentialsProvider.create())
.setTransportChannelProvider(
EchoSettings.defaultGrpcTransportProviderBuilder()
.setChannelConfigurator(ManagedChannelBuilder::usePlaintext)
.build());

settings
.expandSettings()
.setIdleTimeout(Duration.ofMillis(100))
.setWaitTimeout(Duration.ofMillis(100));

settings.getStubSettingsBuilder().setStreamWatchdogCheckInterval(Duration.ofMillis(50));

EchoClient echoClient = EchoClient.create(settings.build());

String content = "The rain in Spain stays mainly on the plain!";
ServerStream<EchoResponse> responseStream =
echoClient
.expandCallable()
.call(
ExpandRequest.newBuilder()
.setContent(content)
// Configure server interval for returning the next response
.setStreamWaitTime(
com.google.protobuf.Duration.newBuilder().setSeconds(1).build())
.build());
ArrayList<String> responses = new ArrayList<>();
try {
for (EchoResponse response : responseStream) {
responses.add(response.getContent());
}
Assert.fail("No exception was thrown");
} catch (WatchdogTimeoutException e) {
assertThat(e).hasMessageThat().contains("Canceled due to timeout waiting for next response");
} finally {
echoClient.close();
}
}

@Test
public void testHttpJson_receiveStreamedContent() {
String content = "The rain in Spain stays mainly on the plain!";
Expand Down