Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsub: reject expired and duplicate messages #3743

Merged
merged 4 commits into from
Oct 18, 2018
Merged

pubsub: reject expired and duplicate messages #3743

merged 4 commits into from
Oct 18, 2018

Conversation

pongad
Copy link
Contributor

@pongad pongad commented Sep 27, 2018

CC @csainty

This should let us clear through backlogs more quickly.

@googlebot googlebot added the cla: yes This human has signed the Contributor License Agreement. label Sep 27, 2018
if (ackHandler.totalExpiration.isBefore(now())) {
// Message expired while waiting. We don't extend these messages anymore,
// so it was probably sent to someone else. Don't work on it.
// Don't nack it either, because we'd be nacking someone else's message.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.


// forget removes from pendingMessages; this is OK, concurrent maps can
// handle concurrent iterations and modifications.
entry.getValue().forget();

This comment was marked as spam.

This comment was marked as spam.

@csainty
Copy link

csainty commented Sep 27, 2018

Generally like it, nice clean approach 👍

Won't handle a 0 extension period though will it? Since totalExpiration will be set to receive time

final Subscriber subscriber = Subscriber.newBuilder(subscriptionName, (msg, ack) -> {
    // do something
  })
  .setMaxAckExtensionPeriod(Duration.ZERO)
  .build();

@pongad
Copy link
Contributor Author

pongad commented Sep 27, 2018

That's right. This won't handle the case where max ack extension is zero. I'm a little concerned by this; this essentially changes the behavior from "we won't extend these" to "we'll never work on these". I'll make a small fix for this.

@csainty
Copy link

csainty commented Sep 28, 2018

👍
I pulled this morning and ran my sample project with the build, hits exactly-once consumption. Thanks for looking in to it.

I think this will fix #3383 and #2465
Also make it easier to configure for https://github.com/GoogleCloudPlatform/google-cloud-java/issues/3152 by tweaking the max deadline so all the messages don't pull to a single subscriber and stay there, as currently reducing the max deadline to prevent this gets you in to double processing sadness.

@pongad
Copy link
Contributor Author

pongad commented Sep 28, 2018

Don't merge this yet. Someone from pubsub should take a look.

@@ -398,6 +423,14 @@ public void nack() {
@Override
public void run() {
try {
if (ackHandler.totalExpiration.plusSeconds(messageDeadlineSeconds.get()).isBefore(now())) {

This comment was marked as spam.

This comment was marked as spam.

@pongad pongad requested a review from a team as a code owner October 18, 2018 00:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cla: yes This human has signed the Contributor License Agreement.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants