Skip to content

Commit

Permalink
Do a final commit on end consumer group generation for immediate commits
Browse files Browse the repository at this point in the history
  • Loading branch information
Steve van Loben Sels committed Jul 21, 2021
1 parent d11b5df commit bef8d34
Showing 1 changed file with 19 additions and 0 deletions.
19 changes: 19 additions & 0 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,25 @@ func (r *Reader) commitLoopImmediate(ctx context.Context, gen *Generation) {
for {
select {
case <-ctx.Done():
// drain the commit channel and prepare a single, final commit.
// the commit will combine any outstanding requests and the result
// will be sent back to all the callers of CommitMessages so that
// they can return.
var errchs []chan<- error
for hasCommits := true; hasCommits; {
select {
case req := <-r.commits:
offsets.merge(req.commits)
errchs = append(errchs, req.errch)
default:
hasCommits = false
}
}
err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries)
for _, errch := range errchs {
// NOTE : this will be a buffered channel and will not block.
errch <- err
}
return

case req := <-r.commits:
Expand Down

0 comments on commit bef8d34

Please sign in to comment.