From 5e3b897e6b748bb420cd31f2450be746ae783616 Mon Sep 17 00:00:00 2001 From: Chris Sainty Date: Wed, 5 Sep 2018 20:09:40 +0200 Subject: [PATCH] pubsub: clean up after extension gives up (#3633) --- .../cloud/pubsub/v1/MessageDispatcher.java | 21 ++++++++++--------- .../pubsub/v1/MessageDispatcherTest.java | 13 +++++++++++- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index e41a2ed4f794..aad9446f06eb 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -78,7 +78,7 @@ class MessageDispatcher { private final MessageWaiter messagesWaiter; // Maps ID to "total expiration time". If it takes longer than this, stop extending. - private final ConcurrentMap pendingMessages = new ConcurrentHashMap<>(); + private final ConcurrentMap pendingMessages = new ConcurrentHashMap<>(); private final LinkedBlockingQueue pendingAcks = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue pendingNacks = new LinkedBlockingQueue<>(); @@ -141,7 +141,7 @@ private class AckHandler implements FutureCallback { } private void onBoth(LinkedBlockingQueue destination) { - pendingMessages.remove(ackId); + pendingMessages.remove(this); destination.add(ackId); flowController.release(1, outstandingBytes); messagesWaiter.incrementPendingMessages(-1); @@ -329,17 +329,15 @@ public void processReceivedMessages(List messages, Runnable don } messagesWaiter.incrementPendingMessages(messages.size()); - Instant totalExpiration = now().plus(maxAckExtensionPeriod); - for (ReceivedMessage message : messages) { - pendingReceipts.add(message.getAckId()); - pendingMessages.put(message.getAckId(), totalExpiration); - } + Instant totalExpiration = now().plus(maxAckExtensionPeriod); OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch(doneCallback); for (ReceivedMessage message : messages) { AckHandler ackHandler = new AckHandler(message.getAckId(), message.getMessage().getSerializedSize()); outstandingBatch.addMessage(message, ackHandler); + pendingReceipts.add(message.getAckId()); + pendingMessages.put(ackHandler, totalExpiration); } synchronized (outstandingMessageBatches) { outstandingMessageBatches.add(outstandingBatch); @@ -436,10 +434,10 @@ void extendDeadlines() { Instant extendTo = now.plusSeconds(extendSeconds); int count = 0; - Iterator> it = pendingMessages.entrySet().iterator(); + Iterator> it = pendingMessages.entrySet().iterator(); while (it.hasNext()) { - Map.Entry entry = it.next(); - String ackId = entry.getKey(); + Map.Entry entry = it.next(); + String ackId = entry.getKey().ackId; Instant totalExpiration = entry.getValue(); // TODO(pongad): PendingModifyAckDeadline is created to dance around polling pull, // since one modack RPC only takes one expiration. @@ -455,6 +453,9 @@ void extendDeadlines() { int sec = Math.max(1, (int) now.until(totalExpiration, ChronoUnit.SECONDS)); modacks.add(new PendingModifyAckDeadline(sec, ackId)); count++; + } else { + flowController.release(1, entry.getKey().outstandingBytes); + messagesWaiter.incrementPendingMessages(-1); } } modacks.add(modack); diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index 2cd6997f6436..75296dd89c87 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -56,6 +56,7 @@ public void run() { private List sentAcks; private List sentModAcks; private FakeClock clock; + private FlowController flowController; @AutoValue abstract static class ModAckItem { @@ -101,6 +102,12 @@ public void sendAckOperations( systemExecutor.shutdownNow(); clock = new FakeClock(); + flowController = + new FlowController( + FlowControlSettings.newBuilder() + .setMaxOutstandingElementCount(1L) + .setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException) + .build()); dispatcher = new MessageDispatcher( @@ -109,7 +116,7 @@ public void sendAckOperations( Duration.ofSeconds(5), Duration.ofMinutes(60), new Distribution(Subscriber.MAX_ACK_DEADLINE_SECONDS + 1), - new FlowController(FlowControlSettings.newBuilder().build()), + flowController, new LinkedList(), MoreExecutors.directExecutor(), systemExecutor, @@ -182,6 +189,10 @@ public void testExtension_GiveUp() throws Exception { clock.advance(1, TimeUnit.DAYS); dispatcher.extendDeadlines(); assertThat(sentModAcks).isEmpty(); + + // We should be able to reserve another item in the flow controller and not block shutdown + flowController.reserve(1, 0); + dispatcher.stop(); } @Test