Consumer fetching once a second #2879

Closed

sachanr opened this issue May 9, 2020 · 5 comments
@sachanr

sachanr commented May 9, 2020

Description

Currently we have a low value for queued.max.messages.kbytes for our consumers; it is set to 25. When we were on v0.9.5 (without change 8321e37), our consumer had a high message consumption rate. After moving to v1.3.0 (which includes this backoff change), we see that when the toppar queue reaches queued.max.messages.kbytes, an immediate backoff of 1000ms is applied. This slows the consumer down to a single fetch from the Kafka broker (25KB) followed by a wait until the next second. On v0.9.5 it did the opposite: it fetched aggressively to keep the queue full, more frequently than every 100ms (this appears to be the case from the logs; I have not checked the code for it), 100ms being the standard fetch timeout.

We have fetch.error.backoff.ms set to 0, but that does not help in this scenario. The only way out is to increase queued.max.messages.kbytes to a value the app can consume in one second, so that enough data is available until the consumer fetches again a second later, or to remove the property altogether. A larger queued.max.messages.kbytes increases the memory footprint of the app, but since throughput matters more to us here, that works alright for us.

I was wondering if you had any suggestions around this, please? Ideally, shouldn't setting fetch.error.backoff.ms to 0 work here?
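
To illustrate the cap we think we are hitting, here is rough back-of-the-envelope arithmetic (the numbers come from the setup described above; the conclusion is our interpretation):

    queue limit per toppar     = queued.max.messages.kbytes = 25 KB
    backoff once queue is full = 1000 ms
    => after the queue fills, at most ~25 KB is fetched per backoff
       interval, i.e. roughly 25 KB/s per partition, no matter how
       quickly the application drains the local queue.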

Checklist

Please provide the following information:

  • librdkafka version (release number or git tag): v1.3.0
  • Apache Kafka version: 0.10.0.0_2
  • librdkafka client configuration:
    [fetch.error.backoff.ms] : [0]
    [enable.partition.eof] : [true]
    [queued.max.messages.kbytes] : [25]
    [fetch.message.max.bytes] : [25600]
    [debug] : [all]
    [statistics.interval.ms] : [1000]
    [queue.buffering.max.ms] : [1]
    [socket.timeout.ms] : [300000]
  • Operating system: RHEL 7.2
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue
@edenhill
Contributor

The general recommendation is to set queued.max.messages.kbytes large enough to buffer at least one second of data, preferably more, to even out the message rate passed to the application in case of network congestion, bursty producers, etc.; i.e., there should always be messages available for the application to read.
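
As a concrete sketch of that sizing (the ~5 MB/s consumption rate below is an assumed example, not a figure from this issue; measure your own application's drain rate and adjust accordingly):

    /* Sketch: size the local fetch queue for ~2 seconds of data,
     * assuming the application consumes roughly 5 MB/s (~5120 KB/s). */
    #include <librdkafka/rdkafka.h>
    #include <stdio.h>

    static rd_kafka_conf_t *make_consumer_conf(void) {
            rd_kafka_conf_t *conf = rd_kafka_conf_new();
            char errstr[512];

            /* 2 s x ~5120 KB/s consumed => keep ~10240 KB pre-fetched locally */
            if (rd_kafka_conf_set(conf, "queued.max.messages.kbytes", "10240",
                                  errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
                    fprintf(stderr, "queued.max.messages.kbytes: %s\n", errstr);

            return conf;
    }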

@shanson7
Contributor

shanson7 commented Mar 17, 2021

This has recently started affecting us as well. Here's an excerpt from https://gitter.im/edenhill/librdkafka:

Sean Hanson @shanson7 09:53
While debugging lagging (but fairly idle) consumers we found the existing issue #2879 and think this might be the issue (combined with the reduction of default queued.max.messages.kbytes to 64MB in 1.5.0). It seems to be a compounding problem. e.g. if the queue fills, then there is a long pause where no data is consumed (1 second) and then the queue is immediately filled again with the accumulated data. This means that queued.max.messages.kbytes needs to be generously allocated to have more than 1 second of room. In fact, it likely needs quite a lot more to avoid getting caught in this feedback loop. Does this sound like a correct analysis?

Magnus Edenhill @edenhill 10:51
That sounds like a correct analysis. In the bestest of worlds we would re-trigger the fetcher automatically when the queue size drops below the max.messages.. threshold, but this is CPU costly and not straight forward with the default forwarded-queue approach. A workaround would be for a shorter (or perhaps configurable?) backoff time. What do you think?

Sean Hanson @shanson7 12:04
That's exactly what I was thinking. Configurable backoff would probably be perfect. Then you could basically control CPU/Memory trade off (short backoff increases CPU, large max queue size increases memory)
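
To make that trade-off concrete, a rough sketch with illustrative numbers (not taken from the discussion above):

    shorter backoff -> more CPU: once a partition's queue is full, the
        consumer re-attempts the fetch roughly every backoff interval,
        i.e. up to ~1000/backoff_ms fetch attempts per second per
        backed-off partition (100 ms ~= up to 10/s instead of 1/s).
    larger queue    -> more memory: up to queued.max.messages.kbytes of
        pre-fetched data is held locally, and it needs to cover at least
        one backoff interval of consumption to avoid the stall described
        in this issue.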

@shalstea
Contributor

We would be very interested in a configurable backoff feature. If you are doing that work under this ticket, I will simply watch this one; if it is being done under a different ticket, please provide a link to it. Hopefully it can get prioritized. Thanks.

@edenhill edenhill added this to the v1.8.0 milestone Apr 16, 2021
@edenhill edenhill modified the milestones: v1.8.0, v1.9.0 Sep 17, 2021
@edenhill
Contributor

I've added fetch.queue.backoff.ms to control this delay. Please try it out on the queueboff branch.
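
A minimal sketch of trying the new property (the 100 ms value is only an example; this assumes a librdkafka build that includes fetch.queue.backoff.ms, i.e. the queueboff branch at the time of this comment):

    /* Sketch: shorten the backoff applied when the local queue exceeds
     * queued.max.messages.kbytes / queued.min.messages.  Lower values
     * trade extra fetch requests (CPU) for a smaller local queue. */
    #include <librdkafka/rdkafka.h>
    #include <stdio.h>

    static void set_queue_backoff(rd_kafka_conf_t *conf) {
            char errstr[512];

            if (rd_kafka_conf_set(conf, "fetch.queue.backoff.ms", "100",
                                  errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
                    fprintf(stderr, "fetch.queue.backoff.ms: %s\n", errstr);
    }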

bitemyapp pushed a commit to bitemyapp/librdkafka that referenced this issue May 9, 2023
bitemyapp added a commit to bitemyapp/librdkafka that referenced this issue May 9, 2023
emasab added a commit that referenced this issue Jun 15, 2023
…patch (#4284)

This property allows trading CPU for memory by reducing the fetch backoff, for cases where `queued.max.messages.kbytes` and `queued.min.messages`
would otherwise have to be set very high to hold 1s of data.

---------

Co-authored-by: Chris A. <cma@bitemyapp.com>
@bitemyapp
Contributor

@sachanr Thanks for filing this issue. Thanks to your post and @edenhill's patch, I was able to update the patch and it has since been merged.

Cf.

tl;dr:

  • A 2x increase in maximum per-consumer throughput.
  • Consumer groups scale linearly in the presence of partitions with skewed lag. I was able to reduce my deployment by 75% (32 -> 8 instances), and per-consumer throughput held at 900 MiB/second (decompressed data rate) consistently from 3 instances up to 64. I'm running fetch.queue.backoff.ms at 200 ms with otherwise default settings for my Kafka consumer, and it's doing great (see the setting noted below).
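
For reference, the only non-default consumer setting in my deployment boils down to (a value from my setup, not a general recommendation):

    [fetch.queue.backoff.ms] : [200]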

Hope this helps!

This issue was closed.