Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsub: reject expired and duplicate messages #3743

Merged
merged 4 commits into from
Oct 18, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class MessageDispatcher {
private final MessageWaiter messagesWaiter;

// Maps ID to "total expiration time". If it takes longer than this, stop extending.
private final ConcurrentMap<AckHandler, Instant> pendingMessages = new ConcurrentHashMap<>();
private final ConcurrentMap<String, AckHandler> pendingMessages = new ConcurrentHashMap<>();

private final LinkedBlockingQueue<String> pendingAcks = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<String> pendingNacks = new LinkedBlockingQueue<>();
Expand Down Expand Up @@ -133,16 +133,25 @@ private class AckHandler implements ApiFutureCallback<AckReply> {
private final String ackId;
private final int outstandingBytes;
private final long receivedTimeMillis;
private final Instant totalExpiration;

AckHandler(String ackId, int outstandingBytes) {
AckHandler(String ackId, int outstandingBytes, Instant totalExpiration) {
this.ackId = ackId;
this.outstandingBytes = outstandingBytes;
receivedTimeMillis = clock.millisTime();
this.receivedTimeMillis = clock.millisTime();
this.totalExpiration = totalExpiration;
}

private void onBoth(LinkedBlockingQueue<String> destination) {
pendingMessages.remove(this);
destination.add(ackId);
/** Stop extending deadlines for this message and free flow control. */
private void forget() {
if (pendingMessages.remove(ackId) == null) {
/*
* We're forgetting the message for the second time. Probably because we ran out of total
* expiration, forget the message, then the user finishes working on the message, and forget
* again. Turn the second forget into a no-op so we don't free twice.
*/
return;
}
flowController.release(1, outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
processOutstandingBatches();
Expand All @@ -154,7 +163,8 @@ public void onFailure(Throwable t) {
Level.WARNING,
"MessageReceiver failed to processes ack ID: " + ackId + ", the message will be nacked.",
t);
onBoth(pendingNacks);
pendingNacks.add(ackId);
forget();
}

@Override
Expand All @@ -174,7 +184,8 @@ public void onSuccess(AckReply reply) {
default:
throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply));
}
onBoth(destination);
destination.add(ackId);
forget();
}
}

Expand Down Expand Up @@ -327,18 +338,32 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
doneCallback.run();
return;
}
messagesWaiter.incrementPendingMessages(messages.size());


Instant totalExpiration = now().plus(maxAckExtensionPeriod);
OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch(doneCallback);
for (ReceivedMessage message : messages) {
AckHandler ackHandler =
new AckHandler(message.getAckId(), message.getMessage().getSerializedSize());
new AckHandler(
message.getAckId(), message.getMessage().getSerializedSize(), totalExpiration);
if (pendingMessages.putIfAbsent(message.getAckId(), ackHandler) != null){
// putIfAbsent puts ackHandler if ackID isn't previously mapped, then return the previously-mapped element.
// If the previous element is not null, we already have the message and the new one is definitely a duplicate.
// Don't nack this, because that'd also nack the one we already have in queue.

// TODO(pongad): We could update the total expiration time, but I'm not 100% sure how that plays with
// various resources. Think about this more.
continue;
}
outstandingBatch.addMessage(message, ackHandler);
pendingReceipts.add(message.getAckId());
pendingMessages.put(ackHandler, totalExpiration);
}

if (outstandingBatch.messages.isEmpty()) {
doneCallback.run();
return;
}

messagesWaiter.incrementPendingMessages(outstandingBatch.messages.size());
synchronized (outstandingMessageBatches) {
outstandingMessageBatches.add(outstandingBatch);
}
Expand Down Expand Up @@ -398,6 +423,14 @@ public void nack() {
@Override
public void run() {
try {
if (ackHandler.totalExpiration.plusSeconds(messageDeadlineSeconds.get()).isBefore(now())) {

This comment was marked as spam.

This comment was marked as spam.

// Message expired while waiting. We don't extend these messages anymore,
// so it was probably sent to someone else. Don't work on it.
// Don't nack it either, because we'd be nacking someone else's message.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

ackHandler.forget();
return;
}

receiver.receiveMessage(message, consumer);
} catch (Exception e) {
response.setException(e);
Expand Down Expand Up @@ -433,35 +466,26 @@ void extendDeadlines() {
Instant now = now();
Instant extendTo = now.plusSeconds(extendSeconds);

int count = 0;
Iterator<Map.Entry<AckHandler, Instant>> it = pendingMessages.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<AckHandler, Instant> entry = it.next();
String ackId = entry.getKey().ackId;
Instant totalExpiration = entry.getValue();
// TODO(pongad): PendingModifyAckDeadline is created to dance around polling pull,
// since one modack RPC only takes one expiration.
// Whenever we delete polling pull, we should also delete PendingModifyAckDeadline,
// and just construct StreamingPullRequest directly.
for (Map.Entry<String, AckHandler> entry : pendingMessages.entrySet()) {
String ackId = entry.getKey();
Instant totalExpiration = entry.getValue().totalExpiration;
if (totalExpiration.isAfter(extendTo)) {
modack.ackIds.add(ackId);
count++;
continue;
}
it.remove();

// forget removes from pendingMessages; this is OK, concurrent maps can
// handle concurrent iterations and modifications.
entry.getValue().forget();

This comment was marked as spam.

This comment was marked as spam.

if (totalExpiration.isAfter(now)) {
int sec = Math.max(1, (int) now.until(totalExpiration, ChronoUnit.SECONDS));
modacks.add(new PendingModifyAckDeadline(sec, ackId));
count++;
} else {
flowController.release(1, entry.getKey().outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
}
}
logger.log(Level.FINER, "Sending {0} modacks", modack.ackIds.size() + modacks.size());
modacks.add(modack);
logger.log(Level.FINER, "Sending {0} modacks", count);

List<String> acksToSend = Collections.<String>emptyList();
List<String> acksToSend = Collections.emptyList();
ackProcessor.sendAckOperations(acksToSend, modacks);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ public void close() throws IOException {
numChannels = builder.parallelPullCount;
channels = new ArrayList<>(numChannels);
streamingSubscriberConnections = new ArrayList<StreamingSubscriberConnection>(numChannels);

// We regularly look up the distribution for a good subscription deadline.
// So we seed the distribution with something reasonable to start with.
// Distribution is percentile-based, so this value will eventually lose importance.
ackLatencyDistribution.record(60);
}

/**
Expand Down