diff --git a/libbeat/publisher/pipeline/consumer.go b/libbeat/publisher/pipeline/consumer.go index 20e2bf7ebc9..d8f4288b011 100644 --- a/libbeat/publisher/pipeline/consumer.go +++ b/libbeat/publisher/pipeline/consumer.go @@ -67,13 +67,14 @@ func newEventConsumer( queue queue.Queue, ctx *batchContext, ) *eventConsumer { + consumer := queue.Consumer() c := &eventConsumer{ logger: log, sig: make(chan consumerSignal, 3), out: nil, queue: queue, - consumer: queue.Consumer(), + consumer: consumer, ctx: ctx, } @@ -82,7 +83,7 @@ func newEventConsumer( c.wg.Add(1) go func() { defer c.wg.Done() - c.loop(c.consumer) + c.loop(consumer) }() return c } diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index f150a909320..da1bac4f5d0 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -48,7 +48,7 @@ func TestMakeClientWorker(t *testing.T) { err := quick.Check(func(i uint) bool { numBatches := 300 + (i % 100) // between 300 and 399 - numEvents := atomic.MakeUint(0) + var numEvents uint logger := makeBufLogger(t) @@ -69,7 +69,7 @@ func TestMakeClientWorker(t *testing.T) { for i := uint(0); i < numBatches; i++ { batch := randomBatch(50, 150).withRetryer(retryer) - numEvents.Add(uint(len(batch.Events()))) + numEvents += uint(len(batch.Events())) wqu <- batch } @@ -78,7 +78,7 @@ func TestMakeClientWorker(t *testing.T) { // Make sure that all events have eventually been published success := waitUntilTrue(timeout, func() bool { - return numEvents == published + return numEvents == published.Load() }) if !success { logger.Flush() @@ -202,7 +202,7 @@ func TestMakeClientTracer(t *testing.T) { testutil.SeedPRNG(t) numBatches := 10 - numEvents := atomic.MakeUint(0) + var numEvents uint logger := makeBufLogger(t) @@ -226,7 +226,7 @@ func TestMakeClientTracer(t *testing.T) { for i := 0; i < numBatches; i++ { batch := randomBatch(10, 15).withRetryer(retryer) - numEvents.Add(uint(len(batch.Events()))) + numEvents += uint(len(batch.Events())) wqu <- batch } @@ -235,7 +235,7 @@ func TestMakeClientTracer(t *testing.T) { // Make sure that all events have eventually been published matches := waitUntilTrue(timeout, func() bool { - return numEvents == published + return numEvents == published.Load() }) if !matches { t.Errorf("expected %d events, got %d", numEvents, published)