
Question: starting offsets - do they work with consumer groups? #711

Closed
alexec opened this issue Jul 18, 2021 · 7 comments · Fixed by #715


alexec commented Jul 18, 2021

One of my automated FMEA (failure mode and effects analysis) tests failed. The test simulates the deletion of a pod and verifies that the consumer reconnects to Kafka and correctly continues to process messages. It lost around 50% of the total messages, which is, not to mince words, catastrophic.

The diagnostics hinted that when the consumer group reconnected, it started at the latest offset.

I recently (maybe coincidentally, maybe causally) added a configuration item to the ReaderConfig, as follows:

kafka.ReaderConfig{
	Brokers:     x.Brokers,
	Dialer:      dialer,
	GroupID:     groupName,
	Topic:       x.Topic,
	StartOffset: startOffset,
}

My expectation would be that, on first connect, the reader would use StartOffset, but on reconnect the consumer group would just continue from the last committed offset.
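
For reference, kafka-go exposes two sentinel values for this field, and (per its documentation) StartOffset is only consulted when the consumer group has no committed offset for a partition:

```go
// StartOffset only applies when the group has no committed offset yet;
// once an offset has been committed, the committed offset wins.
StartOffset: kafka.FirstOffset, // start from the oldest available message
// or:
StartOffset: kafka.LastOffset, // start from the newest message
```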

Am I using this incorrectly?

@stevevls
Contributor

Hey @alexec. I suspect that what's happening here is that the pod you've deleted hadn't had a chance to commit any offsets before it went away, so the next pod picks up from StartOffset. I'd need a little more information to be sure of that, though.

  1. How are you reading messages? Are you using ReadMessage or FetchMessage?
  2. What is the value of the startOffset variable? My guess, based on the behavior you've described, is that you're using LastOffset. 😄

@alexec
Author

alexec commented Jul 19, 2021

  1. ReadMessage. I tried FetchMessage, but I seemed to lose messages.
  2. LastOffset

Note: the pod does not crash; it is terminated and gracefully shuts down, calling reader.Close().

@stevevls
Contributor

Aha! If you're using FetchMessage, then the commit logic is not invoked, so I believe you're running into the scenario I mentioned.

kafka-go/reader.go

Lines 714 to 716 in a3150d8

// FetchMessage does not commit offsets automatically when using consumer groups.
// Use CommitMessages to commit the offset.
func (r *Reader) FetchMessage(ctx context.Context) (Message, error) {

I would suggest either adding a call to CommitMessages or using ReadMessage, which is like calling FetchMessage followed by CommitMessages.

@alexec
Author

alexec commented Jul 19, 2021

I'm not using FetchMessage; it did not work for me. I am using ReadMessage.

Do I need to do some explicit action to get it to commit the offsets on exit?

@stevevls
Contributor

Sorry about that...I just re-read the question and saw that you had stated that you are using ReadMessage. 🤦

I just traced the code, and it looks like there is no guarantee that commits are flushed when the reader is closed. The commitLoop function is where all the committing happens (even when using synchronous commits), and that function is bound to the lifecycle of the consumer group generation:

kafka-go/reader.go

Lines 272 to 274 in a3150d8

gen.Start(func(ctx context.Context) {
	r.commitLoop(ctx, gen)
})

The consumer group is closed by Reader.Close, so I believe Close could race with that goroutine committing offsets and cause it to return early if Close is called very soon after FetchMessage. If that were to happen, you would get the behavior you're seeing. Does that sound like a possibility with your test setup?
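
While that race exists, one defensive workaround would be to track the last message read and commit it once more explicitly before calling Close. A sketch of the shutdown path, assuming a ReadMessage-based loop and real kafka-go types (r is the *kafka.Reader), so this fragment is illustrative rather than runnable on its own:

```go
var last kafka.Message
var have bool
for {
	m, err := r.ReadMessage(ctx)
	if err != nil {
		break // ctx canceled on shutdown, or a fatal error
	}
	last, have = m, true
	// ... process m ...
}
if have {
	// Best-effort final commit; use a fresh context, because ctx is
	// likely already canceled at this point. Re-committing the last
	// message is harmless: it just re-writes the same offset.
	_ = r.CommitMessages(context.Background(), last)
}
_ = r.Close()
```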

@alexec
Author

alexec commented Jul 19, 2021

It could be possible. Is a manual commit on close possible?

@stevevls
Contributor

Yup. At Segment we tend to process high-volume topics, so we practically never use the immediate-commit functionality (it introduces a ton of chatter). It doesn't surprise me that it's not as battle-tested as the interval commit loop. 😅

I dug a little more, and I see that we do a final commit on Close when committing on an interval.

So all we need to do is add a final commit for the synchronous case. I opened #715, but I don't want to merge it until I get some test coverage on it. You're more than welcome to take the code for a spin to see if it fixes your issue, though. 😄
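
Until then, another option (given that the interval path already does a final commit on Close) would be to switch to interval-based commits via CommitInterval. A config sketch, reusing the field names from the original ReaderConfig:

```go
kafka.ReaderConfig{
	Brokers:        x.Brokers,
	Dialer:         dialer,
	GroupID:        groupName,
	Topic:          x.Topic,
	StartOffset:    startOffset,
	CommitInterval: time.Second, // commit asynchronously; Close flushes a final commit
}
```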
