Skip to content

Commit

Permalink
add test for final commit on generation end
Browse files Browse the repository at this point in the history
  • Loading branch information
rhansen2 committed Jan 17, 2022
1 parent bef8d34 commit 69cda82
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 7 deletions.
8 changes: 3 additions & 5 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,6 @@ type ReaderConfig struct {

// Validate method validates ReaderConfig properties.
func (config *ReaderConfig) Validate() error {

if len(config.Brokers) == 0 {
return errors.New("cannot create a new kafka reader with an empty list of broker addresses")
}
Expand Down Expand Up @@ -873,7 +872,7 @@ func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error {
}

var errch <-chan error
var creq = commitRequest{
creq := commitRequest{
commits: makeCommits(msgs...),
}

Expand Down Expand Up @@ -1342,7 +1341,6 @@ func (r *reader) run(ctx context.Context, offset int64) {

case OffsetOutOfRange:
first, last, err := r.readOffsets(conn)

if err != nil {
r.withErrorLogger(func(log Logger) {
log.Printf("the kafka reader got an error while attempting to determine whether it was reading before the first offset or after the last offset of partition %d of %s: %s", r.partition, r.topic, err)
Expand Down Expand Up @@ -1402,7 +1400,7 @@ func (r *reader) run(ctx context.Context, offset int64) {

func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, start int64, err error) {
for i := 0; i != len(r.brokers) && conn == nil; i++ {
var broker = r.brokers[i]
broker := r.brokers[i]
var first, last int64

t0 := time.Now()
Expand Down Expand Up @@ -1551,7 +1549,7 @@ func (r *reader) withErrorLogger(do func(Logger)) {
// extractTopics returns the unique list of topics represented by the set of
// provided members
func extractTopics(members []GroupMember) []string {
var visited = map[string]struct{}{}
visited := map[string]struct{}{}
var topics []string

for _, member := range members {
Expand Down
50 changes: 48 additions & 2 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ func makeTestSequence(n int) []Message {
}

func prepareReader(t *testing.T, ctx context.Context, r *Reader, msgs ...Message) {
var config = r.Config()
config := r.Config()
var conn *Conn
var err error

Expand Down Expand Up @@ -637,7 +637,6 @@ func TestReaderPartitionWhenConsumerGroupsEnabled(t *testing.T) {
if !invoke() {
t.Fatalf("expected panic; but NewReader worked?!")
}

}

func TestExtractTopics(t *testing.T) {
Expand Down Expand Up @@ -1208,6 +1207,53 @@ func TestValidateReader(t *testing.T) {
}
}

func TestCommitLoopImmediateFlushOnGenerationEnd(t *testing.T) {
t.Parallel()
var committedOffset int64
var commitCount int
gen := &Generation{
conn: mockCoordinator{
offsetCommitFunc: func(r offsetCommitRequestV2) (offsetCommitResponseV2, error) {
commitCount++
committedOffset = r.Topics[0].Partitions[0].Offset
return offsetCommitResponseV2{}, nil
},
},
done: make(chan struct{}),
log: func(func(Logger)) {},
logError: func(func(Logger)) {},
}

// initialize commits so that the commitLoopImmediate select statement blocks
r := &Reader{stctx: context.Background(), commits: make(chan commitRequest, 100)}

for i := 0; i < 100; i++ {
cr := commitRequest{
commits: []commit{{
topic: "topic",
partition: 0,
offset: int64(i) + 1,
}},
errch: make(chan<- error, 1),
}
r.commits <- cr
}

gen.Start(func(ctx context.Context) {
r.commitLoopImmediate(ctx, gen)
})

gen.close()

if committedOffset != 100 {
t.Fatalf("expected commited offset to be 100 but got %d", committedOffset)
}

if commitCount >= 100 {
t.Fatalf("expected a single final commit on generation end got %d", commitCount)
}
}

func TestCommitOffsetsWithRetry(t *testing.T) {
offsets := offsetStash{"topic": {0: 0}}

Expand Down

0 comments on commit 69cda82

Please sign in to comment.