Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement modifyAckDeadline methods, add javadoc and tests #1022

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -403,14 +403,78 @@ interface MessageConsumer extends AutoCloseable {

Future<Void> nackAsync(String subscription, Iterable<String> ackIds);

/**
* Modifies the acknowledge deadline of the given messages. {@code deadline} must be >= 0 and is
* the new deadline with respect to the time the modify request was received by the Pub/Sub
* service. For example, if {@code deadline} is 10 and {@code unit} is {@link TimeUnit#SECONDS},
* the new ack deadline will expire 10 seconds after the modify request was received by the
* service. Specifying 0 may be used to make the message available for another pull request
* (corresponds to calling {@link #nack(String, String, String...)}).
*
* @param subscription the subscription whose messages need to update their acknowledge deadline
* @param deadline the new deadline, relative to the time the modify request is received by the
* Pub/Sub service
* @param unit time unit for the {@code deadline} parameter
* @param ackId the ack id of the first message for which the acknowledge deadline must be
* modified
* @param ackIds other ack ids of messages for which the acknowledge deadline must be modified
* @throws PubSubException upon failure, or if the subscription was not found
*/
void modifyAckDeadline(String subscription, int deadline, TimeUnit unit, String ackId,
String... ackIds);

/**
* Sends a request to modify the acknowledge deadline of the given messages. {@code deadline}
* must be >= 0 and is the new deadline with respect to the time the modify request was received
* by the Pub/Sub service. For example, if {@code deadline} is 10 and {@code unit} is
* {@link TimeUnit#SECONDS}, the new ack deadline will expire 10 seconds after the modify request
* was received by the service. Specifying 0 may be used to make the message available for another
* pull request (corresponds to calling {@link #nackAsync(String, Iterable)}). The method returns
* a {@code Future} object that can be used to wait for the modify operation to be completed.
*
* @param subscription the subscription whose messages need to update their acknowledge deadline
* @param deadline the new deadline, relative to the time the modify request is received by the
* Pub/Sub service
* @param unit time unit for the {@code deadline} parameter
* @param ackId the ack id of the first message for which the acknowledge deadline must be
* modified
* @param ackIds other ack ids of messages for which the acknowledge deadline must be modified
*/
Future<Void> modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit unit,
String ackId, String... ackIds);

/**
* Modifies the acknowledge deadline of the given messages. {@code deadline} must be >= 0 and is
* the new deadline with respect to the time the modify request was received by the Pub/Sub
* service. For example, if {@code deadline} is 10 and {@code unit} is {@link TimeUnit#SECONDS},
* the new ack deadline will expire 10 seconds after the modify request was received by the
* service. Specifying 0 may be used to make the message available for another pull request
* (corresponds to calling {@link #nack(String, Iterable)}).
*
* @param subscription the subscription whose messages need to update their acknowledge deadline
* @param deadline the new deadline, relative to the time the modify request is received by the
* Pub/Sub service
* @param unit time unit for the {@code deadline} parameter
* @param ackIds the ack ids of messages for which the acknowledge deadline must be modified
* @throws PubSubException upon failure, or if the subscription was not found
*/
void modifyAckDeadline(String subscription, int deadline, TimeUnit unit, Iterable<String> ackIds);

/**
* Sends a request to modify the acknowledge deadline of the given messages. {@code deadline}
* must be >= 0 and is the new deadline with respect to the time the modify request was received
* by the Pub/Sub service. For example, if {@code deadline} is 10 and {@code unit} is
* {@link TimeUnit#SECONDS}, the new ack deadline will expire 10 seconds after the modify request
* was received by the service. Specifying 0 may be used to make the message available for another
* pull request (corresponds to calling {@link #nackAsync(String, Iterable)}). The method returns
* a {@code Future} object that can be used to wait for the modify operation to be completed.
*
* @param subscription the subscription whose messages need to update their acknowledge deadline
* @param deadline the new deadline, relative to the time the modify request is received by the
* Pub/Sub service
* @param unit time unit for the {@code deadline} parameter
* @param ackIds the ack ids of messages for which the acknowledge deadline must be modified
*/
Future<Void> modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit unit,
Iterable<String> ackIds);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package com.google.cloud.pubsub;

import static com.google.api.client.util.Preconditions.checkArgument;
import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_SIZE;
import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_TOKEN;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.Futures.lazyTransform;

import com.google.cloud.AsyncPage;
Expand Down Expand Up @@ -47,6 +47,7 @@
import com.google.pubsub.v1.ListTopicSubscriptionsResponse;
import com.google.pubsub.v1.ListTopicsRequest;
import com.google.pubsub.v1.ListTopicsResponse;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.ModifyPushConfigRequest;
import com.google.pubsub.v1.PublishRequest;
import com.google.pubsub.v1.PublishResponse;
Expand Down Expand Up @@ -504,25 +505,30 @@ public Future<Void> nackAsync(String subscription, Iterable<String> ackIds) {
@Override
public void modifyAckDeadline(String subscription, int deadline, TimeUnit unit, String ackId,
String... ackIds) {

get(modifyAckDeadlineAsync(subscription, deadline, unit, Lists.asList(ackId, ackIds)));
}

@Override
public Future<Void> modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit unit,
String ackId, String... ackIds) {
return null;
return modifyAckDeadlineAsync(subscription, deadline, unit, Lists.asList(ackId, ackIds));
}

