Skip to content

Commit

Permalink
pubsub: reject expired and duplicate messages (#3743)
Browse files Browse the repository at this point in the history
* pubsub: reject expired and duplicate messages

* add some lag time before rejecting expired messages
  • Loading branch information
pongad committed Oct 18, 2018
1 parent c3aedea commit db9d244
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class MessageDispatcher {
private final MessageWaiter messagesWaiter;

// Maps ID to "total expiration time". If it takes longer than this, stop extending.
private final ConcurrentMap<AckHandler, Instant> pendingMessages = new ConcurrentHashMap<>();
private final ConcurrentMap<String, AckHandler> pendingMessages = new ConcurrentHashMap<>();

private final LinkedBlockingQueue<String> pendingAcks = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<String> pendingNacks = new LinkedBlockingQueue<>();
Expand Down Expand Up @@ -133,16 +133,25 @@ private class AckHandler implements ApiFutureCallback<AckReply> {
private final String ackId;
private final int outstandingBytes;
private final long receivedTimeMillis;
private final Instant totalExpiration;

AckHandler(String ackId, int outstandingBytes) {
AckHandler(String ackId, int outstandingBytes, Instant totalExpiration) {
this.ackId = ackId;
this.outstandingBytes = outstandingBytes;
receivedTimeMillis = clock.millisTime();
this.receivedTimeMillis = clock.millisTime();
this.totalExpiration = totalExpiration;
}

private void onBoth(LinkedBlockingQueue<String> destination) {
pendingMessages.remove(this);
destination.add(ackId);
/** Stop extending deadlines for this message and free flow control. */
private void forget() {
if (pendingMessages.remove(ackId) == null) {
/*
* We're forgetting the message for the second time. Probably because we ran out of total
* expiration, forget the message, then the user finishes working on the message, and forget
* again. Turn the second forget into a no-op so we don't free twice.
*/
return;
}
flowController.release(1, outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
processOutstandingBatches();
Expand All @@ -154,7 +163,8 @@ public void onFailure(Throwable t) {
Level.WARNING,
"MessageReceiver failed to processes ack ID: " + ackId + ", the message will be nacked.",
t);
onBoth(pendingNacks);
pendingNacks.add(ackId);
forget();
}

@Override
Expand All @@ -174,7 +184,8 @@ public void onSuccess(AckReply reply) {
default:
throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply));
}
onBoth(destination);
destination.add(ackId);
forget();
}
}

Expand Down Expand Up @@ -327,18 +338,32 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
doneCallback.run();
return;
}
messagesWaiter.incrementPendingMessages(messages.size());


Instant totalExpiration = now().plus(maxAckExtensionPeriod);
OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch(doneCallback);
for (ReceivedMessage message : messages) {
AckHandler ackHandler =
new AckHandler(message.getAckId(), message.getMessage().getSerializedSize());
new AckHandler(
message.getAckId(), message.getMessage().getSerializedSize(), totalExpiration);
if (pendingMessages.putIfAbsent(message.getAckId(), ackHandler) != null){
// putIfAbsent puts ackHandler if ackID isn't previously mapped, then return the previously-mapped element.
// If the previous element is not null, we already have the message and the new one is definitely a duplicate.
// Don't nack this, because that'd also nack the one we already have in queue.

// TODO(pongad): We could update the total expiration time, but I'm not 100% sure how that plays with
// various resources. Think about this more.
continue;
}
outstandingBatch.addMessage(message, ackHandler);
pendingReceipts.add(message.getAckId());
pendingMessages.put(ackHandler, totalExpiration);
}

if (outstandingBatch.messages.isEmpty()) {
doneCallback.run();
return;
}

messagesWaiter.incrementPendingMessages(outstandingBatch.messages.size());
synchronized (outstandingMessageBatches) {
outstandingMessageBatches.add(outstandingBatch);
}
Expand Down Expand Up @@ -398,6 +423,14 @@ public void nack() {
@Override
public void run() {
try {
if (ackHandler.totalExpiration.plusSeconds(messageDeadlineSeconds.get()).isBefore(now())) {
// Message expired while waiting. We don't extend these messages anymore,
// so it was probably sent to someone else. Don't work on it.
// Don't nack it either, because we'd be nacking someone else's message.
ackHandler.forget();
return;
}

receiver.receiveMessage(message, consumer);
} catch (Exception e) {
response.setException(e);
Expand Down Expand Up @@ -433,35 +466,26 @@ void extendDeadlines() {
Instant now = now();
Instant extendTo = now.plusSeconds(extendSeconds);

int count = 0;
Iterator<Map.Entry<AckHandler, Instant>> it = pendingMessages.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<AckHandler, Instant> entry = it.next();
String ackId = entry.getKey().ackId;
Instant totalExpiration = entry.getValue();
// TODO(pongad): PendingModifyAckDeadline is created to dance around polling pull,
// since one modack RPC only takes one expiration.
// Whenever we delete polling pull, we should also delete PendingModifyAckDeadline,
// and just construct StreamingPullRequest directly.
for (Map.Entry<String, AckHandler> entry : pendingMessages.entrySet()) {
String ackId = entry.getKey();
Instant totalExpiration = entry.getValue().totalExpiration;
if (totalExpiration.isAfter(extendTo)) {
modack.ackIds.add(ackId);
count++;
continue;
}
it.remove();

// forget removes from pendingMessages; this is OK, concurrent maps can
// handle concurrent iterations and modifications.
entry.getValue().forget();
if (totalExpiration.isAfter(now)) {
int sec = Math.max(1, (int) now.until(totalExpiration, ChronoUnit.SECONDS));
modacks.add(new PendingModifyAckDeadline(sec, ackId));
count++;
} else {
flowController.release(1, entry.getKey().outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
}
}
logger.log(Level.FINER, "Sending {0} modacks", modack.ackIds.size() + modacks.size());
modacks.add(modack);
logger.log(Level.FINER, "Sending {0} modacks", count);

List<String> acksToSend = Collections.<String>emptyList();
List<String> acksToSend = Collections.emptyList();
ackProcessor.sendAckOperations(acksToSend, modacks);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ public void close() throws IOException {
numChannels = builder.parallelPullCount;
channels = new ArrayList<>(numChannels);
streamingSubscriberConnections = new ArrayList<StreamingSubscriberConnection>(numChannels);

// We regularly look up the distribution for a good subscription deadline.
// So we seed the distribution with something reasonable to start with.
// Distribution is percentile-based, so this value will eventually lose importance.
ackLatencyDistribution.record(60);
}

/**
Expand Down

0 comments on commit db9d244

Please sign in to comment.