
Pub/Sub StreamingPull receives many duplicates when there is a backlog #3383

Closed
yonran opened this issue Jun 14, 2018 · 4 comments
Labels
api: pubsub (Issues related to the Pub/Sub API) · priority: p2 (Moderately-important priority; fix may not be included in next release) · status: blocked (Resolving the issue is dependent on other work) · type: question (Request for information or clarification; not an issue)

Comments

yonran commented Jun 14, 2018

The Pub/Sub StreamingPull API delivers many duplicates when the messages are small and there is a backlog. This differs from Pull, which does not exhibit this behavior. After a while, more than 50% of received messages can be duplicates, which makes it very hard to work through a backlog. @kir-titievsky suggested that I create a new issue when I described this in #2465.

I created the following test programs to replicate this issue:

  1. CloudPubSub.java uses the Asynchronous Pull (MessageReceiver) Java API to receive messages and write them to a file of JSON records (a minimal sketch of this setup appears after this list). Optionally, you can also use my custom google-cloud-java branch, in which I instrumented MessageDispatcher.java to log the message IDs that are acked.
    • MyProducer.java publishes a backlog of messages (e.g. --initial-publish-messages=10000) and then publishes a stream of messages (default --publish-period=400 means 2.5 messages/second)
    • LogMessagesReceiver.java sleeps a fixed duration per message (default --message-processing-time=5000ms), waits on an ack throttle of --period=333 (about 3 messages/second), and then acks the message. It should make progress, since its --period is shorter than the publisher’s --publish-period, but it doesn’t because of the duplicate messages.
    • CloudPubSub.java sets the FlowControlSettings. By default, --concurrent-messages=20 means that 20 receivers sleep in parallel. Since 5000 ms < 20 × 333 ms = 6660 ms, there are enough concurrent receivers that the 5000 ms sleep does not reduce the subscriber throughput below the desired 3 messages per second.
    • Result: After an hour or two, there are many duplicate message IDs, as indicated by running jq < /tmp/inv-log-cloudpubsub-pub2.5-sub3.jsons --slurp '[.[] | .messageId] | {unique: sort|unique|length, total: length} | .duplicates = .total - .unique'
    • Stackdriver metrics show that the backlog is growing even though the consumer (3 messages/second) is faster than the producer (2.5 messages/second).
      [Stackdriver charts: undelivered and oldest acknowledged]
  2. RawPubSub.java uses the same low-level gRPC StreamingPull API that the high-level MessageReceiver API uses. Like CloudPubSub.java, it logs the message IDs to stdout and to a file of JSON records.
    • In the publisher thread, MyProducer.java publishes a backlog of messages (e.g. --initial-publish-messages=10000) and then publishes a stream of messages (default --publish-period=400 means 2.5 messages/second)
    • onError is implemented on the ack StreamObserver so that we can detect acks that failed. I have not seen any failures.
    • The receiver thread calls request(1) to get one message at a time, queues them up, and processes them with similar timing as CloudPubSub.java (i.e., waits 5 seconds per message and then throttles acks to 3 messages per second). It also calls modack to extend deadlines.
    • Result: After an hour or two, there are many duplicate message IDs, as indicated by running jq < /tmp/inv-log-grpc-pub2.5-sub3.jsons --slurp '[.[] | .messageId] | {unique: sort|unique|length, total: length} | .duplicates = .total - .unique'
    • Message IDs are logged to stdout at the time they are received and at the time they are acked. By checking the log, we can see that the client acks messages within the subscription’s ackDeadlineSeconds (10s default), and the server sent duplicates regardless.
  3. The pubsub-0.21.1-beta branch of CloudPubSub.java uses the same MessageReceiver Java API, but at version 0.21.1-beta which still used Pull instead of StreamingPull. It does not give a significant number of duplicates.
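For readers unfamiliar with the API shapes involved, here is a minimal sketch of the asynchronous-pull setup described in item 1. This is not the actual CloudPubSub.java / LogMessagesReceiver.java source: the project and subscription names are placeholders, Guava's RateLimiter stands in for the ack throttle, and the hard-coded values mirror the default flags above.

```java
import com.google.api.gax.batching.FlowControlSettings;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.util.concurrent.RateLimiter;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;

public class ReceiverSketch {
  public static void main(String[] args) {
    ProjectSubscriptionName subscription =
        ProjectSubscriptionName.of("my-project", "my-subscription"); // placeholders

    // Throttle acks to ~3 messages/second (the --period=333 flag above).
    RateLimiter ackLimiter = RateLimiter.create(3.0);

    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          try {
            Thread.sleep(5000); // simulate --message-processing-time=5000ms
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
          ackLimiter.acquire();
          System.out.println("acking " + message.getMessageId());
          consumer.ack();
        };

    Subscriber subscriber =
        Subscriber.newBuilder(subscription, receiver)
            .setFlowControlSettings(
                FlowControlSettings.newBuilder()
                    .setMaxOutstandingElementCount(20L) // --concurrent-messages=20
                    .build())
            .build();
    subscriber.startAsync().awaitRunning();
    subscriber.awaitTerminated();
  }
}
```

Note that setMaxOutstandingElementCount only bounds how many leased messages are outstanding at once; whether the executor has enough threads to actually run that many callbacks concurrently is a separate setting, which is what the discussion below turns on.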

I opened a support ticket for this (Case 15877623, 2018-05-21: Pub/Sub subscriber receives duplicated MessageIds and never catches up). On 2018-05-29, the representative said that this is a known issue with StreamingPull, that there is no ETA for a fix, and that I should poll the Pub/Sub release notes for updates.

@JustinBeckwith added the triage me label on Jun 14, 2018
@yihanzhen added the type: question, api: pubsub, and priority: p2 labels and removed the triage me label on Jun 15, 2018
@yihanzhen added the status: blocked label on Jun 27, 2018
@kir-titievsky

@yonran thank you for the excellent bug report and your patience. We have a theory: by default, the thread pool used for message processing has 5 threads, while you are trying to process 20 messages at the same time. This might mean you never have enough threads to actually do the processing. You might try two different things:
A: --concurrent-messages=5 to match the number of existing threads.
B: Provide your own executor so you don't block the threads in the client library's pool. You can do this in two ways: give the client library an ExecutorService of your own with 20 threads (or one that scales automatically, so the client library can just spin up new threads to do the processing), or create a dedicated thread pool for the processing ("sleeps") instead of blocking the client library's threads. A sketch of option B follows this comment.

Does that make sense? Would you be so kind as to at least try A to test this hypothesis?
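Option B would look roughly like the sketch below, assuming the Subscriber builder's setExecutorProvider hook and gax's InstantiatingExecutorProvider; the project and subscription names are placeholders, and the receiver body is elided.

```java
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;

public class CustomExecutorSketch {
  public static void main(String[] args) {
    MessageReceiver receiver =
        (message, consumer) -> {
          // ... slow processing (the "sleep") would go here, or be handed off
          // to a dedicated application-owned pool ...
          consumer.ack();
        };

    // Give the client library at least as many threads as the number of
    // messages it is allowed to process in parallel (--concurrent-messages=20).
    InstantiatingExecutorProvider executorProvider =
        InstantiatingExecutorProvider.newBuilder()
            .setExecutorThreadCount(20)
            .build();

    Subscriber subscriber =
        Subscriber.newBuilder(
                ProjectSubscriptionName.of("my-project", "my-subscription"), receiver)
            .setExecutorProvider(executorProvider)
            .build();
    subscriber.startAsync().awaitRunning();
  }
}
```

The other flavor of B keeps the client library's default executor and instead hands the slow work off to an application-owned pool so that the callback returns quickly.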

pongad (Contributor) commented Jul 24, 2018

@kir-titievsky I might be missing something, but it seems like the fix for this is just to bring back the "polling pull", right?

kir-titievsky commented Jul 26, 2018

@pongad: going back to the older version of the client library would be a quicker path to immediate success. I would prefer that we figure out how to solve this in the newer client, if there is a solution, since it is more maintainable and presumably more performant. Per my previous comment, there is reason to believe we may be able to solve this through configuration changes.

@chingor13 (Contributor)

This should have been fixed in #3743. Feel free to reopen if this is not the case.
