Skip to content

Commit

Permalink
make pubsub work with new gax (#1505)
Browse files Browse the repository at this point in the history
* make pubsub work with new gax

update documentation links too
  • Loading branch information
pongad committed Jan 6, 2017
1 parent 6a4bc9d commit db4908e
Show file tree
Hide file tree
Showing 13 changed files with 166 additions and 531 deletions.
2 changes: 1 addition & 1 deletion google-cloud-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
<dependency>
<groupId>com.google.api</groupId>
<artifactId>gax</artifactId>
<version>0.0.25</version>
<version>0.0.27</version>
<exclusions>
<exclusion>
<groupId>io.grpc</groupId>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,17 @@

package com.google.cloud.pubsub;

import com.google.auth.Credentials;
import com.google.api.gax.bundling.FlowController;
import com.google.cloud.Clock;
import com.google.cloud.pubsub.Publisher.CloudPubsubFlowControlException;
import com.google.cloud.pubsub.Subscriber.MessageReceiver;
import com.google.cloud.pubsub.Subscriber.MessageReceiver.AckReply;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import io.grpc.Status;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -287,7 +284,7 @@ public void run() {
}
try {
flowController.reserve(receivedMessagesCount, totalByteCount);
} catch (CloudPubsubFlowControlException unexpectedException) {
} catch (FlowController.FlowControlException unexpectedException) {
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.cloud.pubsub.StatusUtil.isRetryable;

import com.google.api.gax.bundling.FlowController;
import com.google.auth.Credentials;
import com.google.cloud.Clock;
import com.google.cloud.pubsub.MessagesProcessor.AcksProcessor;
Expand All @@ -39,7 +40,6 @@
import io.grpc.Channel;
import io.grpc.StatusRuntimeException;
import io.grpc.auth.MoreCallCredentials;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.pubsub;

import com.google.api.gax.bundling.FlowController;
import com.google.api.gax.grpc.BundlingSettings;
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
Expand Down Expand Up @@ -86,8 +87,8 @@ public interface Publisher {
int MAX_BUNDLE_BYTES = 10 * 1000 * 1000; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)

// Meaningful defaults.
int DEFAULT_MAX_BUNDLE_MESSAGES = 100;
int DEFAULT_MAX_BUNDLE_BYTES = 1000; // 1 kB
long DEFAULT_MAX_BUNDLE_MESSAGES = 100L;
long DEFAULT_MAX_BUNDLE_BYTES = 1000L; // 1 kB
Duration DEFAULT_MAX_BUNDLE_DURATION = new Duration(1); // 1ms
Duration DEFAULT_REQUEST_TIMEOUT = new Duration(10 * 1000); // 10 seconds
Duration MIN_SEND_BUNDLE_DURATION = new Duration(10 * 1000); // 10 seconds
Expand Down Expand Up @@ -129,21 +130,22 @@ public interface Publisher {
* Maximum number of outstanding (i.e. pending to publish) messages before limits are enforced.
* See {@link #failOnFlowControlLimits()}.
*/
Optional<Integer> getMaxOutstandingMessages();
Optional<Integer> getMaxOutstandingElementCount();

/**
* Maximum number of outstanding (i.e. pending to publish) bytes before limits are enforced. See
* {@link #failOnFlowControlLimits()}.
*/
Optional<Integer> getMaxOutstandingBytes();
Optional<Integer> getMaxOutstandingRequestBytes();

/**
* Whether to block publish calls when reaching flow control limits (see {@link
* #getMaxOutstandingBytes()} & {@link #getMaxOutstandingMessages()}).
* #getMaxOutstandingRequestBytes()} & {@link #getMaxOutstandingElementCount()}).
*
* <p>If set to false, a publish call will fail with either {@link
* MaxOutstandingBytesReachedException} or {@link MaxOutstandingMessagesReachedException}, as
* appropriate, when flow control limits are reached.
* RequestByteMaxOutstandingReachedException} or {@link
* ElementCountMaxOutstandingReachedException}, as appropriate, when flow control limits are
* reached.
*/
boolean failOnFlowControlLimits();

Expand All @@ -164,24 +166,24 @@ final class Builder {
String topic;

// Bundling options
BundlingSettings bundlingSettings;
BundlingSettings bundlingSettings = DEFAULT_BUNDLING_SETTINGS;

// Client-side flow control options
Optional<Integer> maxOutstandingMessages;
Optional<Integer> maxOutstandingBytes;
boolean failOnFlowControlLimits;
FlowController.Settings flowControlSettings = FlowController.Settings.DEFAULT;
boolean failOnFlowControlLimits = false;

// Send bundle deadline
Duration sendBundleDeadline;
Duration sendBundleDeadline = MIN_SEND_BUNDLE_DURATION;

// RPC options
Duration requestTimeout;
Duration requestTimeout = DEFAULT_REQUEST_TIMEOUT;

// Channels and credentials
Optional<Credentials> userCredentials;
Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>> channelBuilder;
Optional<Credentials> userCredentials = Optional.absent();
Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>> channelBuilder =
Optional.absent();

Optional<ScheduledExecutorService> executor;
Optional<ScheduledExecutorService> executor = Optional.absent();

/** Constructs a new {@link Builder} using the given topic. */
public static Builder newBuilder(String topic) {
Expand All @@ -190,19 +192,6 @@ public static Builder newBuilder(String topic) {

Builder(String topic) {
this.topic = Preconditions.checkNotNull(topic);
setDefaults();
}

private void setDefaults() {
userCredentials = Optional.absent();
channelBuilder = Optional.absent();
maxOutstandingMessages = Optional.absent();
maxOutstandingBytes = Optional.absent();
bundlingSettings = DEFAULT_BUNDLING_SETTINGS;
requestTimeout = DEFAULT_REQUEST_TIMEOUT;
sendBundleDeadline = MIN_SEND_BUNDLE_DURATION;
failOnFlowControlLimits = false;
executor = Optional.absent();
}

/**
Expand Down Expand Up @@ -254,24 +243,16 @@ public Builder setBundlingSettings(BundlingSettings bundlingSettings) {

// Flow control options

/** Maximum number of outstanding messages to keep in memory before enforcing flow control. */
public Builder setMaxOutstandingMessages(int messages) {
Preconditions.checkArgument(messages > 0);
maxOutstandingMessages = Optional.of(messages);
return this;
}

/** Maximum number of outstanding messages to keep in memory before enforcing flow control. */
public Builder setMaxOutstandingBytes(int bytes) {
Preconditions.checkArgument(bytes > 0);
maxOutstandingBytes = Optional.of(bytes);
/** Sets the flow control settings. */
public Builder setFlowControlSettings(FlowController.Settings flowControlSettings) {
this.flowControlSettings = Preconditions.checkNotNull(flowControlSettings);
return this;
}

/**
* Whether to fail publish when reaching any of the flow control limits, with either a {@link
* MaxOutstandingBytesReachedException} or {@link MaxOutstandingMessagesReachedException} as
* appropriate.
* RequestByteMaxOutstandingReachedException} or {@link
* ElementCountMaxOutstandingReachedException} as appropriate.
*
* <p>If set to false, then publish operations will block the current thread until the
* outstanding requests go under the limits.
Expand Down Expand Up @@ -306,51 +287,4 @@ public Publisher build() throws IOException {
return new PublisherImpl(this);
}
}

/** Base exception that signals a flow control state. */
abstract class CloudPubsubFlowControlException extends Exception {}

/**
* Returned as a future exception when client-side flow control is enforced based on the maximum
* number of outstanding in-memory messages.
*/
final class MaxOutstandingMessagesReachedException extends CloudPubsubFlowControlException {
private final int currentMaxMessages;

public MaxOutstandingMessagesReachedException(int currentMaxMessages) {
this.currentMaxMessages = currentMaxMessages;
}

public int getCurrentMaxBundleMessages() {
return currentMaxMessages;
}

@Override
public String toString() {
return String.format(
"The maximum number of bundle messages: %d have been reached.", currentMaxMessages);
}
}

/**
* Returned as a future exception when client-side flow control is enforced based on the maximum
* number of unacknowledged in-memory bytes.
*/
final class MaxOutstandingBytesReachedException extends CloudPubsubFlowControlException {
private final int currentMaxBytes;

public MaxOutstandingBytesReachedException(int currentMaxBytes) {
this.currentMaxBytes = currentMaxBytes;
}

public int getCurrentMaxBundleBytes() {
return currentMaxBytes;
}

@Override
public String toString() {
return String.format(
"The maximum number of bundle bytes: %d have been reached.", currentMaxBytes);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.pubsub;

import com.google.api.gax.bundling.FlowController;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -68,8 +69,7 @@ final class PublisherImpl implements Publisher {
private final Duration maxBundleDuration;
private final boolean hasBundlingBytes;

private final Optional<Integer> maxOutstandingMessages;
private final Optional<Integer> maxOutstandingBytes;
private final FlowController.Settings flowControlSettings;
private final boolean failOnFlowControlLimits;

private final Lock messagesBundleLock;
Expand Down Expand Up @@ -98,11 +98,9 @@ final class PublisherImpl implements Publisher {
maxBundleDuration = builder.bundlingSettings.getDelayThreshold();
hasBundlingBytes = maxBundleBytes > 0;

maxOutstandingMessages = builder.maxOutstandingMessages;
maxOutstandingBytes = builder.maxOutstandingBytes;
flowControlSettings = builder.flowControlSettings;
failOnFlowControlLimits = builder.failOnFlowControlLimits;
this.flowController =
new FlowController(maxOutstandingMessages, maxOutstandingBytes, failOnFlowControlLimits);
this.flowController = new FlowController(flowControlSettings, failOnFlowControlLimits);

sendBundleDeadline = builder.sendBundleDeadline;

Expand Down Expand Up @@ -165,13 +163,13 @@ public long getMaxBundleMessages() {
}

@Override
public Optional<Integer> getMaxOutstandingMessages() {
return maxOutstandingMessages;
public Optional<Integer> getMaxOutstandingElementCount() {
return flowControlSettings.getMaxOutstandingElementCount();
}

@Override
public Optional<Integer> getMaxOutstandingBytes() {
return maxOutstandingBytes;
public Optional<Integer> getMaxOutstandingRequestBytes() {
return flowControlSettings.getMaxOutstandingRequestBytes();
}

@Override
Expand All @@ -181,12 +179,12 @@ public boolean failOnFlowControlLimits() {

/** Whether flow control kicks in on a per outstanding messages basis. */
boolean isPerMessageEnforced() {
return maxOutstandingMessages.isPresent();
return getMaxOutstandingElementCount().isPresent();
}

/** Whether flow control kicks in on a per outstanding bytes basis. */
boolean isPerBytesEnforced() {
return maxOutstandingBytes.isPresent();
return getMaxOutstandingRequestBytes().isPresent();
}

@Override
Expand All @@ -203,7 +201,7 @@ public ListenableFuture<String> publish(PubsubMessage message) {
final int messageSize = message.getSerializedSize();
try {
flowController.reserve(1, messageSize);
} catch (CloudPubsubFlowControlException e) {
} catch (FlowController.FlowControlException e) {
return Futures.immediateFailedFuture(e);
}
OutstandingBundle bundleToSend = null;
Expand Down
Loading

0 comments on commit db4908e

Please sign in to comment.