diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java
index bb50e18813de..79aa6b71a94a 100644
--- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java
+++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java
@@ -78,7 +78,7 @@ class MessageDispatcher {
   private final MessageWaiter messagesWaiter;
 
   // Maps ID to "total expiration time". If it takes longer than this, stop extending.
-  private final ConcurrentMap<AckHandler, Instant> pendingMessages = new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, AckHandler> pendingMessages = new ConcurrentHashMap<>();
 
   private final LinkedBlockingQueue<String> pendingAcks = new LinkedBlockingQueue<>();
   private final LinkedBlockingQueue<String> pendingNacks = new LinkedBlockingQueue<>();
@@ -133,16 +133,25 @@ private class AckHandler implements ApiFutureCallback<AckReply> {
     private final String ackId;
     private final int outstandingBytes;
     private final long receivedTimeMillis;
+    private final Instant totalExpiration;
 
-    AckHandler(String ackId, int outstandingBytes) {
+    AckHandler(String ackId, int outstandingBytes, Instant totalExpiration) {
       this.ackId = ackId;
       this.outstandingBytes = outstandingBytes;
-      receivedTimeMillis = clock.millisTime();
+      this.receivedTimeMillis = clock.millisTime();
+      this.totalExpiration = totalExpiration;
     }
 
-    private void onBoth(LinkedBlockingQueue<String> destination) {
-      pendingMessages.remove(this);
-      destination.add(ackId);
+    /** Stop extending deadlines for this message and free flow control. */
+    private void forget() {
+      if (pendingMessages.remove(ackId) == null) {
+        /*
+         * We're forgetting the message for the second time. This probably means we ran out of
+         * total expiration and forgot the message, then the user finished working on it and
+         * forget() ran again. Turn the second forget into a no-op so we don't free twice.
+         */
+        return;
+      }
       flowController.release(1, outstandingBytes);
       messagesWaiter.incrementPendingMessages(-1);
       processOutstandingBatches();
@@ -154,7 +163,8 @@ public void onFailure(Throwable t) {
           Level.WARNING,
           "MessageReceiver failed to processes ack ID: " + ackId + ", the message will be nacked.",
           t);
-      onBoth(pendingNacks);
+      pendingNacks.add(ackId);
+      forget();
     }
 
     @Override
@@ -174,7 +184,8 @@ public void onSuccess(AckReply reply) {
         default:
           throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply));
       }
-      onBoth(destination);
+      destination.add(ackId);
+      forget();
     }
   }
 
@@ -327,18 +338,32 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable doneCallback) {
       doneCallback.run();
       return;
     }
-    messagesWaiter.incrementPendingMessages(messages.size());
-
     Instant totalExpiration = now().plus(maxAckExtensionPeriod);
 
     OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch(doneCallback);
     for (ReceivedMessage message : messages) {
       AckHandler ackHandler =
-          new AckHandler(message.getAckId(), message.getMessage().getSerializedSize());
+          new AckHandler(
+              message.getAckId(), message.getMessage().getSerializedSize(), totalExpiration);
+      if (pendingMessages.putIfAbsent(message.getAckId(), ackHandler) != null) {
+        // putIfAbsent puts ackHandler if the ack ID isn't already mapped and returns the previously-mapped element.
+        // If the previous element is not null, we already have the message and the new one is definitely a duplicate.
+        // Don't nack this, because that'd also nack the one we already have in queue.
+
+        // TODO(pongad): We could update the total expiration time, but I'm not 100% sure how that plays with
+        // various resources. Think about this more.
+        continue;
+      }
       outstandingBatch.addMessage(message, ackHandler);
       pendingReceipts.add(message.getAckId());
-      pendingMessages.put(ackHandler, totalExpiration);
     }
+
+    if (outstandingBatch.messages.isEmpty()) {
+      doneCallback.run();
+      return;
+    }
+
+    messagesWaiter.incrementPendingMessages(outstandingBatch.messages.size());
     synchronized (outstandingMessageBatches) {
       outstandingMessageBatches.add(outstandingBatch);
     }
@@ -398,6 +423,14 @@ public void nack() {
            @Override
            public void run() {
              try {
+                if (ackHandler.totalExpiration.plusSeconds(messageDeadlineSeconds.get()).isBefore(now())) {
+                  // Message expired while waiting. We don't extend these messages anymore,
+                  // so it was probably sent to someone else. Don't work on it.
+                  // Don't nack it either, because we'd be nacking someone else's message.
+                  ackHandler.forget();
+                  return;
+                }
+
                receiver.receiveMessage(message, consumer);
              } catch (Exception e) {
                response.setException(e);
@@ -433,35 +466,26 @@ void extendDeadlines() {
     Instant now = now();
     Instant extendTo = now.plusSeconds(extendSeconds);
 
-    int count = 0;
-    Iterator<Map.Entry<AckHandler, Instant>> it = pendingMessages.entrySet().iterator();
-    while (it.hasNext()) {
-      Map.Entry<AckHandler, Instant> entry = it.next();
-      String ackId = entry.getKey().ackId;
-      Instant totalExpiration = entry.getValue();
-      // TODO(pongad): PendingModifyAckDeadline is created to dance around polling pull,
-      // since one modack RPC only takes one expiration.
-      // Whenever we delete polling pull, we should also delete PendingModifyAckDeadline,
-      // and just construct StreamingPullRequest directly.
+    for (Map.Entry<String, AckHandler> entry : pendingMessages.entrySet()) {
+      String ackId = entry.getKey();
+      Instant totalExpiration = entry.getValue().totalExpiration;
       if (totalExpiration.isAfter(extendTo)) {
         modack.ackIds.add(ackId);
-        count++;
         continue;
       }
-      it.remove();
+
+      // forget() removes the entry from pendingMessages; this is OK, concurrent maps can
+      // handle concurrent iterations and modifications.
+      entry.getValue().forget();
      if (totalExpiration.isAfter(now)) {
         int sec = Math.max(1, (int) now.until(totalExpiration, ChronoUnit.SECONDS));
         modacks.add(new PendingModifyAckDeadline(sec, ackId));
-        count++;
-      } else {
-        flowController.release(1, entry.getKey().outstandingBytes);
-        messagesWaiter.incrementPendingMessages(-1);
       }
     }
 
+    logger.log(Level.FINER, "Sending {0} modacks", modack.ackIds.size() + modacks.size());
     modacks.add(modack);
-    logger.log(Level.FINER, "Sending {0} modacks", count);
-    List<String> acksToSend = Collections.emptyList();
+    List<String> acksToSend = Collections.emptyList();
     ackProcessor.sendAckOperations(acksToSend, modacks);
   }
 
diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java
index cf35733bda8b..f38d695791ca 100644
--- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java
+++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java
@@ -188,6 +188,11 @@ public void close() throws IOException {
     numChannels = builder.parallelPullCount;
     channels = new ArrayList<>(numChannels);
     streamingSubscriberConnections = new ArrayList<StreamingSubscriberConnection>(numChannels);
+
+    // We regularly look up the distribution for a good subscription deadline,
+    // so we seed the distribution with something reasonable to start with.
+    // The distribution is percentile-based, so this value will eventually lose importance.
+    ackLatencyDistribution.record(60);
   }
 
   /**
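A note on the de-duplication in MessageDispatcher: keying pendingMessages by ack ID (instead of by AckHandler) lets processReceivedMessages use ConcurrentHashMap.putIfAbsent to drop redeliveries of a message that is still outstanding, and it makes forget() idempotent, so the expiration path and the user's ack/nack path can both call it without releasing flow control twice. A minimal standalone sketch of that pattern; DedupTracker and its members are illustrative names, not part of the client library:

import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class DedupTracker {
  /** Per-message bookkeeping, keyed by ack ID. */
  static final class Pending {
    final Instant totalExpiration;

    Pending(Instant totalExpiration) {
      this.totalExpiration = totalExpiration;
    }
  }

  private final ConcurrentMap<String, Pending> pending = new ConcurrentHashMap<>();

  /** Returns true if this ack ID is new; false if it is a redelivered duplicate. */
  boolean track(String ackId, Instant totalExpiration) {
    // putIfAbsent returns the previously-mapped value, so a non-null result means we
    // already hold this ack ID and the new delivery must not be dispatched (or nacked).
    return pending.putIfAbsent(ackId, new Pending(totalExpiration)) == null;
  }

  /** Idempotent: only the first call per ack ID releases resources. */
  void forget(String ackId) {
    if (pending.remove(ackId) == null) {
      return; // Already forgotten; releasing resources twice would corrupt accounting.
    }
    // Here the real dispatcher releases flow control and decrements pending-message counters.
  }
}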
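On the Subscriber change: seeding ackLatencyDistribution with a single 60-second sample gives the first deadline lookups something reasonable to work with, and because the lookup is percentile-based, that one seed is quickly outvoted once real ack latencies are recorded. A hypothetical sketch of a bucket-based percentile distribution, not the client library's Distribution class, showing that effect:

import java.util.concurrent.atomic.AtomicLongArray;

public class LatencyDistribution {
  // Bucket i counts samples of i seconds; out-of-range values are clamped.
  private final AtomicLongArray buckets = new AtomicLongArray(600);

  public void record(int seconds) {
    buckets.incrementAndGet(Math.min(Math.max(seconds, 0), buckets.length() - 1));
  }

  /** Smallest value such that at least {@code percentile} percent of samples are <= it. */
  public int percentile(double percentile) {
    long total = 0;
    for (int i = 0; i < buckets.length(); i++) {
      total += buckets.get(i);
    }
    long target = (long) Math.ceil(total * percentile / 100.0);
    long seen = 0;
    for (int i = 0; i < buckets.length(); i++) {
      seen += buckets.get(i);
      if (seen >= target) {
        return i;
      }
    }
    return buckets.length() - 1;
  }

  public static void main(String[] args) {
    LatencyDistribution d = new LatencyDistribution();
    d.record(60); // seed, so the first deadline lookup has something reasonable
    System.out.println(d.percentile(99)); // 60: the seed dominates at first
    for (int i = 0; i < 1000; i++) {
      d.record(2); // real ack latencies
    }
    System.out.println(d.percentile(99)); // 2: the single seed is now outvoted
  }
}

Right after the seed, the 99th percentile is 60; after a thousand 2-second samples it drops to 2, which is the fading influence the Subscriber comment relies on.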