diff --git a/reader.go b/reader.go index 439893f9..e278f3d5 100644 --- a/reader.go +++ b/reader.go @@ -528,7 +528,6 @@ type ReaderConfig struct { // Validate method validates ReaderConfig properties. func (config *ReaderConfig) Validate() error { - if len(config.Brokers) == 0 { return errors.New("cannot create a new kafka reader with an empty list of broker addresses") } @@ -873,7 +872,7 @@ func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error { } var errch <-chan error - var creq = commitRequest{ + creq := commitRequest{ commits: makeCommits(msgs...), } @@ -1342,7 +1341,6 @@ func (r *reader) run(ctx context.Context, offset int64) { case OffsetOutOfRange: first, last, err := r.readOffsets(conn) - if err != nil { r.withErrorLogger(func(log Logger) { log.Printf("the kafka reader got an error while attempting to determine whether it was reading before the first offset or after the last offset of partition %d of %s: %s", r.partition, r.topic, err) @@ -1402,7 +1400,7 @@ func (r *reader) run(ctx context.Context, offset int64) { func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, start int64, err error) { for i := 0; i != len(r.brokers) && conn == nil; i++ { - var broker = r.brokers[i] + broker := r.brokers[i] var first, last int64 t0 := time.Now() @@ -1551,7 +1549,7 @@ func (r *reader) withErrorLogger(do func(Logger)) { // extractTopics returns the unique list of topics represented by the set of // provided members func extractTopics(members []GroupMember) []string { - var visited = map[string]struct{}{} + visited := map[string]struct{}{} var topics []string for _, member := range members { diff --git a/reader_test.go b/reader_test.go index 0a807276..d523d73e 100644 --- a/reader_test.go +++ b/reader_test.go @@ -436,7 +436,7 @@ func makeTestSequence(n int) []Message { } func prepareReader(t *testing.T, ctx context.Context, r *Reader, msgs ...Message) { - var config = r.Config() + config := r.Config() var conn *Conn var err error @@ -637,7 +637,6 @@ func TestReaderPartitionWhenConsumerGroupsEnabled(t *testing.T) { if !invoke() { t.Fatalf("expected panic; but NewReader worked?!") } - } func TestExtractTopics(t *testing.T) { @@ -1208,6 +1207,53 @@ func TestValidateReader(t *testing.T) { } } +func TestCommitLoopImmediateFlushOnGenerationEnd(t *testing.T) { + t.Parallel() + var committedOffset int64 + var commitCount int + gen := &Generation{ + conn: mockCoordinator{ + offsetCommitFunc: func(r offsetCommitRequestV2) (offsetCommitResponseV2, error) { + commitCount++ + committedOffset = r.Topics[0].Partitions[0].Offset + return offsetCommitResponseV2{}, nil + }, + }, + done: make(chan struct{}), + log: func(func(Logger)) {}, + logError: func(func(Logger)) {}, + } + + // initialize commits so that the commitLoopImmediate select statement blocks + r := &Reader{stctx: context.Background(), commits: make(chan commitRequest, 100)} + + for i := 0; i < 100; i++ { + cr := commitRequest{ + commits: []commit{{ + topic: "topic", + partition: 0, + offset: int64(i) + 1, + }}, + errch: make(chan<- error, 1), + } + r.commits <- cr + } + + gen.Start(func(ctx context.Context) { + r.commitLoopImmediate(ctx, gen) + }) + + gen.close() + + if committedOffset != 100 { + t.Fatalf("expected commited offset to be 100 but got %d", committedOffset) + } + + if commitCount >= 100 { + t.Fatalf("expected a single final commit on generation end got %d", commitCount) + } +} + func TestCommitOffsetsWithRetry(t *testing.T) { offsets := offsetStash{"topic": {0: 0}}