Skip to content

Commit

Permalink
pubsub: add Publisher.awaitTermination (#3688)
Browse files Browse the repository at this point in the history
[Newer gRPC versions](https://github.com/grpc/grpc-java/releases/tag/v1.12.0) seem to check that we call this method.
Currently shutdown waits for all messages to publish and return before shutting
anything down, so awaitTermination likely won't do anything meaningful.

In the future, we should make shutdown return promptly and use
awaitTermination to wait for messages.
I reported this at #3687.

Fixes #3648.
  • Loading branch information
pongad committed Sep 17, 2018
1 parent 739d519 commit ae614b3
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 28 deletions.
1 change: 1 addition & 0 deletions google-cloud-clients/google-cloud-pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ try {
} finally {
if (publisher != null) {
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiExceptionFactory;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.NoHeaderProvider;
import com.google.api.gax.rpc.StatusCode;
Expand All @@ -46,7 +43,6 @@
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.TopicNames;
import io.grpc.Status;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -424,6 +420,16 @@ public void shutdown() throws Exception {
publisherStub.shutdown();
}

/**
* Wait for all work has completed execution after a {@link #shutdown()} request, or the timeout
* occurs, or the current thread is interrupted.
*
* <p>Call this method to make sure all resources are freed properly.
*/
public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
return publisherStub.awaitTermination(duration, unit);
}

private boolean hasBatchingBytes() {
return getMaxBatchBytes() > 0;
}
Expand All @@ -443,6 +449,7 @@ private boolean hasBatchingBytes() {
* } finally {
* // When finished with the publisher, make sure to shutdown to free up resources.
* publisher.shutdown();
* publisher.awaitTermination(1, TimeUnit.MINUTES);
* }
* }</pre>
*/
Expand All @@ -463,6 +470,7 @@ public static Builder newBuilder(TopicName topicName) {
* } finally {
* // When finished with the publisher, make sure to shutdown to free up resources.
* publisher.shutdown();
* publisher.awaitTermination(1, TimeUnit.MINUTES);
* }
* }</pre>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.pubsub.it;

import static com.google.common.truth.Truth.assertThat;

import com.google.auto.value.AutoValue;
import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
Expand All @@ -32,20 +34,17 @@
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import static com.google.common.truth.Truth.assertThat;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

public class ITPubSubTest {

Expand Down Expand Up @@ -147,6 +146,7 @@ public void failed(Subscriber.State from, Throwable failure) {
.publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("msg2")).build())
.get();
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);

// Ack the first message.
MessageAndConsumer toAck = pollQueue(receiveQueue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.grpc.StatusException;
import io.grpc.inprocess.InProcessServerBuilder;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -114,6 +115,7 @@ public void testPublishByDuration() throws Exception {

assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(0).getMessagesCount());
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}

@Test
Expand Down Expand Up @@ -152,6 +154,7 @@ public void testPublishByNumBatchedMessages() throws Exception {
assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(0).getMessagesCount());
assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(1).getMessagesCount());
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}

@Test
Expand Down Expand Up @@ -186,6 +189,7 @@ public void testSinglePublishByNumBytes() throws Exception {

assertEquals(2, testPublisherServiceImpl.getCapturedRequests().size());
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}

@Test
Expand Down Expand Up @@ -228,6 +232,7 @@ public void testPublishMixedSizeAndDuration() throws Exception {
assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(0).getMessagesCount());
assertEquals(1, testPublisherServiceImpl.getCapturedRequests().get(1).getMessagesCount());
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}

private ApiFuture<String> sendTestMessage(Publisher publisher, String data) {
Expand Down Expand Up @@ -278,6 +283,7 @@ public void testPublishFailureRetries() throws Exception {

assertEquals(2, testPublisherServiceImpl.getCapturedRequests().size());
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}

@Test(expected = ExecutionException.class)
Expand All @@ -302,6 +308,7 @@ public void testPublishFailureRetries_retriesDisabled() throws Exception {
} finally {
assertSame(testPublisherServiceImpl.getCapturedRequests().size(), 1);
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}

Expand All @@ -328,6 +335,7 @@ public void testPublishFailureRetries_maxRetriesSetup() throws Exception {

assertEquals(3, testPublisherServiceImpl.getCapturedRequests().size());
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}

@Test
Expand All @@ -353,6 +361,7 @@ public void testPublishFailureRetries_maxRetriesSetUnlimited() throws Exception

assertEquals(3, testPublisherServiceImpl.getCapturedRequests().size());
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}

@Test(expected = ExecutionException.class)
Expand Down Expand Up @@ -381,6 +390,7 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce
} finally {
assertTrue(testPublisherServiceImpl.getCapturedRequests().size() >= 1);
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}

Expand All @@ -403,6 +413,7 @@ public void testPublisherGetters() throws Exception {
assertEquals(Duration.ofMillis(11), publisher.getBatchingSettings().getDelayThreshold());
assertEquals(12, (long) publisher.getBatchingSettings().getElementCountThreshold());
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* A snippet for Google Cloud Pub/Sub showing how to create a Pub/Sub topic and asynchronously
Expand Down Expand Up @@ -75,6 +75,7 @@ public static void publishMessages() throws Exception {
if (publisher != null) {
// When finished with the publisher, shutdown to free up resources.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
// [END pubsub_publish]
Expand Down Expand Up @@ -123,11 +124,12 @@ public void onSuccess(String messageId) {
if (publisher != null) {
// When finished with the publisher, shutdown to free up resources.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
// [END pubsub_publish_error_handler]
}

public static void main(String... args) throws Exception {
createTopic();
publishMessages();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import org.threeten.bp.Duration;

import java.io.FileInputStream;
import java.util.concurrent.TimeUnit;
import org.threeten.bp.Duration;

/** This class contains snippets for the {@link Publisher} interface. */
public class PublisherSnippets {
Expand Down Expand Up @@ -78,6 +78,7 @@ public static void newBuilder(String projectId, String topicId) throws Exception
} finally {
// When finished with the publisher, make sure to shutdown to free up resources.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}

Expand Down Expand Up @@ -108,8 +109,8 @@ public Publisher getPublisherWithCustomRetrySettings(ProjectTopicName topicName)
Duration retryDelay = Duration.ofMillis(100); // default : 1 ms
double retryDelayMultiplier = 2.0; // back off for repeated failures
Duration maxRetryDelay = Duration.ofSeconds(5); // default : 10 seconds
Duration totalTimeout = Duration.ofSeconds(1); // default: 0
Duration initialRpcTimeout = Duration.ofSeconds(1); // default: 0
Duration totalTimeout = Duration.ofSeconds(1); // default: 0
Duration initialRpcTimeout = Duration.ofSeconds(1); // default: 0
Duration maxRpcTimeout = Duration.ofSeconds(10); // default: 0

RetrySettings retrySettings = RetrySettings.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package com.google.cloud.examples.pubsub.snippets;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
Expand All @@ -24,27 +27,22 @@
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.ReceivedMessage;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

public class ITPubSubSnippets {

Expand Down Expand Up @@ -98,6 +96,7 @@ public void onFailure(Throwable t) {
} finally {
if (publisher != null) {
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}

Expand Down Expand Up @@ -144,6 +143,7 @@ public void testPublisherSyncSubscriber() throws Exception {
} finally {
if (publisher != null) {
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}

Expand Down

0 comments on commit ae614b3

Please sign in to comment.