Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make pubsub work with new gax #1505

Merged
merged 3 commits into from
Jan 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion google-cloud-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
<dependency>
<groupId>com.google.api</groupId>
<artifactId>gax</artifactId>
<version>0.0.25</version>
<version>0.0.27</version>
<exclusions>
<exclusion>
<groupId>io.grpc</groupId>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,17 @@

package com.google.cloud.pubsub;

import com.google.auth.Credentials;
import com.google.api.gax.bundling.FlowController;
import com.google.cloud.Clock;
import com.google.cloud.pubsub.Publisher.CloudPubsubFlowControlException;
import com.google.cloud.pubsub.Subscriber.MessageReceiver;
import com.google.cloud.pubsub.Subscriber.MessageReceiver.AckReply;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import io.grpc.Status;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -287,7 +284,7 @@ public void run() {
}
try {
flowController.reserve(receivedMessagesCount, totalByteCount);
} catch (CloudPubsubFlowControlException unexpectedException) {
} catch (FlowController.FlowControlException unexpectedException) {
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.cloud.pubsub.StatusUtil.isRetryable;

import com.google.api.gax.bundling.FlowController;
import com.google.auth.Credentials;
import com.google.cloud.Clock;
import com.google.cloud.pubsub.MessagesProcessor.AcksProcessor;
Expand All @@ -39,7 +40,6 @@
import io.grpc.Channel;
import io.grpc.StatusRuntimeException;
import io.grpc.auth.MoreCallCredentials;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.pubsub;

import com.google.api.gax.bundling.FlowController;
import com.google.api.gax.grpc.BundlingSettings;
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
Expand Down Expand Up @@ -86,8 +87,8 @@ public interface Publisher {
int MAX_BUNDLE_BYTES = 10 * 1000 * 1000; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)

// Meaningful defaults.
int DEFAULT_MAX_BUNDLE_MESSAGES = 100;
int DEFAULT_MAX_BUNDLE_BYTES = 1000; // 1 kB
long DEFAULT_MAX_BUNDLE_MESSAGES = 100L;
long DEFAULT_MAX_BUNDLE_BYTES = 1000L; // 1 kB
Duration DEFAULT_MAX_BUNDLE_DURATION = new Duration(1); // 1ms
Duration DEFAULT_REQUEST_TIMEOUT = new Duration(10 * 1000); // 10 seconds
Duration MIN_SEND_BUNDLE_DURATION = new Duration(10 * 1000); // 10 seconds
Expand Down Expand Up @@ -129,21 +130,22 @@ public interface Publisher {
* Maximum number of outstanding (i.e. pending to publish) messages before limits are enforced.
* See {@link #failOnFlowControlLimits()}.
*/
Optional<Integer> getMaxOutstandingMessages();
Optional<Integer> getMaxOutstandingElementCount();

/**
* Maximum number of outstanding (i.e. pending to publish) bytes before limits are enforced. See
* {@link #failOnFlowControlLimits()}.
*/
Optional<Integer> getMaxOutstandingBytes();
Optional<Integer> getMaxOutstandingRequestBytes();

/**
* Whether to block publish calls when reaching flow control limits (see {@link
* #getMaxOutstandingBytes()} & {@link #getMaxOutstandingMessages()}).
* #getMaxOutstandingRequestBytes()} & {@link #getMaxOutstandingElementCount()}).
*
* <p>If set to false, a publish call will fail with either {@link
* MaxOutstandingBytesReachedException} or {@link MaxOutstandingMessagesReachedException}, as
* appropriate, when flow control limits are reached.
* RequestByteMaxOutstandingReachedException} or {@link
* ElementCountMaxOutstandingReachedException}, as appropriate, when flow control limits are
* reached.
*/
boolean failOnFlowControlLimits();

Expand All @@ -164,24 +166,24 @@ final class Builder {
String topic;

// Bundling options
BundlingSettings bundlingSettings;
BundlingSettings bundlingSettings = DEFAULT_BUNDLING_SETTINGS;

// Client-side flow control options
Optional<Integer> maxOutstandingMessages;
Optional<Integer> maxOutstandingBytes;
boolean failOnFlowControlLimits;
FlowController.Settings flowControlSettings = FlowController.Settings.DEFAULT;
boolean failOnFlowControlLimits = false;

// Send bundle deadline
Duration sendBundleDeadline;
Duration sendBundleDeadline = MIN_SEND_BUNDLE_DURATION;

// RPC options
Duration requestTimeout;
Duration requestTimeout = DEFAULT_REQUEST_TIMEOUT;

// Channels and credentials
Optional<Credentials> userCredentials;
Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>> channelBuilder;
Optional<Credentials> userCredentials = Optional.absent();
Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>> channelBuilder =
Optional.absent();

Optional<ScheduledExecutorService> executor;
Optional<ScheduledExecutorService> executor = Optional.absent();

/** Constructs a new {@link Builder} using the given topic. */
public static Builder newBuilder(String topic) {
Expand All @@ -190,19 +192,6 @@ public static Builder newBuilder(String topic) {

Builder(String topic) {
this.topic = Preconditions.checkNotNull(topic);
setDefaults();
}

private void setDefaults() {
userCredentials = Optional.absent();
channelBuilder = Optional.absent();
maxOutstandingMessages = Optional.absent();
maxOutstandingBytes = Optional.absent();
bundlingSettings = DEFAULT_BUNDLING_SETTINGS;
requestTimeout = DEFAULT_REQUEST_TIMEOUT;
sendBundleDeadline = MIN_SEND_BUNDLE_DURATION;
failOnFlowControlLimits = false;
executor = Optional.absent();
}

/**
Expand Down Expand Up @@ -254,24 +243,16 @@ public Builder setBundlingSettings(BundlingSettings bundlingSettings) {

// Flow control options

/** Maximum number of outstanding messages to keep in memory before enforcing flow control. */
public Builder setMaxOutstandingMessages(int messages) {
Preconditions.checkArgument(messages > 0);
maxOutstandingMessages = Optional.of(messages);
return this;
}

/** Maximum number of outstanding messages to keep in memory before enforcing flow control. */
public Builder setMaxOutstandingBytes(int bytes) {
Preconditions.checkArgument(bytes > 0);
maxOutstandingBytes = Optional.of(bytes);
/** Sets the flow control settings. */
public Builder setFlowControlSettings(FlowController.Settings flowControlSettings) {
this.flowControlSettings = Preconditions.checkNotNull(flowControlSettings);
return this;
}

/**
* Whether to fail publish when reaching any of the flow control limits, with either a {@link
* MaxOutstandingBytesReachedException} or {@link MaxOutstandingMessagesReachedException} as
* appropriate.
* RequestByteMaxOutstandingReachedException} or {@link
* ElementCountMaxOutstandingReachedException} as appropriate.
*
* <p>If set to false, then publish operations will block the current thread until the
* outstanding requests go under the limits.
Expand Down Expand Up @@ -306,51 +287,4 @@ public Publisher build() throws IOException {
return new PublisherImpl(this);
}
}

/** Base exception that signals a flow control state. */
abstract class CloudPubsubFlowControlException extends Exception {}

/**
* Returned as a future exception when client-side flow control is enforced based on the maximum
* number of outstanding in-memory messages.
*/
final class MaxOutstandingMessagesReachedException extends CloudPubsubFlowControlException {
private final int currentMaxMessages;

public MaxOutstandingMessagesReachedException(int currentMaxMessages) {
this.currentMaxMessages = currentMaxMessages;
}

public int getCurrentMaxBundleMessages() {
return currentMaxMessages;
}

@Override
public String toString() {
return String.format(
"The maximum number of bundle messages: %d have been reached.", currentMaxMessages);
}
}

/**
* Returned as a future exception when client-side flow control is enforced based on the maximum
* number of unacknowledged in-memory bytes.
*/
final class MaxOutstandingBytesReachedException extends CloudPubsubFlowControlException {
private final int currentMaxBytes;

public MaxOutstandingBytesReachedException(int currentMaxBytes) {
this.currentMaxBytes = currentMaxBytes;
}

public int getCurrentMaxBundleBytes() {
return currentMaxBytes;
}

@Override
public String toString() {
return String.format(
"The maximum number of bundle bytes: %d have been reached.", currentMaxBytes);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.pubsub;

import com.google.api.gax.bundling.FlowController;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -68,8 +69,7 @@ final class PublisherImpl implements Publisher {
private final Duration maxBundleDuration;
private final boolean hasBundlingBytes;

private final Optional<Integer> maxOutstandingMessages;
private final Optional<Integer> maxOutstandingBytes;
private final FlowController.Settings flowControlSettings;
private final boolean failOnFlowControlLimits;

private final Lock messagesBundleLock;
Expand Down Expand Up @@ -98,11 +98,9 @@ final class PublisherImpl implements Publisher {
maxBundleDuration = builder.bundlingSettings.getDelayThreshold();
hasBundlingBytes = maxBundleBytes > 0;

maxOutstandingMessages = builder.maxOutstandingMessages;
maxOutstandingBytes = builder.maxOutstandingBytes;
flowControlSettings = builder.flowControlSettings;
failOnFlowControlLimits = builder.failOnFlowControlLimits;
this.flowController =
new FlowController(maxOutstandingMessages, maxOutstandingBytes, failOnFlowControlLimits);
this.flowController = new FlowController(flowControlSettings, failOnFlowControlLimits);

sendBundleDeadline = builder.sendBundleDeadline;

Expand Down Expand Up @@ -165,13 +163,13 @@ public long getMaxBundleMessages() {
}

@Override
public Optional<Integer> getMaxOutstandingMessages() {
return maxOutstandingMessages;
public Optional<Integer> getMaxOutstandingElementCount() {
return flowControlSettings.getMaxOutstandingElementCount();
}

@Override
public Optional<Integer> getMaxOutstandingBytes() {
return maxOutstandingBytes;
public Optional<Integer> getMaxOutstandingRequestBytes() {
return flowControlSettings.getMaxOutstandingRequestBytes();
}

@Override
Expand All @@ -181,12 +179,12 @@ public boolean failOnFlowControlLimits() {

/** Whether flow control kicks in on a per outstanding messages basis. */
boolean isPerMessageEnforced() {
return maxOutstandingMessages.isPresent();
return getMaxOutstandingElementCount().isPresent();
}

/** Whether flow control kicks in on a per outstanding bytes basis. */
boolean isPerBytesEnforced() {
return maxOutstandingBytes.isPresent();
return getMaxOutstandingRequestBytes().isPresent();
}

@Override
Expand All @@ -203,7 +201,7 @@ public ListenableFuture<String> publish(PubsubMessage message) {
final int messageSize = message.getSerializedSize();
try {
flowController.reserve(1, messageSize);
} catch (CloudPubsubFlowControlException e) {
} catch (FlowController.FlowControlException e) {
return Futures.immediateFailedFuture(e);
}
OutstandingBundle bundleToSend = null;
Expand Down
Loading