diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index b505681b195b..487ee0ccb4cb 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -45,6 +45,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Level; import java.util.logging.Logger; @@ -93,9 +94,6 @@ public class Subscriber extends AbstractApiService { private static final Duration UNARY_TIMEOUT = Duration.ofSeconds(60); private static final Duration ACK_EXPIRATION_PADDING = Duration.ofSeconds(5); - private static final ScheduledExecutorService SHARED_SYSTEM_EXECUTOR = - InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(6).build().getExecutor(); - private static final Logger logger = Logger.getLogger(Subscriber.class.getName()); private final String subscriptionName; @@ -132,6 +130,8 @@ private Subscriber(Builder builder) { .setLimitExceededBehavior(LimitExceededBehavior.ThrowException) .build()); + this.numPullers = builder.parallelPullCount; + executor = builder.executorProvider.getExecutor(); if (builder.executorProvider.shouldAutoClose()) { closeables.add( @@ -142,8 +142,16 @@ public void close() throws IOException { } }); } - alarmsExecutor = builder.systemExecutorProvider.getExecutor(); - if (builder.systemExecutorProvider.shouldAutoClose()) { + + ExecutorProvider systemExecutorProvider = builder.systemExecutorProvider; + if (systemExecutorProvider == null) { + systemExecutorProvider = + FixedExecutorProvider.create( + Executors.newScheduledThreadPool(Math.max(6, 2 * numPullers))); + } + + alarmsExecutor = systemExecutorProvider.getExecutor(); + if (systemExecutorProvider.shouldAutoClose()) { closeables.add( new AutoCloseable() { @Override @@ -153,7 +161,6 @@ public void close() throws IOException { }); } - this.numPullers = builder.parallelPullCount; TransportChannelProvider channelProvider = builder.channelProvider; if (channelProvider.acceptsPoolSize()) { channelProvider = channelProvider.withPoolSize(numPullers); @@ -162,7 +169,7 @@ public void close() throws IOException { try { this.subStubSettings = SubscriberStubSettings.newBuilder() - .setExecutorProvider(FixedExecutorProvider.create(alarmsExecutor)) + .setExecutorProvider(systemExecutorProvider) .setCredentialsProvider(builder.credentialsProvider) .setTransportChannelProvider(channelProvider) .setHeaderProvider(builder.headerProvider) @@ -404,7 +411,7 @@ public static final class Builder { FlowControlSettings.newBuilder().setMaxOutstandingElementCount(1000L).build(); ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER; - ExecutorProvider systemExecutorProvider = FixedExecutorProvider.create(SHARED_SYSTEM_EXECUTOR); + ExecutorProvider systemExecutorProvider = null; TransportChannelProvider channelProvider = SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder() .setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)