
getting duplicate messages on consumer #772

Closed
vikrampunchh opened this issue Oct 29, 2021 · 9 comments

@vikrampunchh

I have started 4 consumers with this config; sometimes the same message is received by multiple consumers.

I have added GroupID to prevent duplicates.

consumer := kafka.NewReader(kafka.ReaderConfig{
	Brokers:         kafkaBrokers,
	Topic:           "my-topic",
	StartOffset:     kafka.FirstOffset,
	MinBytes:        1,
	MaxBytes:        100e6,
	GroupID:         "my-group",
	MaxWait:         time.Hour * 24,
	ReadLagInterval: 1 * time.Second,
	Dialer:          dialer,
	QueueCapacity:   queueCapacity * 2,
})
@rhansen2
Collaborator

Hi @vikrampunchh, without GroupID a consumer group is not used, so starting 4 readers with the same config is expected to produce duplicates between the readers.
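For contrast, a reader configured without GroupID attaches to a single partition directly, and several such readers pointed at the same partition will each see every message. A minimal sketch (broker address, topic, and partition number are placeholders):

```go
// Without GroupID there is no consumer group: this reader consumes
// the given partition independently, so multiple readers configured
// like this will all receive the same messages.
r := kafka.NewReader(kafka.ReaderConfig{
	Brokers:   []string{"127.0.0.1:9092"},
	Topic:     "my-topic",
	Partition: 0, // explicit partition instead of group-managed assignment
})
```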

@vikrampunchh
Author

vikrampunchh commented Oct 31, 2021

@rhansen2 thanks, but GroupID is exactly what I am using to prevent duplicate messages.

Configs:

  • I have started 4 consumers (workers; if one goes down, another will continue)
  • All 4 consumers have the same GroupID
  • Number of partitions: 1

According to the Kafka docs, one partition should be consumed by only one consumer in a group, but I am randomly seeing the message at the same offset in two consumers.

@rhansen2
Collaborator

Sorry @vikrampunchh I misunderstood your original message.

Could you provide a code sample I could use to reproduce the issue?

@vikrampunchh
Author


package main

import (
	"context"
	"fmt"
	"log"
	"strings"
	"sync"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	topics := strings.Split("topic_one,topic_two,topic_three,topic_four,topic_five", ",")
	var wg sync.WaitGroup
	for _, v := range topics {
		wg.Add(1)
		go startConsumer(v, &wg)
	}
	wg.Wait()
}

func startConsumer(topic string, swg *sync.WaitGroup) {
	defer swg.Done()
	kafkaBrokers := []string{"127.0.0.1:9092"}
	group := "my-group"

	dialer := &kafka.Dialer{
		Timeout:   time.Second * 10,
		DualStack: true,
	}

	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:         kafkaBrokers,
		Topic:           topic,
		StartOffset:     kafka.FirstOffset,
		MinBytes:        1,
		MaxBytes:        100e6,
		GroupID:         group,
		MaxWait:         time.Hour * 24,
		ReadLagInterval: 1 * time.Second,
		Dialer:          dialer,
		QueueCapacity:   100 * 2,
	})

	fmt.Printf("startConsumer -------- \n")

	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			break
		}
		fmt.Printf("message at topic/partition/offset %v/%v/%v: %s \n", m.Topic, m.Partition, m.Offset, string(m.Key))
		// Check the commit error: a failure here (e.g. after a rebalance)
		// would otherwise go unnoticed and lead to re-delivery.
		if err := r.CommitMessages(context.Background(), m); err != nil {
			log.Printf("failed to commit message: %v", err)
		}
		fmt.Println()
	}

	if err := r.Close(); err != nil {
		log.Fatal("failed to close reader:", err)
	}
}

I am running this code on 4 workers; the same offsets sometimes come through on more than one of them (randomly).

@rhansen2
Collaborator

rhansen2 commented Nov 2, 2021

Thanks @vikrampunchh! What does production to the topics look like: is it constant or sporadic? Do you see the duplicates on all topics, or does it tend to occur with a single topic?

Additionally, are you seeing any errors from CommitMessages when you see the duplicates?
Are there any rebalances happening around the times when you see the duplicates?

@vikrampunchh
Author

It occurs randomly on all topics.

No, I am not getting any errors from CommitMessages.

@rhansen2
Collaborator

Hi @vikrampunchh, so far I haven't had any success replicating this issue. Our current theory is that rebalances are potentially occurring between ReadMessage and CommitMessages. I would expect to see an error from CommitMessages in that case, but the issue could be that the error is not propagating. That would explain duplicates between consumers.

Are you seeing any rebalances around the times you're seeing the duplicates?

You could also get more detailed information out of the Reader by setting the Logger field of the ReaderConfig that might give more hints as to what's occurring.
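For reference, the Logger and ErrorLogger fields accept anything implementing kafka-go's Logger interface, and kafka.LoggerFunc adapts a plain printf-style function such as log.Printf. A sketch along the lines of the config above (broker, topic, and group are placeholders):

```go
r := kafka.NewReader(kafka.ReaderConfig{
	Brokers: []string{"127.0.0.1:9092"},
	Topic:   "my-topic",
	GroupID: "my-group",
	// Route the Reader's internal debug output through the standard
	// logger; rebalance and offset-commit activity shows up here.
	Logger:      kafka.LoggerFunc(log.Printf),
	ErrorLogger: kafka.LoggerFunc(log.Printf),
})
```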

Thanks!

@rhansen2
Collaborator

Hi @vikrampunchh, we recently published v0.4.27, which might help with your issue.

@rhansen2
Collaborator

rhansen2 commented Feb 4, 2022

Closing due to inactivity.

@rhansen2 rhansen2 closed this as completed Feb 4, 2022