diff --git a/google-cloud-core/pom.xml b/google-cloud-core/pom.xml index 489d27cdf126..d395b23313f9 100644 --- a/google-cloud-core/pom.xml +++ b/google-cloud-core/pom.xml @@ -111,7 +111,7 @@ com.google.api gax - 0.0.25 + 0.0.27 io.grpc diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/FlowController.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/FlowController.java deleted file mode 100644 index 6478289a44bd..000000000000 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/FlowController.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright 2016 Google Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsub; - -import com.google.cloud.pubsub.Publisher.CloudPubsubFlowControlException; -import com.google.cloud.pubsub.Publisher.MaxOutstandingBytesReachedException; -import com.google.cloud.pubsub.Publisher.MaxOutstandingMessagesReachedException; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import java.util.concurrent.Semaphore; -import javax.annotation.Nullable; - -/** Provides flow control capability for Pub/Sub client classes. */ -class FlowController { - @Nullable private final Semaphore outstandingMessageCount; - @Nullable private final Semaphore outstandingByteCount; - private final boolean failOnLimits; - private final Optional maxOutstandingMessages; - private final Optional maxOutstandingBytes; - - FlowController( - Optional maxOutstandingMessages, - Optional maxOutstandingBytes, - boolean failOnFlowControlLimits) { - this.maxOutstandingMessages = Preconditions.checkNotNull(maxOutstandingMessages); - this.maxOutstandingBytes = Preconditions.checkNotNull(maxOutstandingBytes); - outstandingMessageCount = - maxOutstandingMessages.isPresent() ? new Semaphore(maxOutstandingMessages.get()) : null; - outstandingByteCount = - maxOutstandingBytes.isPresent() ? new Semaphore(maxOutstandingBytes.get()) : null; - this.failOnLimits = failOnFlowControlLimits; - } - - void reserve(int messages, int bytes) throws CloudPubsubFlowControlException { - Preconditions.checkArgument(messages > 0); - - if (outstandingMessageCount != null) { - if (!failOnLimits) { - outstandingMessageCount.acquireUninterruptibly(messages); - } else if (!outstandingMessageCount.tryAcquire(messages)) { - throw new MaxOutstandingMessagesReachedException(maxOutstandingMessages.get()); - } - } - - // Will always allow to send a message even if it is larger than the flow control limit, - // if it doesn't then it will deadlock the thread. - if (outstandingByteCount != null) { - int permitsToDraw = Math.min(bytes, maxOutstandingBytes.get()); - if (!failOnLimits) { - outstandingByteCount.acquireUninterruptibly(permitsToDraw); - } else if (!outstandingByteCount.tryAcquire(permitsToDraw)) { - throw new MaxOutstandingBytesReachedException(maxOutstandingBytes.get()); - } - } - } - - void release(int messages, int bytes) { - Preconditions.checkArgument(messages > 0); - - if (outstandingMessageCount != null) { - outstandingMessageCount.release(messages); - } - if (outstandingByteCount != null) { - // Need to return at most as much bytes as it can be drawn. - int permitsToReturn = Math.min(bytes, maxOutstandingBytes.get()); - outstandingByteCount.release(permitsToReturn); - } - } -} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/MessagesProcessor.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/MessagesProcessor.java index ea0ddd46926a..32e00bbb7322 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/MessagesProcessor.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/MessagesProcessor.java @@ -16,20 +16,17 @@ package com.google.cloud.pubsub; -import com.google.auth.Credentials; +import com.google.api.gax.bundling.FlowController; import com.google.cloud.Clock; -import com.google.cloud.pubsub.Publisher.CloudPubsubFlowControlException; import com.google.cloud.pubsub.Subscriber.MessageReceiver; import com.google.cloud.pubsub.Subscriber.MessageReceiver.AckReply; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.primitives.Ints; -import com.google.common.util.concurrent.AbstractService; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.ReceivedMessage; -import io.grpc.Status; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -287,7 +284,7 @@ public void run() { } try { flowController.reserve(receivedMessagesCount, totalByteCount); - } catch (CloudPubsubFlowControlException unexpectedException) { + } catch (FlowController.FlowControlException unexpectedException) { throw new IllegalStateException("Flow control unexpected exception", unexpectedException); } } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PollingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PollingSubscriberConnection.java index 9fa0a9eefa41..e5328cce07ae 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PollingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PollingSubscriberConnection.java @@ -18,6 +18,7 @@ import static com.google.cloud.pubsub.StatusUtil.isRetryable; +import com.google.api.gax.bundling.FlowController; import com.google.auth.Credentials; import com.google.cloud.Clock; import com.google.cloud.pubsub.MessagesProcessor.AcksProcessor; @@ -39,7 +40,6 @@ import io.grpc.Channel; import io.grpc.StatusRuntimeException; import io.grpc.auth.MoreCallCredentials; -import java.util.Iterator; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java index efc63b14faa9..9013b94bef43 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java @@ -16,6 +16,7 @@ package com.google.cloud.pubsub; +import com.google.api.gax.bundling.FlowController; import com.google.api.gax.grpc.BundlingSettings; import com.google.auth.Credentials; import com.google.auth.oauth2.GoogleCredentials; @@ -86,8 +87,8 @@ public interface Publisher { int MAX_BUNDLE_BYTES = 10 * 1000 * 1000; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte) // Meaningful defaults. - int DEFAULT_MAX_BUNDLE_MESSAGES = 100; - int DEFAULT_MAX_BUNDLE_BYTES = 1000; // 1 kB + long DEFAULT_MAX_BUNDLE_MESSAGES = 100L; + long DEFAULT_MAX_BUNDLE_BYTES = 1000L; // 1 kB Duration DEFAULT_MAX_BUNDLE_DURATION = new Duration(1); // 1ms Duration DEFAULT_REQUEST_TIMEOUT = new Duration(10 * 1000); // 10 seconds Duration MIN_SEND_BUNDLE_DURATION = new Duration(10 * 1000); // 10 seconds @@ -129,21 +130,22 @@ public interface Publisher { * Maximum number of outstanding (i.e. pending to publish) messages before limits are enforced. * See {@link #failOnFlowControlLimits()}. */ - Optional getMaxOutstandingMessages(); + Optional getMaxOutstandingElementCount(); /** * Maximum number of outstanding (i.e. pending to publish) bytes before limits are enforced. See * {@link #failOnFlowControlLimits()}. */ - Optional getMaxOutstandingBytes(); + Optional getMaxOutstandingRequestBytes(); /** * Whether to block publish calls when reaching flow control limits (see {@link - * #getMaxOutstandingBytes()} & {@link #getMaxOutstandingMessages()}). + * #getMaxOutstandingRequestBytes()} & {@link #getMaxOutstandingElementCount()}). * *

If set to false, a publish call will fail with either {@link - * MaxOutstandingBytesReachedException} or {@link MaxOutstandingMessagesReachedException}, as - * appropriate, when flow control limits are reached. + * RequestByteMaxOutstandingReachedException} or {@link + * ElementCountMaxOutstandingReachedException}, as appropriate, when flow control limits are + * reached. */ boolean failOnFlowControlLimits(); @@ -164,24 +166,24 @@ final class Builder { String topic; // Bundling options - BundlingSettings bundlingSettings; + BundlingSettings bundlingSettings = DEFAULT_BUNDLING_SETTINGS; // Client-side flow control options - Optional maxOutstandingMessages; - Optional maxOutstandingBytes; - boolean failOnFlowControlLimits; + FlowController.Settings flowControlSettings = FlowController.Settings.DEFAULT; + boolean failOnFlowControlLimits = false; // Send bundle deadline - Duration sendBundleDeadline; + Duration sendBundleDeadline = MIN_SEND_BUNDLE_DURATION; // RPC options - Duration requestTimeout; + Duration requestTimeout = DEFAULT_REQUEST_TIMEOUT; // Channels and credentials - Optional userCredentials; - Optional>> channelBuilder; + Optional userCredentials = Optional.absent(); + Optional>> channelBuilder = + Optional.absent(); - Optional executor; + Optional executor = Optional.absent(); /** Constructs a new {@link Builder} using the given topic. */ public static Builder newBuilder(String topic) { @@ -190,19 +192,6 @@ public static Builder newBuilder(String topic) { Builder(String topic) { this.topic = Preconditions.checkNotNull(topic); - setDefaults(); - } - - private void setDefaults() { - userCredentials = Optional.absent(); - channelBuilder = Optional.absent(); - maxOutstandingMessages = Optional.absent(); - maxOutstandingBytes = Optional.absent(); - bundlingSettings = DEFAULT_BUNDLING_SETTINGS; - requestTimeout = DEFAULT_REQUEST_TIMEOUT; - sendBundleDeadline = MIN_SEND_BUNDLE_DURATION; - failOnFlowControlLimits = false; - executor = Optional.absent(); } /** @@ -254,24 +243,16 @@ public Builder setBundlingSettings(BundlingSettings bundlingSettings) { // Flow control options - /** Maximum number of outstanding messages to keep in memory before enforcing flow control. */ - public Builder setMaxOutstandingMessages(int messages) { - Preconditions.checkArgument(messages > 0); - maxOutstandingMessages = Optional.of(messages); - return this; - } - - /** Maximum number of outstanding messages to keep in memory before enforcing flow control. */ - public Builder setMaxOutstandingBytes(int bytes) { - Preconditions.checkArgument(bytes > 0); - maxOutstandingBytes = Optional.of(bytes); + /** Sets the flow control settings. */ + public Builder setFlowControlSettings(FlowController.Settings flowControlSettings) { + this.flowControlSettings = Preconditions.checkNotNull(flowControlSettings); return this; } /** * Whether to fail publish when reaching any of the flow control limits, with either a {@link - * MaxOutstandingBytesReachedException} or {@link MaxOutstandingMessagesReachedException} as - * appropriate. + * RequestByteMaxOutstandingReachedException} or {@link + * ElementCountMaxOutstandingReachedException} as appropriate. * *

If set to false, then publish operations will block the current thread until the * outstanding requests go under the limits. @@ -306,51 +287,4 @@ public Publisher build() throws IOException { return new PublisherImpl(this); } } - - /** Base exception that signals a flow control state. */ - abstract class CloudPubsubFlowControlException extends Exception {} - - /** - * Returned as a future exception when client-side flow control is enforced based on the maximum - * number of outstanding in-memory messages. - */ - final class MaxOutstandingMessagesReachedException extends CloudPubsubFlowControlException { - private final int currentMaxMessages; - - public MaxOutstandingMessagesReachedException(int currentMaxMessages) { - this.currentMaxMessages = currentMaxMessages; - } - - public int getCurrentMaxBundleMessages() { - return currentMaxMessages; - } - - @Override - public String toString() { - return String.format( - "The maximum number of bundle messages: %d have been reached.", currentMaxMessages); - } - } - - /** - * Returned as a future exception when client-side flow control is enforced based on the maximum - * number of unacknowledged in-memory bytes. - */ - final class MaxOutstandingBytesReachedException extends CloudPubsubFlowControlException { - private final int currentMaxBytes; - - public MaxOutstandingBytesReachedException(int currentMaxBytes) { - this.currentMaxBytes = currentMaxBytes; - } - - public int getCurrentMaxBundleBytes() { - return currentMaxBytes; - } - - @Override - public String toString() { - return String.format( - "The maximum number of bundle bytes: %d have been reached.", currentMaxBytes); - } - } } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java index 355cfffee265..d9e375b2dedd 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java @@ -16,6 +16,7 @@ package com.google.cloud.pubsub; +import com.google.api.gax.bundling.FlowController; import com.google.auth.oauth2.GoogleCredentials; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; @@ -68,8 +69,7 @@ final class PublisherImpl implements Publisher { private final Duration maxBundleDuration; private final boolean hasBundlingBytes; - private final Optional maxOutstandingMessages; - private final Optional maxOutstandingBytes; + private final FlowController.Settings flowControlSettings; private final boolean failOnFlowControlLimits; private final Lock messagesBundleLock; @@ -98,11 +98,9 @@ final class PublisherImpl implements Publisher { maxBundleDuration = builder.bundlingSettings.getDelayThreshold(); hasBundlingBytes = maxBundleBytes > 0; - maxOutstandingMessages = builder.maxOutstandingMessages; - maxOutstandingBytes = builder.maxOutstandingBytes; + flowControlSettings = builder.flowControlSettings; failOnFlowControlLimits = builder.failOnFlowControlLimits; - this.flowController = - new FlowController(maxOutstandingMessages, maxOutstandingBytes, failOnFlowControlLimits); + this.flowController = new FlowController(flowControlSettings, failOnFlowControlLimits); sendBundleDeadline = builder.sendBundleDeadline; @@ -165,13 +163,13 @@ public long getMaxBundleMessages() { } @Override - public Optional getMaxOutstandingMessages() { - return maxOutstandingMessages; + public Optional getMaxOutstandingElementCount() { + return flowControlSettings.getMaxOutstandingElementCount(); } @Override - public Optional getMaxOutstandingBytes() { - return maxOutstandingBytes; + public Optional getMaxOutstandingRequestBytes() { + return flowControlSettings.getMaxOutstandingRequestBytes(); } @Override @@ -181,12 +179,12 @@ public boolean failOnFlowControlLimits() { /** Whether flow control kicks in on a per outstanding messages basis. */ boolean isPerMessageEnforced() { - return maxOutstandingMessages.isPresent(); + return getMaxOutstandingElementCount().isPresent(); } /** Whether flow control kicks in on a per outstanding bytes basis. */ boolean isPerBytesEnforced() { - return maxOutstandingBytes.isPresent(); + return getMaxOutstandingRequestBytes().isPresent(); } @Override @@ -203,7 +201,7 @@ public ListenableFuture publish(PubsubMessage message) { final int messageSize = message.getSerializedSize(); try { flowController.reserve(1, messageSize); - } catch (CloudPubsubFlowControlException e) { + } catch (FlowController.FlowControlException e) { return Futures.immediateFailedFuture(e); } OutstandingBundle bundleToSend = null; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/StreamingSubscriberConnection.java index e8f0e63c04d9..d0d3cd4b3e6c 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/StreamingSubscriberConnection.java @@ -18,6 +18,7 @@ import static com.google.cloud.pubsub.StatusUtil.isRetryable; +import com.google.api.gax.bundling.FlowController; import com.google.auth.Credentials; import com.google.cloud.Clock; import com.google.cloud.pubsub.MessagesProcessor.AcksProcessor; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscriber.java index cf24e2eca2d5..ba5931b09248 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscriber.java @@ -16,6 +16,7 @@ package com.google.cloud.pubsub; +import com.google.api.gax.bundling.FlowController; import com.google.auth.Credentials; import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.Clock; @@ -36,9 +37,9 @@ * *

A {@link Subscriber} allows you to provide an implementation of a {@link MessageReceiver * receiver} to which messages are going to be delivered as soon as they are received by the - * subscriber. The delivered messages then can be {@link AckReply#ACK acked} or {@link - * AckReply#NACK nacked} at will as they get processed by the receiver. Nacking a - * messages implies a later redelivery of such message. + * subscriber. The delivered messages then can be {@link AckReply#ACK acked} or {@link AckReply#NACK + * nacked} at will as they get processed by the receiver. Nacking a messages implies a later + * redelivery of such message. * *

The subscriber handles the ack management, by automatically extending the ack deadline while * the message is being processed, to then issue the ack or nack of such message when the processing @@ -53,38 +54,38 @@ * in memory before the receiver either ack or nack them. * * - *

If no credentials are provided, the {@link Publisher} will use application default - * credentials through {@link GoogleCredentials#getApplicationDefault}. + *

If no credentials are provided, the {@link Publisher} will use application default credentials + * through {@link GoogleCredentials#getApplicationDefault}. * *

For example, a {@link Subscriber} can be constructed and used to receive messages as follows: * - *

- *  MessageReceiver receiver =
- *      message -> {
- *        // ... process message ...
- *        return Futures.immediateFuture(AckReply.ACK);
- *      });
+ * 
{@code
+ * MessageReceiver receiver = new MessageReceiver() {
+ *   @Override
+ *   public ListenableFuture receiveMessage(PubsubMessage message) {
+ *     // ... process message ...
+ *     return Futures.immediateFuture(AckReply.ACK);
+ *   }
+ * }
  *
- *  Subscriber subscriber =
- *      Subscriber.Builder.newBuilder(MY_SUBSCRIPTION, receiver)
- *          .setMaxBundleAcks(100)
- *          .build();
+ * Subscriber subscriber =
+ *     Subscriber.Builder.newBuilder(MY_SUBSCRIPTION, receiver)
+ *         .setMaxBundleAcks(100)
+ *         .build();
  *
- *  subscriber.startAsync();
+ * subscriber.startAsync();
  *
- *  ... recommended, listen for fatal errors that break the subscriber streaming ...
- *  subscriber.addListener(
-        new Listener() {
-          @Override
-          public void failed(State from, Throwable failure) {
-            System.out.println("Subscriber faile with error: " + failure);
-          }
-        },
-        Executors.newSingleThreadExecutor());
+ * // ... recommended, listen for fatal errors that break the subscriber streaming ...
+ * subscriber.addListener(new Listener() {
+ *   @Override
+ *   public void failed(State from, Throwable failure) {
+ *     System.out.println("Subscriber faile with error: " + failure);
+ *   }
+ * }, Executors.newSingleThreadExecutor());
  *
- *  ... and when done with the subscriber ...
- *  subscriber.stopAsync();
- * 
+ * // ... and when done with the subscriber ... + * subscriber.stopAsync(); + * }
*/ public interface Subscriber extends Service { String PUBSUB_API_ADDRESS = "pubsub.googleapis.com"; @@ -128,14 +129,12 @@ public static enum AckReply { * *

When limits are enforced, no more messages will be dispatched to the {@link * MessageReceiver} but due to the gRPC and HTTP/2 buffering and congestion control window - * management, still some extra bytes could be kept at lower layers. + * management, still some extra bytes could be kept at lower layers. */ - Optional getMaxOutstandingMessages(); + Optional getMaxOutstandingElementCount(); - /** - * Maximum number of outstanding (i.e. pending to process) bytes before limits are enforced. - */ - Optional getMaxOutstandingBytes(); + /** Maximum number of outstanding (i.e. pending to process) bytes before limits are enforced. */ + Optional getMaxOutstandingRequestBytes(); /** Builder of {@link Subscriber Subscribers}. */ final class Builder { @@ -143,17 +142,17 @@ final class Builder { private static final Duration DEFAULT_ACK_EXPIRATION_PADDING = Duration.millis(500); String subscription; - Optional credentials; + Optional credentials = Optional.absent(); MessageReceiver receiver; - Duration ackExpirationPadding; + Duration ackExpirationPadding = DEFAULT_ACK_EXPIRATION_PADDING; - Optional maxOutstandingMessages; - Optional maxOutstandingBytes; + FlowController.Settings flowControlSettings = FlowController.Settings.DEFAULT; - Optional executor; - Optional>> channelBuilder; - Optional clock; + Optional executor = Optional.absent(); + Optional>> channelBuilder = + Optional.absent(); + Optional clock = Optional.absent(); /** * Constructs a new {@link Builder}. @@ -170,21 +169,10 @@ public static Builder newBuilder(String subscription, MessageReceiver receiver) } Builder(String subscription, MessageReceiver receiver) { - setDefaults(); this.subscription = subscription; this.receiver = receiver; } - private void setDefaults() { - credentials = Optional.absent(); - channelBuilder = Optional.absent(); - ackExpirationPadding = DEFAULT_ACK_EXPIRATION_PADDING; - maxOutstandingBytes = Optional.absent(); - maxOutstandingMessages = Optional.absent(); - executor = Optional.absent(); - clock = Optional.absent(); - } - /** * Credentials to authenticate with. * @@ -208,33 +196,9 @@ public Builder setChannelBuilder( return this; } - /** - * Sets the maximum number of outstanding messages; messages delivered to the {@link - * MessageReceiver} that have not been acknowledged or rejected. - * - * @param maxOutstandingMessages must be greater than 0 - */ - public Builder setMaxOutstandingMessages(int maxOutstandingMessages) { - Preconditions.checkArgument( - maxOutstandingMessages > 0, - "maxOutstandingMessages limit is disabled by default, but if set it must be set to a " - + "value greater to 0."); - this.maxOutstandingMessages = Optional.of(maxOutstandingMessages); - return this; - } - - /** - * Sets the maximum number of outstanding bytes; bytes delivered to the {@link MessageReceiver} - * that have not been acknowledged or rejected. - * - * @param maxOutstandingBytes must be greater than 0 - */ - public Builder setMaxOutstandingBytes(int maxOutstandingBytes) { - Preconditions.checkArgument( - maxOutstandingBytes > 0, - "maxOutstandingBytes limit is disabled by default, but if set it must be set to a value " - + "greater than 0."); - this.maxOutstandingBytes = Optional.of(maxOutstandingBytes); + /** Sets the flow control settings. */ + public Builder setFlowControlSettings(FlowController.Settings flowControlSettings) { + this.flowControlSettings = Preconditions.checkNotNull(flowControlSettings); return this; } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriberImpl.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriberImpl.java index 913028931af3..a8b98d6d6707 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriberImpl.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriberImpl.java @@ -16,6 +16,7 @@ package com.google.cloud.pubsub; +import com.google.api.gax.bundling.FlowController; import com.google.auth.Credentials; import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.Clock; @@ -59,8 +60,7 @@ class SubscriberImpl extends AbstractService implements Subscriber { private static final Logger logger = LoggerFactory.getLogger(SubscriberImpl.class); private final String subscription; - private final Optional maxOutstandingBytes; - private final Optional maxOutstandingMessages; + private final FlowController.Settings flowControlSettings; private final Duration ackExpirationPadding; private final ScheduledExecutorService executor; private final Distribution ackLatencyDistribution = @@ -78,8 +78,7 @@ class SubscriberImpl extends AbstractService implements Subscriber { public SubscriberImpl(SubscriberImpl.Builder builder) throws IOException { receiver = builder.receiver; - maxOutstandingBytes = builder.maxOutstandingBytes; - maxOutstandingMessages = builder.maxOutstandingMessages; + flowControlSettings = builder.flowControlSettings; subscription = builder.subscription; ackExpirationPadding = builder.ackExpirationPadding; streamAckDeadlineSeconds = @@ -88,8 +87,7 @@ public SubscriberImpl(SubscriberImpl.Builder builder) throws IOException { Ints.saturatedCast(ackExpirationPadding.getStandardSeconds())); clock = builder.clock.isPresent() ? builder.clock.get() : Clock.defaultClock(); - flowController = - new FlowController(builder.maxOutstandingBytes, builder.maxOutstandingBytes, false); + flowController = new FlowController(builder.flowControlSettings, false); numChannels = Math.max(1, Runtime.getRuntime().availableProcessors()) * CHANNELS_PER_CORE; executor = @@ -315,12 +313,12 @@ public Duration getAckExpirationPadding() { } @Override - public Optional getMaxOutstandingMessages() { - return maxOutstandingMessages; + public Optional getMaxOutstandingElementCount() { + return flowControlSettings.getMaxOutstandingElementCount(); } @Override - public Optional getMaxOutstandingBytes() { - return maxOutstandingBytes; + public Optional getMaxOutstandingRequestBytes() { + return flowControlSettings.getMaxOutstandingRequestBytes(); } } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java index f0f589410300..6f07c120c66c 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java @@ -18,14 +18,10 @@ import static com.google.common.base.Preconditions.checkNotNull; -import com.google.cloud.GrpcServiceOptions; import com.google.cloud.Policy; -import com.google.cloud.pubsub.PubSub.PullOption; import com.google.common.base.Function; - import java.io.IOException; import java.io.ObjectInputStream; -import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.concurrent.Future; @@ -41,18 +37,12 @@ * processed and the Pub/Sub system can delete it from the subscription; a non-success response * indicates that the Pub/Sub server should resend it (implicit "nack"). * - *

In a pull subscription, the subscribing application must explicitly pull messages using one of - * {@link PubSub#pull(String, int)}, {@link PubSub#pullAsync(String, int)} or - * {@link PubSub#pullAsync(String, PubSub.MessageProcessor callback, PubSub.PullOption...)}. - * When messages are pulled with {@link PubSub#pull(String, int)} or - * {@link PubSub#pullAsync(String, int)} the subscribing application must also explicitly - * acknowledge them using one of {@link PubSub#ack(String, Iterable)}, - * {@link PubSub#ack(String, String, String...)}, {@link PubSub#ackAsync(String, Iterable)} or - * {@link PubSub#ackAsync(String, String, String...)}. + *

In a pull subscription, the subscribing application must pull messages using {@link + * PubSub#getSubscriber(SubscriptionInfo, Subscriber.MessageReceiver)}. * - *

{@code Subscription} adds a layer of service-related functionality over - * {@link SubscriptionInfo}. Objects of this class are immutable. To get a {@code Subscription} - * object with the most recent information use {@link #reload} or {@link #reloadAsync}. + *

{@code Subscription} adds a layer of service-related functionality over {@link + * SubscriptionInfo}. Objects of this class are immutable. To get a {@code Subscription} object with + * the most recent information use {@link #reload} or {@link #reloadAsync}. * * @see Pub/Sub Data Model * @see Subscriber Guide diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java index 5bbc4cd161bc..b7ddc5bea16f 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java @@ -20,10 +20,8 @@ import com.google.cloud.pubsub.spi.v1.SubscriberClient; import com.google.common.base.MoreObjects; - import java.io.Serializable; import java.util.Objects; -import java.util.concurrent.TimeUnit; /** * A Google Cloud Pub/Sub subscription. A subscription represents the stream of messages from a @@ -36,14 +34,8 @@ * processed and the Pub/Sub system can delete it from the subscription; a non-success response * indicates that the Pub/Sub server should resend it (implicit "nack"). * - *

In a pull subscription, the subscribing application must explicitly pull messages using one of - * {@link PubSub#pull(String, int)}, {@link PubSub#pullAsync(String, int)} or - * {@link PubSub#pullAsync(String, PubSub.MessageProcessor callback, PubSub.PullOption...)}. - * When messages are pulled with {@link PubSub#pull(String, int)} or - * {@link PubSub#pullAsync(String, int)} the subscribing application must also explicitly - * acknowledge them using one of {@link PubSub#ack(String, Iterable)}, - * {@link PubSub#ack(String, String, String...)}, {@link PubSub#ackAsync(String, Iterable)} or - * {@link PubSub#ackAsync(String, String, String...)}. + *

In a pull subscription, the subscribing application must pull messages using {@link + * PubSub#getSubscriber(SubscriptionInfo, Subscriber.MessageReceiver)}. * * @see Pub/Sub Data Model * @see Subscriber Guide @@ -140,10 +132,10 @@ public abstract static class Builder { * acknowledge the message. After message delivery but before the ack deadline expires and * before the message is acknowledged, it is an outstanding message and will not be delivered * again during that time (on a best-effort basis). For pull subscriptions, this value is used - * as the initial value for the ack deadline. To override the ack deadline value for a given - * message, use {@link PubSub#modifyAckDeadline(String, int, TimeUnit, Iterable)}. For push - * delivery, this value is used to set the request timeout for the call to the push endpoint. - * This value must be between 10 and 600 seconds, if not specified, 10 seconds is used. + * as the initial value for the ack deadline, and {@link Subscriber} automatically renews + * unprocessed messages. For push delivery, this value is used to set the request timeout for + * the call to the push endpoint. This value must be between 10 and 600 seconds, if not + * specified, 10 seconds is used. */ @Deprecated public abstract Builder ackDeadLineSeconds(int ackDeadLineSeconds); @@ -153,10 +145,10 @@ public abstract static class Builder { * acknowledge the message. After message delivery but before the ack deadline expires and * before the message is acknowledged, it is an outstanding message and will not be delivered * again during that time (on a best-effort basis). For pull subscriptions, this value is used - * as the initial value for the ack deadline. To override the ack deadline value for a given - * message, use {@link PubSub#modifyAckDeadline(String, int, TimeUnit, Iterable)}. For push - * delivery, this value is used to set the request timeout for the call to the push endpoint. - * This value must be between 10 and 600 seconds, if not specified, 10 seconds is used. + * as the initial value for the ack deadline. , and {@link Subscriber} automatically renews + * unprocessed messages. For push delivery, this value is used to set the request timeout for + * the call to the push endpoint. This value must be between 10 and 600 seconds, if not + * specified, 10 seconds is used. */ public abstract Builder setAckDeadLineSeconds(int ackDeadLineSeconds); @@ -333,13 +325,13 @@ public PushConfig getPushConfig() { /** * Returns the maximum time after a subscriber receives a message before the subscriber should - * acknowledge the message. After message delivery but before the ack deadline expires and - * before the message is acknowledged, it is an outstanding message and will not be delivered - * again during that time (on a best-effort basis). For pull subscriptions, this value is used - * as the initial value for the ack deadline. To override the ack deadline value for a given - * message, use {@link PubSub#modifyAckDeadline(String, int, TimeUnit, Iterable)}. For push - * delivery, this value is used to set the request timeout for the call to the push endpoint. This - * value must be between 10 and 600 seconds, if not specified, 10 seconds is used. + * acknowledge the message. After message delivery but before the ack deadline expires and before + * the message is acknowledged, it is an outstanding message and will not be delivered again + * during that time (on a best-effort basis). For pull subscriptions, this value is used as the + * initial value for the ack deadline, and {@link Subscriber} automatically renews unprocessed + * messages. For push delivery, this value is used to set the request timeout for the call to the + * push endpoint. This value must be between 10 and 600 seconds, if not specified, 10 seconds is + * used. */ @Deprecated public long ackDeadlineSeconds() { @@ -348,13 +340,13 @@ public long ackDeadlineSeconds() { /** * Returns the maximum time after a subscriber receives a message before the subscriber should - * acknowledge the message. After message delivery but before the ack deadline expires and - * before the message is acknowledged, it is an outstanding message and will not be delivered - * again during that time (on a best-effort basis). For pull subscriptions, this value is used - * as the initial value for the ack deadline. To override the ack deadline value for a given - * message, use {@link PubSub#modifyAckDeadline(String, int, TimeUnit, Iterable)}. For push - * delivery, this value is used to set the request timeout for the call to the push endpoint. This - * value must be between 10 and 600 seconds, if not specified, 10 seconds is used. + * acknowledge the message. After message delivery but before the ack deadline expires and before + * the message is acknowledged, it is an outstanding message and will not be delivered again + * during that time (on a best-effort basis). For pull subscriptions, this value is used as the + * initial value for the ack deadline, and {@link Subscriber} automatically renews unprocessed + * messages. For push delivery, this value is used to set the request timeout for the call to the + * push endpoint. This value must be between 10 and 600 seconds, if not specified, 10 seconds is + * used. */ public long getAckDeadlineSeconds() { return ackDeadlineSeconds; diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/FlowControllerTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/FlowControllerTest.java deleted file mode 100644 index 7fd9dfd81818..000000000000 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/FlowControllerTest.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Copyright 2016 Google Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsub; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import com.google.cloud.pubsub.Publisher.CloudPubsubFlowControlException; -import com.google.cloud.pubsub.Publisher.MaxOutstandingBytesReachedException; -import com.google.cloud.pubsub.Publisher.MaxOutstandingMessagesReachedException; -import com.google.common.base.Optional; -import com.google.common.util.concurrent.SettableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link FlowController}. */ -@RunWith(JUnit4.class) -public class FlowControllerTest { - - @Test - public void testReserveRelease_ok() throws Exception { - FlowController flowController = new FlowController(Optional.of(10), Optional.of(10), false); - - flowController.reserve(1, 1); - flowController.release(1, 1); - } - - @Test - public void testInvalidArguments() throws Exception { - FlowController flowController = new FlowController(Optional.of(10), Optional.of(10), false); - - flowController.reserve(1, 0); - try { - flowController.reserve(-1, 1); - fail("Must have thrown an illegal argument error"); - } catch (IllegalArgumentException expected) { - // Expected - } - try { - flowController.reserve(1, -1); - fail("Must have thrown an illegal argument error"); - } catch (IllegalArgumentException expected) { - // Expected - } - try { - flowController.reserve(0, 1); - fail("Must have thrown an illegal argument error"); - } catch (IllegalArgumentException expected) { - // Expected - } - } - - @Test - public void testReserveRelease_noLimits_ok() throws Exception { - FlowController flowController = - new FlowController(Optional.absent(), Optional.absent(), false); - - flowController.reserve(1, 1); - flowController.release(1, 1); - } - - @Test - public void testReserveRelease_blockedByNumberOfMessages() throws Exception { - FlowController flowController = new FlowController(Optional.of(10), Optional.of(100), false); - - testBlockingReserveRelease(flowController, 10, 10); - } - - @Test - public void testReserveRelease_blockedByNumberOfMessages_noBytesLimit() throws Exception { - FlowController flowController = - new FlowController(Optional.of(10), Optional.absent(), false); - - testBlockingReserveRelease(flowController, 10, 10); - } - - @Test - public void testReserveRelease_blockedByNumberOfBytes() throws Exception { - FlowController flowController = new FlowController(Optional.of(100), Optional.of(10), false); - - testBlockingReserveRelease(flowController, 10, 10); - } - - @Test - public void testReserveRelease_blockedByNumberOfBytes_noMessagesLimit() throws Exception { - FlowController flowController = - new FlowController(Optional.absent(), Optional.of(10), false); - - testBlockingReserveRelease(flowController, 10, 10); - } - - private static void testBlockingReserveRelease( - final FlowController flowController, final int maxNumMessages, final int maxNumBytes) - throws Exception { - - flowController.reserve(1, 1); - - final SettableFuture permitsReserved = SettableFuture.create(); - Future finished = - Executors.newCachedThreadPool() - .submit( - new Runnable() { - @Override - public void run() { - try { - permitsReserved.set(null); - flowController.reserve(maxNumMessages, maxNumBytes); - } catch (CloudPubsubFlowControlException e) { - throw new AssertionError(e); - } - } - }); - - permitsReserved.get(); - flowController.release(1, 1); - - finished.get(); - } - - @Test - public void testReserveRelease_rejectedByNumberOfMessages() throws Exception { - FlowController flowController = new FlowController(Optional.of(10), Optional.of(100), true); - - testRejectedReserveRelease( - flowController, 10, 10, MaxOutstandingMessagesReachedException.class); - } - - @Test - public void testReserveRelease_rejectedByNumberOfMessages_noBytesLimit() throws Exception { - FlowController flowController = - new FlowController(Optional.of(10), Optional.absent(), true); - - testRejectedReserveRelease( - flowController, 10, 10, MaxOutstandingMessagesReachedException.class); - } - - @Test - public void testReserveRelease_rejectedByNumberOfBytes() throws Exception { - FlowController flowController = new FlowController(Optional.of(100), Optional.of(10), true); - - testRejectedReserveRelease(flowController, 10, 10, MaxOutstandingBytesReachedException.class); - } - - @Test - public void testReserveRelease_rejectedByNumberOfBytes_noMessagesLimit() throws Exception { - FlowController flowController = - new FlowController(Optional.absent(), Optional.of(10), true); - - testRejectedReserveRelease(flowController, 10, 10, MaxOutstandingBytesReachedException.class); - } - - private void testRejectedReserveRelease( - FlowController flowController, - int maxNumMessages, - int maxNumBytes, - Class expectedException) - throws CloudPubsubFlowControlException { - - flowController.reserve(1, 1); - - try { - flowController.reserve(maxNumMessages, maxNumBytes); - fail("Should thrown a CloudPubsubFlowControlException"); - } catch (CloudPubsubFlowControlException e) { - assertTrue(expectedException.isInstance(e)); - } - - flowController.release(1, 1); - - flowController.reserve(maxNumMessages, maxNumBytes); - } -} diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java index 29fbf99be07c..bd906b33ca74 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.times; +import com.google.api.gax.bundling.FlowController; import com.google.api.gax.grpc.BundlingSettings; import com.google.cloud.pubsub.Publisher.Builder; import com.google.common.base.Optional; @@ -344,8 +345,11 @@ public void testPublisherGetters() throws Exception { .setDelayThreshold(new Duration(11)) .setElementCountThreshold(12) .build()); - builder.setMaxOutstandingBytes(13); - builder.setMaxOutstandingMessages(14); + builder.setFlowControlSettings( + FlowController.Settings.newBuilder() + .setMaxOutstandingRequestBytes(Optional.of(13)) + .setMaxOutstandingElementCount(Optional.of(14)) + .build()); builder.setRequestTimeout(new Duration(15)); builder.setSendBundleDeadline(new Duration(16000)); Publisher publisher = builder.build(); @@ -354,8 +358,8 @@ public void testPublisherGetters() throws Exception { assertEquals(10, publisher.getMaxBundleBytes()); assertEquals(new Duration(11), publisher.getMaxBundleDuration()); assertEquals(12, publisher.getMaxBundleMessages()); - assertEquals(Optional.of(13), publisher.getMaxOutstandingBytes()); - assertEquals(Optional.of(14), publisher.getMaxOutstandingMessages()); + assertEquals(Optional.of(13), publisher.getMaxOutstandingRequestBytes()); + assertEquals(Optional.of(14), publisher.getMaxOutstandingElementCount()); assertTrue(publisher.failOnFlowControlLimits()); } @@ -374,8 +378,7 @@ public void testBuilderParametersAndDefaults() { assertEquals( Publisher.DEFAULT_MAX_BUNDLE_MESSAGES, builder.bundlingSettings.getElementCountThreshold().longValue()); - assertEquals(Optional.absent(), builder.maxOutstandingBytes); - assertEquals(Optional.absent(), builder.maxOutstandingMessages); + assertEquals(FlowController.Settings.DEFAULT, builder.flowControlSettings); assertEquals(Publisher.DEFAULT_REQUEST_TIMEOUT, builder.requestTimeout); assertEquals(Publisher.MIN_SEND_BUNDLE_DURATION, builder.sendBundleDeadline); assertEquals(Optional.absent(), builder.userCredentials); @@ -407,7 +410,10 @@ public void testBuilderInvalidArguments() { } try { builder.setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setRequestByteThreshold(null).build()); + Publisher.DEFAULT_BUNDLING_SETTINGS + .toBuilder() + .setRequestByteThreshold((Long) null) + .build()); fail("Should have thrown an NullPointerException"); } catch (NullPointerException expected) { // Expected @@ -451,7 +457,10 @@ public void testBuilderInvalidArguments() { Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setElementCountThreshold(1).build()); try { builder.setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setElementCountThreshold(null).build()); + Publisher.DEFAULT_BUNDLING_SETTINGS + .toBuilder() + .setElementCountThreshold((Long) null) + .build()); fail("Should have thrown an NullPointerException"); } catch (NullPointerException expected) { // Expected @@ -471,29 +480,53 @@ public void testBuilderInvalidArguments() { // Expected } - builder.setMaxOutstandingBytes(1); + builder.setFlowControlSettings( + FlowController.Settings.DEFAULT + .toBuilder() + .setMaxOutstandingRequestBytes(Optional.of(1)) + .build()); try { - builder.setMaxOutstandingBytes(0); + builder.setFlowControlSettings( + FlowController.Settings.DEFAULT + .toBuilder() + .setMaxOutstandingRequestBytes(Optional.of(0)) + .build()); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) { // Expected } try { - builder.setMaxOutstandingBytes(-1); + builder.setFlowControlSettings( + FlowController.Settings.DEFAULT + .toBuilder() + .setMaxOutstandingRequestBytes(Optional.of(-1)) + .build()); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) { // Expected } - builder.setMaxOutstandingMessages(1); + builder.setFlowControlSettings( + FlowController.Settings.DEFAULT + .toBuilder() + .setMaxOutstandingElementCount(Optional.of(1)) + .build()); try { - builder.setMaxOutstandingMessages(0); + builder.setFlowControlSettings( + FlowController.Settings.DEFAULT + .toBuilder() + .setMaxOutstandingElementCount(Optional.of(0)) + .build()); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) { // Expected } try { - builder.setMaxOutstandingMessages(-1); + builder.setFlowControlSettings( + FlowController.Settings.DEFAULT + .toBuilder() + .setMaxOutstandingElementCount(Optional.of(-1)) + .build()); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) { // Expected