Skip to content

Commit

Permalink
pubsub: clean up after extension gives up (googleapis#3633)
Browse files Browse the repository at this point in the history
  • Loading branch information
csainty authored and pongad committed Sep 5, 2018
1 parent f4bc56d commit 5e3b897
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class MessageDispatcher {
private final MessageWaiter messagesWaiter;

// Maps ID to "total expiration time". If it takes longer than this, stop extending.
private final ConcurrentMap<String, Instant> pendingMessages = new ConcurrentHashMap<>();
private final ConcurrentMap<AckHandler, Instant> pendingMessages = new ConcurrentHashMap<>();

private final LinkedBlockingQueue<String> pendingAcks = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<String> pendingNacks = new LinkedBlockingQueue<>();
Expand Down Expand Up @@ -141,7 +141,7 @@ private class AckHandler implements FutureCallback<AckReply> {
}

private void onBoth(LinkedBlockingQueue<String> destination) {
pendingMessages.remove(ackId);
pendingMessages.remove(this);
destination.add(ackId);
flowController.release(1, outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
Expand Down Expand Up @@ -329,17 +329,15 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
}
messagesWaiter.incrementPendingMessages(messages.size());

Instant totalExpiration = now().plus(maxAckExtensionPeriod);
for (ReceivedMessage message : messages) {
pendingReceipts.add(message.getAckId());
pendingMessages.put(message.getAckId(), totalExpiration);
}

Instant totalExpiration = now().plus(maxAckExtensionPeriod);
OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch(doneCallback);
for (ReceivedMessage message : messages) {
AckHandler ackHandler =
new AckHandler(message.getAckId(), message.getMessage().getSerializedSize());
outstandingBatch.addMessage(message, ackHandler);
pendingReceipts.add(message.getAckId());
pendingMessages.put(ackHandler, totalExpiration);
}
synchronized (outstandingMessageBatches) {
outstandingMessageBatches.add(outstandingBatch);
Expand Down Expand Up @@ -436,10 +434,10 @@ void extendDeadlines() {
Instant extendTo = now.plusSeconds(extendSeconds);

int count = 0;
Iterator<Map.Entry<String, Instant>> it = pendingMessages.entrySet().iterator();
Iterator<Map.Entry<AckHandler, Instant>> it = pendingMessages.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, Instant> entry = it.next();
String ackId = entry.getKey();
Map.Entry<AckHandler, Instant> entry = it.next();
String ackId = entry.getKey().ackId;
Instant totalExpiration = entry.getValue();
// TODO(pongad): PendingModifyAckDeadline is created to dance around polling pull,
// since one modack RPC only takes one expiration.
Expand All @@ -455,6 +453,9 @@ void extendDeadlines() {
int sec = Math.max(1, (int) now.until(totalExpiration, ChronoUnit.SECONDS));
modacks.add(new PendingModifyAckDeadline(sec, ackId));
count++;
} else {
flowController.release(1, entry.getKey().outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
}
}
modacks.add(modack);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public void run() {
private List<String> sentAcks;
private List<ModAckItem> sentModAcks;
private FakeClock clock;
private FlowController flowController;

@AutoValue
abstract static class ModAckItem {
Expand Down Expand Up @@ -101,6 +102,12 @@ public void sendAckOperations(
systemExecutor.shutdownNow();

clock = new FakeClock();
flowController =
new FlowController(
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(1L)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException)
.build());

dispatcher =
new MessageDispatcher(
Expand All @@ -109,7 +116,7 @@ public void sendAckOperations(
Duration.ofSeconds(5),
Duration.ofMinutes(60),
new Distribution(Subscriber.MAX_ACK_DEADLINE_SECONDS + 1),
new FlowController(FlowControlSettings.newBuilder().build()),
flowController,
new LinkedList<MessageDispatcher.OutstandingMessageBatch>(),
MoreExecutors.directExecutor(),
systemExecutor,
Expand Down Expand Up @@ -182,6 +189,10 @@ public void testExtension_GiveUp() throws Exception {
clock.advance(1, TimeUnit.DAYS);
dispatcher.extendDeadlines();
assertThat(sentModAcks).isEmpty();

// We should be able to reserve another item in the flow controller and not block shutdown
flowController.reserve(1, 0);
dispatcher.stop();
}

@Test
Expand Down

0 comments on commit 5e3b897

Please sign in to comment.