From 382e96d3016e702bce4f51427f8fc7fc62f47b82 Mon Sep 17 00:00:00 2001 From: Steve van Loben Sels Date: Fri, 21 Jan 2022 10:47:39 -0800 Subject: [PATCH] Do a final commit on end consumer group generation for immediate commits (#715) * Do a final commit on end consumer group generation for immediate commits * add test for final commit on generation end Co-authored-by: rhansen2 --- reader.go | 19 +++++++++++++++++++ reader_test.go | 50 ++++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 67 insertions(+), 2 deletions(-) diff --git a/reader.go b/reader.go index e9d34a5d..61a0afcc 100644 --- a/reader.go +++ b/reader.go @@ -212,6 +212,25 @@ func (r *Reader) commitLoopImmediate(ctx context.Context, gen *Generation) { for { select { case <-ctx.Done(): + // drain the commit channel and prepare a single, final commit. + // the commit will combine any outstanding requests and the result + // will be sent back to all the callers of CommitMessages so that + // they can return. + var errchs []chan<- error + for hasCommits := true; hasCommits; { + select { + case req := <-r.commits: + offsets.merge(req.commits) + errchs = append(errchs, req.errch) + default: + hasCommits = false + } + } + err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries) + for _, errch := range errchs { + // NOTE : this will be a buffered channel and will not block. + errch <- err + } return case req := <-r.commits: diff --git a/reader_test.go b/reader_test.go index d932a28a..4f819151 100644 --- a/reader_test.go +++ b/reader_test.go @@ -495,7 +495,7 @@ func makeTestSequence(n int) []Message { } func prepareReader(t *testing.T, ctx context.Context, r *Reader, msgs ...Message) { - var config = r.Config() + config := r.Config() var conn *Conn var err error @@ -710,7 +710,6 @@ func TestReaderPartitionWhenConsumerGroupsEnabled(t *testing.T) { if !invoke() { t.Fatalf("expected panic; but NewReader worked?!") } - } func TestExtractTopics(t *testing.T) { @@ -1281,6 +1280,53 @@ func TestValidateReader(t *testing.T) { } } +func TestCommitLoopImmediateFlushOnGenerationEnd(t *testing.T) { + t.Parallel() + var committedOffset int64 + var commitCount int + gen := &Generation{ + conn: mockCoordinator{ + offsetCommitFunc: func(r offsetCommitRequestV2) (offsetCommitResponseV2, error) { + commitCount++ + committedOffset = r.Topics[0].Partitions[0].Offset + return offsetCommitResponseV2{}, nil + }, + }, + done: make(chan struct{}), + log: func(func(Logger)) {}, + logError: func(func(Logger)) {}, + } + + // initialize commits so that the commitLoopImmediate select statement blocks + r := &Reader{stctx: context.Background(), commits: make(chan commitRequest, 100)} + + for i := 0; i < 100; i++ { + cr := commitRequest{ + commits: []commit{{ + topic: "topic", + partition: 0, + offset: int64(i) + 1, + }}, + errch: make(chan<- error, 1), + } + r.commits <- cr + } + + gen.Start(func(ctx context.Context) { + r.commitLoopImmediate(ctx, gen) + }) + + gen.close() + + if committedOffset != 100 { + t.Fatalf("expected commited offset to be 100 but got %d", committedOffset) + } + + if commitCount >= 100 { + t.Fatalf("expected a single final commit on generation end got %d", commitCount) + } +} + func TestCommitOffsetsWithRetry(t *testing.T) { offsets := offsetStash{"topic": {0: 0}}