@Override
public void modifyAckDeadline(String subscription, int deadline, TimeUnit unit,
Iterable<String> ackIds) {

get(modifyAckDeadlineAsync(subscription, deadline, unit, ackIds));
}

@Override
public Future<Void> modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit unit,
Iterable<String> ackIds) {
return null;
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
.setSubscription(SubscriberApi.formatSubscriptionName(options().projectId(), subscription))
.setAckDeadlineSeconds((int) TimeUnit.SECONDS.convert(deadline, unit))
.addAllAckIds(ackIds)
.build();
return lazyTransform(rpc.modify(request), EMPTY_TO_VOID_FUNCTION);
}

static <T extends Option.OptionType> Map<Option.OptionType, ?> optionMap(Option... options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.Topic;

import org.joda.time.Duration;

import io.grpc.ManagedChannel;
import io.grpc.Status.Code;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;

import org.joda.time.Duration;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.Future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
import com.google.cloud.RetryParams;
import com.google.cloud.pubsub.PubSubOptions;

import io.grpc.ManagedChannel;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
Expand All @@ -31,10 +35,6 @@
import java.util.List;
import java.util.UUID;

import io.grpc.ManagedChannel;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;

/**
* A class that runs a Pubsub emulator instance for use in tests.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public static void startServer() throws IOException, InterruptedException {

@AfterClass
public static void stopServer() throws Exception {
pubsub.options().rpc().close();
pubsub.close();
pubsubHelper.reset();
pubsubHelper.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.google.pubsub.v1.ListTopicSubscriptionsResponse;
import com.google.pubsub.v1.ListTopicsRequest;
import com.google.pubsub.v1.ListTopicsResponse;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.ModifyPushConfigRequest;
import com.google.pubsub.v1.PublishRequest;
import com.google.pubsub.v1.PublishResponse;
Expand All @@ -61,6 +62,7 @@
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class PubSubImplTest {

Expand Down Expand Up @@ -1187,6 +1189,100 @@ public void testListTopicSubscriptionsAsyncWithOptions()
Iterables.toArray(page.values(), SubscriptionId.class));
}

@Test
public void testModifyAckDeadlineOneMessage() {
pubsub = options.service();
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
.setAckDeadlineSeconds(10)
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAckIds("ackId")
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
pubsub.modifyAckDeadline(SUBSCRIPTION, 10, TimeUnit.SECONDS, "ackId");
}

@Test
public void testModifyAckDeadlineOneMessageAsync()
throws ExecutionException, InterruptedException {
pubsub = options.service();
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
.setAckDeadlineSeconds(10)
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAckIds("ackId")
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
Future<Void> future =
pubsub.modifyAckDeadlineAsync(SUBSCRIPTION, 10, TimeUnit.SECONDS, "ackId");
assertNull(future.get());
}

@Test
public void testModifyAckDeadlineMoreMessages() {
pubsub = options.service();
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
.setAckDeadlineSeconds(10)
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAllAckIds(ImmutableList.of("ackId1", "ackId2"))
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
pubsub.modifyAckDeadline(SUBSCRIPTION, 10, TimeUnit.SECONDS, "ackId1", "ackId2");
}

@Test
public void testModifyAckDeadlineMoreMessagesAsync()
throws ExecutionException, InterruptedException {
pubsub = options.service();
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
.setAckDeadlineSeconds(10)
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAllAckIds(ImmutableList.of("ackId1", "ackId2"))
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
Future<Void> future =
pubsub.modifyAckDeadlineAsync(SUBSCRIPTION, 10, TimeUnit.SECONDS, "ackId1", "ackId2");
assertNull(future.get());
}

@Test
public void testModifyAckDeadlineMessageList() {
pubsub = options.service();
List<String> ackIds = ImmutableList.of("ackId1", "ackId2");
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
.setAckDeadlineSeconds(10)
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAllAckIds(ackIds)
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
pubsub.modifyAckDeadline(SUBSCRIPTION, 10, TimeUnit.SECONDS, ackIds);
}

@Test
public void testModifyAckDeadlineMessageListAsync()
throws ExecutionException, InterruptedException {
pubsub = options.service();
List<String> ackIds = ImmutableList.of("ackId1", "ackId2");
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
.setAckDeadlineSeconds(10)
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAllAckIds(ackIds)
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
Future<Void> future = pubsub.modifyAckDeadlineAsync(SUBSCRIPTION, 10, TimeUnit.SECONDS, ackIds);
assertNull(future.get());
}

@Test
public void testClose() throws Exception {
pubsub = options.service();
Expand Down