Skip to content

Commit

Permalink
Fix PubSub Iterator pullAsync: add callback to PubSubRpc.pull
Browse files Browse the repository at this point in the history
  • Loading branch information
mziccard committed Jun 9, 2016
1 parent 2fd2d56 commit 73f20de
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package com.google.cloud.pubsub;

import static com.google.api.client.util.Preconditions.checkArgument;
import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_SIZE;
import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_TOKEN;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.Futures.lazyTransform;

import com.google.cloud.AsyncPage;
Expand All @@ -27,6 +27,7 @@
import com.google.cloud.Page;
import com.google.cloud.PageImpl;
import com.google.cloud.pubsub.spi.PubSubRpc;
import com.google.cloud.pubsub.spi.PubSubRpc.PullFuture;
import com.google.cloud.pubsub.spi.v1.PublisherApi;
import com.google.cloud.pubsub.spi.v1.SubscriberApi;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -476,15 +477,24 @@ public Future<Iterator<ReceivedMessage>> pullAsync(final String subscription, in
.setMaxMessages(maxMessages)
.setReturnImmediately(true)
.build();
Future<PullResponse> response = rpc.pull(request);
return lazyTransform(response, new Function<PullResponse, Iterator<ReceivedMessage>>() {
PullFuture future = rpc.pull(request);
future.addCallback(new PubSubRpc.PullCallback() {
@Override
public Iterator<ReceivedMessage> apply(PullResponse pullResponse) {
// Add all received messages to the automatic ack deadline renewer
List<String> ackIds = Lists.transform(pullResponse.getReceivedMessagesList(),
public void success(PullResponse response) {
List<String> ackIds = Lists.transform(response.getReceivedMessagesList(),
MESSAGE_TO_ACK_ID_FUNCTION);
ackDeadlineRenewer.add(subscription, ackIds);
return Iterators.transform(pullResponse.getReceivedMessagesList().iterator(),
}

@Override
public void failure(Throwable error) {
// ignore
}
});
return lazyTransform(future, new Function<PullResponse, Iterator<ReceivedMessage>>() {
@Override
public Iterator<ReceivedMessage> apply(PullResponse response) {
return Iterators.transform(response.getReceivedMessagesList().iterator(),
new Function<com.google.pubsub.v1.ReceivedMessage, ReceivedMessage>() {
@Override
public ReceivedMessage apply(com.google.pubsub.v1.ReceivedMessage receivedMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.cloud.pubsub.spi.v1.SubscriberSettings;
import com.google.common.base.Function;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.Empty;
Expand Down Expand Up @@ -63,8 +64,11 @@

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DefaultPubSubRpc implements PubSubRpc {

Expand All @@ -89,6 +93,56 @@ protected ExecutorFactory executorFactory() {
}
}

private static final class PullFutureImpl implements PullFuture {

private final ListenableFuture<PullResponse> delegate;

PullFutureImpl(ListenableFuture<PullResponse> delegate) {
this.delegate = delegate;
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return delegate.cancel(mayInterruptIfRunning);
}

@Override
public boolean isCancelled() {
return delegate.isCancelled();
}

@Override
public boolean isDone() {
return delegate.isDone();
}

@Override
public PullResponse get() throws InterruptedException, ExecutionException {
return delegate.get();
}

@Override
public PullResponse get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
return delegate.get(timeout, unit);
}

@Override
public void addCallback(final PullCallback callback) {
Futures.addCallback(delegate, new FutureCallback<PullResponse>() {
@Override
public void onSuccess(PullResponse result) {
callback.success(result);
}

@Override
public void onFailure(Throwable error) {
callback.failure(error);
}
});
}
}

public DefaultPubSubRpc(PubSubOptions options) throws IOException {
executorFactory = new InternalPubSubOptions(options).executorFactory();
executor = executorFactory.get();
Expand Down Expand Up @@ -136,13 +190,13 @@ private static ApiCallSettings.Builder apiCallSettings(PubSubOptions options) {
return ApiCallSettings.newBuilder().setRetrySettingsBuilder(builder);
}

private static <V> Future<V> translate(ListenableFuture<V> from, final boolean idempotent,
int... returnNullOn) {
private static <V> ListenableFuture<V> translate(ListenableFuture<V> from,
final boolean idempotent, int... returnNullOn) {
final Set<Integer> returnNullOnSet = Sets.newHashSetWithExpectedSize(returnNullOn.length);
for (int value : returnNullOn) {
returnNullOnSet.add(value);
}
return Futures.catching(from, ApiException.class, new Function<ApiException, V>() {
return Futures.catching(from, ApiException.class, new Function<ApiException, V>() {
@Override
public V apply(ApiException exception) {
if (returnNullOnSet.contains(exception.getStatusCode().value())) {
Expand Down Expand Up @@ -224,8 +278,8 @@ public Future<Empty> acknowledge(AcknowledgeRequest request) {
}

@Override
public Future<PullResponse> pull(PullRequest request) {
return translate(subscriberApi.pullCallable().futureCall(request), false);
public PullFuture pull(PullRequest request) {
return new PullFutureImpl(translate(subscriberApi.pullCallable().futureCall(request), false));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,44 @@

public interface PubSubRpc extends AutoCloseable {

/**
* A callback that can be registered to {@link PullFuture} objects. Objects of this class allow
* to asynchronously react to the success or failure of a pull RPC.
*/
interface PullCallback {

/**
* This method is invoked with the result of a {@link PullFuture} when it was successful.
*
* @param response the pull response
*/
void success(PullResponse response);

/**
* This method is invoked when the {@link PullFuture} failed or was cancelled.
*
* @param error the execption that caused the {@link PullFuture} to fail
*/
void failure(Throwable error);
}

/**
* A {@link Future} implementation for pull RPCs. This class also allows users to register
* callbacks via {@link #addCallback(PullCallback)}.
*/
interface PullFuture extends Future<PullResponse> {

/**
* Registers a callback to be run on the given executor. The listener will run when the pull
* future completed its computation or, if the computation is already complete, immediately.
* There is no guaranteed ordering of execution of callbacks.
*
* <p>Registered callbacks are run using the same thread that run the RPC call. Only lightweight
* callbacks should be registered via this method.
*/
void addCallback(final PullCallback callback);
}

// in all cases root cause of ExecutionException is PubSubException
Future<Topic> create(Topic topic);

Expand All @@ -66,7 +104,7 @@ public interface PubSubRpc extends AutoCloseable {

Future<Empty> acknowledge(AcknowledgeRequest request);

Future<PullResponse> pull(PullRequest request);
PullFuture pull(PullRequest request);

Future<Empty> modify(ModifyPushConfigRequest request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import com.google.api.client.util.Lists;
import com.google.cloud.AsyncPage;
import com.google.cloud.Page;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import org.junit.Ignore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.cloud.AsyncPage;
import com.google.cloud.Page;
import com.google.cloud.RetryParams;
import com.google.cloud.pubsub.PubSub.ListOption;
import com.google.cloud.pubsub.spi.PubSubRpc;
import com.google.cloud.pubsub.spi.PubSubRpc.PullCallback;
import com.google.cloud.pubsub.spi.PubSubRpc.PullFuture;
import com.google.cloud.pubsub.spi.PubSubRpcFactory;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
Expand All @@ -55,13 +58,15 @@
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;

import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -1229,7 +1234,7 @@ public void testListTopicSubscriptionsAsyncWithOptions()
}

@Test
public void testPullMessages() {
public void testPullMessages() throws ExecutionException, InterruptedException {
pubsub = new PubSubImpl(options, renewerMock);
PullRequest request = PullRequest.newBuilder()
.setSubscription(SUBSCRIPTION_NAME_PB)
Expand All @@ -1243,10 +1248,16 @@ public void testPullMessages() {
.addReceivedMessages(MESSAGE_PB1)
.addReceivedMessages(MESSAGE_PB2)
.build();
EasyMock.expect(pubsubRpcMock.pull(request)).andReturn(Futures.immediateFuture(response));
Capture<PullCallback> callback = Capture.newInstance();
PullFuture futureMock = EasyMock.createStrictMock(PullFuture.class);
futureMock.addCallback(EasyMock.capture(callback));
EasyMock.expectLastCall();
EasyMock.expect(futureMock.get()).andReturn(response);
EasyMock.expect(pubsubRpcMock.pull(request)).andReturn(futureMock);
renewerMock.add(SUBSCRIPTION, ImmutableList.of("ackId1", "ackId2"));
EasyMock.replay(pubsubRpcMock, renewerMock);
EasyMock.replay(pubsubRpcMock, renewerMock, futureMock);
Iterator<ReceivedMessage> messageIterator = pubsub.pull(SUBSCRIPTION, 42);
callback.getValue().success(response);
EasyMock.reset(renewerMock);
for (ReceivedMessage message : messageList) {
renewerMock.remove(SUBSCRIPTION, message.ackId());
Expand All @@ -1256,6 +1267,7 @@ public void testPullMessages() {
while (messageIterator.hasNext()) {
messageIterator.next();
}
EasyMock.verify(futureMock);
}

@Test
Expand All @@ -1273,10 +1285,16 @@ public void testPullMessagesAsync() throws ExecutionException, InterruptedExcept
.addReceivedMessages(MESSAGE_PB1)
.addReceivedMessages(MESSAGE_PB2)
.build();
EasyMock.expect(pubsubRpcMock.pull(request)).andReturn(Futures.immediateFuture(response));
Capture<PullCallback> callback = Capture.newInstance();
PullFuture futureMock = EasyMock.createStrictMock(PullFuture.class);
futureMock.addCallback(EasyMock.capture(callback));
EasyMock.expectLastCall();
EasyMock.expect(futureMock.get()).andReturn(response);
EasyMock.expect(pubsubRpcMock.pull(request)).andReturn(futureMock);
renewerMock.add(SUBSCRIPTION, ImmutableList.of("ackId1", "ackId2"));
EasyMock.replay(pubsubRpcMock, renewerMock);
EasyMock.replay(pubsubRpcMock, renewerMock, futureMock);
Iterator<ReceivedMessage> messageIterator = pubsub.pullAsync(SUBSCRIPTION, 42).get();
callback.getValue().success(response);
EasyMock.reset(renewerMock);
for (ReceivedMessage message : messageList) {
renewerMock.remove(SUBSCRIPTION, message.ackId());
Expand All @@ -1286,6 +1304,55 @@ public void testPullMessagesAsync() throws ExecutionException, InterruptedExcept
while (messageIterator.hasNext()) {
messageIterator.next();
}
EasyMock.verify(futureMock);
}

@Test
public void testPullMessagesError() throws ExecutionException, InterruptedException {
pubsub = new PubSubImpl(options, renewerMock);
PullRequest request = PullRequest.newBuilder()
.setSubscription(SUBSCRIPTION_NAME_PB)
.setMaxMessages(42)
.setReturnImmediately(true)
.build();
PubSubException exception = new PubSubException(new IOException(), false);
PullFuture futureMock = EasyMock.createStrictMock(PullFuture.class);
futureMock.addCallback(EasyMock.anyObject(PullCallback.class));
EasyMock.expectLastCall();
EasyMock.expect(futureMock.get()).andThrow(new ExecutionException(exception));
EasyMock.expect(pubsubRpcMock.pull(request)).andReturn(futureMock);
EasyMock.replay(pubsubRpcMock, renewerMock, futureMock);
try {
pubsub.pull(SUBSCRIPTION, 42);
fail("Expected PubSubException");
} catch (PubSubException ex) {
assertSame(exception, ex);
}
EasyMock.verify(futureMock);
}

@Test
public void testPullMessagesAsyncError() throws ExecutionException, InterruptedException {
pubsub = new PubSubImpl(options, renewerMock);
PullRequest request = PullRequest.newBuilder()
.setSubscription(SUBSCRIPTION_NAME_PB)
.setMaxMessages(42)
.setReturnImmediately(true)
.build();
PubSubException exception = new PubSubException(new IOException(), false);
PullFuture futureMock = EasyMock.createStrictMock(PullFuture.class);
futureMock.addCallback(EasyMock.anyObject(PullCallback.class));
EasyMock.expectLastCall();
EasyMock.expect(futureMock.get()).andThrow(new ExecutionException(exception));
EasyMock.expect(pubsubRpcMock.pull(request)).andReturn(futureMock);
EasyMock.replay(pubsubRpcMock, renewerMock, futureMock);
try {
pubsub.pullAsync(SUBSCRIPTION, 42).get();
fail("Expected ExecutionException");
} catch (ExecutionException ex) {
assertSame(exception, ex.getCause());
}
EasyMock.verify(futureMock);
}

@Test
Expand Down

0 comments on commit 73f20de

Please sign in to comment.