From 5c2863902fb06cd58283920d6851e0a375ef44fd Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Wed, 15 Jul 2020 13:44:51 +0800 Subject: [PATCH] libbeat/publisher/pipeline: fix data races (#19821) (#19865) Fix how we pass the initial queue consumer into eventConsumer.loop; we were referencing c.consumer in a background goroutine, which can race with updates to the consumer. Update tests to properly load atomic variables. Changed serially updated numEvents vars to basic, non-atomic types. (cherry picked from commit ebacd3bd21d8f75ae3a278f479425bc81c4f8831) --- libbeat/publisher/pipeline/consumer.go | 5 +++-- libbeat/publisher/pipeline/output_test.go | 12 ++++++------ 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/libbeat/publisher/pipeline/consumer.go b/libbeat/publisher/pipeline/consumer.go index 20e2bf7ebc9..d8f4288b011 100644 --- a/libbeat/publisher/pipeline/consumer.go +++ b/libbeat/publisher/pipeline/consumer.go @@ -67,13 +67,14 @@ func newEventConsumer( queue queue.Queue, ctx *batchContext, ) *eventConsumer { + consumer := queue.Consumer() c := &eventConsumer{ logger: log, sig: make(chan consumerSignal, 3), out: nil, queue: queue, - consumer: queue.Consumer(), + consumer: consumer, ctx: ctx, } @@ -82,7 +83,7 @@ func newEventConsumer( c.wg.Add(1) go func() { defer c.wg.Done() - c.loop(c.consumer) + c.loop(consumer) }() return c } diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index f150a909320..da1bac4f5d0 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -48,7 +48,7 @@ func TestMakeClientWorker(t *testing.T) { err := quick.Check(func(i uint) bool { numBatches := 300 + (i % 100) // between 300 and 399 - numEvents := atomic.MakeUint(0) + var numEvents uint logger := makeBufLogger(t) @@ -69,7 +69,7 @@ func TestMakeClientWorker(t *testing.T) { for i := uint(0); i < numBatches; i++ { batch := randomBatch(50, 150).withRetryer(retryer) - numEvents.Add(uint(len(batch.Events()))) + numEvents += uint(len(batch.Events())) wqu <- batch } @@ -78,7 +78,7 @@ func TestMakeClientWorker(t *testing.T) { // Make sure that all events have eventually been published success := waitUntilTrue(timeout, func() bool { - return numEvents == published + return numEvents == published.Load() }) if !success { logger.Flush() @@ -202,7 +202,7 @@ func TestMakeClientTracer(t *testing.T) { testutil.SeedPRNG(t) numBatches := 10 - numEvents := atomic.MakeUint(0) + var numEvents uint logger := makeBufLogger(t) @@ -226,7 +226,7 @@ func TestMakeClientTracer(t *testing.T) { for i := 0; i < numBatches; i++ { batch := randomBatch(10, 15).withRetryer(retryer) - numEvents.Add(uint(len(batch.Events()))) + numEvents += uint(len(batch.Events())) wqu <- batch } @@ -235,7 +235,7 @@ func TestMakeClientTracer(t *testing.T) { // Make sure that all events have eventually been published matches := waitUntilTrue(timeout, func() bool { - return numEvents == published + return numEvents == published.Load() }) if !matches { t.Errorf("expected %d events, got %d", numEvents, published)