diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index adf0cd7b332f..17089efbbf63 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -611,6 +611,10 @@ func (i *Ingester) starting(ctx context.Context) (err error) { i.setPrepareShutdown() } + // start our flush loop: this needs to start before the partition-reader in order for chunks to be shipped in the case of Kafka catching up. + i.loopDone.Add(1) + go i.loop() + // When kafka ingestion is enabled, we have to make sure that reader catches up replaying the partition // BEFORE the ingester ring lifecycler is started, because once the ingester ring lifecycler will start // it will switch the ingester state in the ring to ACTIVE. @@ -646,9 +650,7 @@ func (i *Ingester) starting(ctx context.Context) (err error) { return fmt.Errorf("failed to start partition ring lifecycler: %w", err) } } - // start our loop - i.loopDone.Add(1) - go i.loop() + return nil }