diff --git a/pkg/analytics/reporter.go b/pkg/analytics/reporter.go index 7daa352259f2..47312e6374cd 100644 --- a/pkg/analytics/reporter.go +++ b/pkg/analytics/reporter.go @@ -95,28 +95,31 @@ func (rep *Reporter) initLeader(ctx context.Context) *ClusterSeed { MaxRetries: 0, }) for backoff.Ongoing() { - // create a new cluster seed - seed := ClusterSeed{ - UID: uuid.NewString(), - PrometheusVersion: build.GetVersion(), - CreatedAt: time.Now(), - } - if err := kvClient.CAS(ctx, seedKey, func(in interface{}) (out interface{}, retry bool, err error) { - // The key is already set, so we don't need to do anything - if in != nil { - if kvSeed, ok := in.(*ClusterSeed); ok && kvSeed != nil && kvSeed.UID != seed.UID { - seed = *kvSeed - return nil, false, nil + { + // create a new cluster seed + seed := ClusterSeed{ + UID: uuid.NewString(), + PrometheusVersion: build.GetVersion(), + CreatedAt: time.Now(), + } + if err := kvClient.CAS(ctx, seedKey, func(in interface{}) (out interface{}, retry bool, err error) { + // The key is already set, so we don't need to do anything + if in != nil { + if kvSeed, ok := in.(*ClusterSeed); ok && kvSeed != nil && kvSeed.UID != seed.UID { + seed = *kvSeed + return nil, false, nil + } } + return &seed, true, nil + }); err != nil { + level.Info(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err) + continue } - return &seed, true, nil - }); err != nil { - level.Info(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err) - continue } // ensure stability of the cluster seed stableSeed := ensureStableKey(ctx, kvClient, rep.logger) - seed = *stableSeed + // This is a new local variable so that Go knows it's not racing with the previous usage. + seed := *stableSeed // Fetch the remote cluster seed. remoteSeed, err := rep.fetchSeed(ctx, func(err error) bool { @@ -262,7 +265,7 @@ func (rep *Reporter) running(ctx context.Context) error { } return nil } - rep.startCPUPercentCollection(ctx) + rep.startCPUPercentCollection(ctx, time.Minute) // check every minute if we should report. ticker := time.NewTicker(reportCheckInterval) defer ticker.Stop() @@ -317,13 +320,13 @@ func (rep *Reporter) reportUsage(ctx context.Context, interval time.Time) error return errs.Err() } +const cpuUsageKey = "cpu_usage" + var ( - cpuUsageKey = "cpu_usage" - cpuUsage = NewFloat(cpuUsageKey) - cpuCollectionInterval = time.Minute + cpuUsage = NewFloat(cpuUsageKey) ) -func (rep *Reporter) startCPUPercentCollection(ctx context.Context) { +func (rep *Reporter) startCPUPercentCollection(ctx context.Context, cpuCollectionInterval time.Duration) { proc, err := process.NewProcess(int32(os.Getpid())) if err != nil { level.Debug(rep.logger).Log("msg", "failed to get process", "err", err) diff --git a/pkg/analytics/reporter_test.go b/pkg/analytics/reporter_test.go index 140953e70700..889ec8d31e19 100644 --- a/pkg/analytics/reporter_test.go +++ b/pkg/analytics/reporter_test.go @@ -159,14 +159,13 @@ func TestWrongKV(t *testing.T) { } func TestStartCPUCollection(t *testing.T) { - cpuCollectionInterval = 1 * time.Second r, err := NewReporter(Config{Leader: true, Enabled: true}, kv.Config{ Store: "inmemory", }, nil, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry()) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - r.startCPUPercentCollection(ctx) + r.startCPUPercentCollection(ctx, 1*time.Second) require.Eventually(t, func() bool { return cpuUsage.Value() > 0 }, 5*time.Second, 1*time.Second) diff --git a/pkg/bloombuild/builder/builder_test.go b/pkg/bloombuild/builder/builder_test.go index 1c5a4bc1c45b..149e43f3234d 100644 --- a/pkg/bloombuild/builder/builder_test.go +++ b/pkg/bloombuild/builder/builder_test.go @@ -13,6 +13,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "google.golang.org/grpc" "github.com/grafana/loki/v3/pkg/bloombuild/protos" @@ -87,7 +88,7 @@ func Test_BuilderLoop(t *testing.T) { require.NoError(t, err) require.Eventually(t, func() bool { - return server.completedTasks == len(tasks) + return int(server.completedTasks.Load()) == len(tasks) }, 5*time.Second, 100*time.Millisecond) err = services.StopAndAwaitTerminated(context.Background(), builder) @@ -98,7 +99,7 @@ func Test_BuilderLoop(t *testing.T) { type fakePlannerServer struct { tasks []*protos.ProtoTask - completedTasks int + completedTasks atomic.Int64 shutdownCalled bool addr string @@ -148,7 +149,7 @@ func (f *fakePlannerServer) BuilderLoop(srv protos.PlannerForBuilder_BuilderLoop if _, err := srv.Recv(); err != nil { return fmt.Errorf("failed to receive task response: %w", err) } - f.completedTasks++ + f.completedTasks.Add(1) } // No more tasks. Wait until shutdown. diff --git a/pkg/bloombuild/planner/planner.go b/pkg/bloombuild/planner/planner.go index ea2ea5db531b..f4a0080cb3d4 100644 --- a/pkg/bloombuild/planner/planner.go +++ b/pkg/bloombuild/planner/planner.go @@ -701,7 +701,7 @@ func (p *Planner) totalPendingTasks() (total int) { func (p *Planner) enqueueTask(task *QueueTask) error { p.activeUsers.UpdateUserTimestamp(task.Tenant, time.Now()) return p.tasksQueue.Enqueue(task.Tenant, nil, task, func() { - task.timesEnqueued++ + task.timesEnqueued.Add(1) p.addPendingTask(task) }) } @@ -761,12 +761,12 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer result, err := p.forwardTaskToBuilder(builder, builderID, task) if err != nil { maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant) - if maxRetries > 0 && task.timesEnqueued >= maxRetries { + if maxRetries > 0 && int(task.timesEnqueued.Load()) >= maxRetries { p.metrics.tasksFailed.Inc() p.removePendingTask(task) level.Error(logger).Log( "msg", "task failed after max retries", - "retries", task.timesEnqueued, + "retries", task.timesEnqueued.Load(), "maxRetries", maxRetries, "err", err, ) @@ -792,7 +792,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer p.metrics.tasksRequeued.Inc() level.Error(logger).Log( "msg", "error forwarding task to builder, Task requeued", - "retries", task.timesEnqueued, + "retries", task.timesEnqueued.Load(), "err", err, ) continue @@ -801,7 +801,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer level.Debug(logger).Log( "msg", "task completed", "duration", time.Since(task.queueTime).Seconds(), - "retries", task.timesEnqueued, + "retries", task.timesEnqueued.Load(), ) p.removePendingTask(task) diff --git a/pkg/bloombuild/planner/planner_test.go b/pkg/bloombuild/planner/planner_test.go index c76ef0e4d267..64c6ef086dac 100644 --- a/pkg/bloombuild/planner/planner_test.go +++ b/pkg/bloombuild/planner/planner_test.go @@ -16,6 +16,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "google.golang.org/grpc" "github.com/grafana/loki/v3/pkg/bloombuild/protos" @@ -517,7 +518,7 @@ func Test_BuilderLoop(t *testing.T) { resultsCh := make(chan *protos.TaskResult, nTasks) tasks := createTasks(nTasks, resultsCh) for _, task := range tasks { - err = planner.enqueueTask(task) + err := planner.enqueueTask(task) require.NoError(t, err) } @@ -527,10 +528,10 @@ func Test_BuilderLoop(t *testing.T) { builder := newMockBuilder(fmt.Sprintf("builder-%d", i)) builders = append(builders, builder) - go func() { - err = planner.BuilderLoop(builder) - require.ErrorIs(t, err, tc.expectedBuilderLoopError) - }() + go func(expectedBuilderLoopError error) { + err := planner.BuilderLoop(builder) + require.ErrorIs(t, err, expectedBuilderLoopError) + }(tc.expectedBuilderLoopError) } // Eventually, all tasks should be sent to builders @@ -558,7 +559,7 @@ func Test_BuilderLoop(t *testing.T) { // Enqueue tasks again for _, task := range tasks { - err = planner.enqueueTask(task) + err := planner.enqueueTask(task) require.NoError(t, err) } @@ -809,14 +810,15 @@ func Test_processTenantTaskResults(t *testing.T) { } type fakeBuilder struct { + mx sync.Mutex // Protects tasks and currTaskIdx. id string tasks []*protos.Task currTaskIdx int grpc.ServerStream - returnError bool - returnErrorMsg bool - wait bool + returnError atomic.Bool + returnErrorMsg atomic.Bool + wait atomic.Bool ctx context.Context ctxCancel context.CancelFunc } @@ -833,19 +835,21 @@ func newMockBuilder(id string) *fakeBuilder { } func (f *fakeBuilder) ReceivedTasks() []*protos.Task { + f.mx.Lock() + defer f.mx.Unlock() return f.tasks } func (f *fakeBuilder) SetReturnError(b bool) { - f.returnError = b + f.returnError.Store(b) } func (f *fakeBuilder) SetReturnErrorMsg(b bool) { - f.returnErrorMsg = b + f.returnErrorMsg.Store(b) } func (f *fakeBuilder) SetWait(b bool) { - f.wait = b + f.wait.Store(b) } func (f *fakeBuilder) CancelContext(b bool) { @@ -873,6 +877,8 @@ func (f *fakeBuilder) Send(req *protos.PlannerToBuilder) error { return err } + f.mx.Lock() + defer f.mx.Unlock() f.tasks = append(f.tasks, task) f.currTaskIdx++ return nil @@ -886,12 +892,12 @@ func (f *fakeBuilder) Recv() (*protos.BuilderToPlanner, error) { }, nil } - if f.returnError { + if f.returnError.Load() { return nil, fmt.Errorf("fake error from %s", f.id) } // Wait until `wait` is false - for f.wait { + for f.wait.Load() { time.Sleep(time.Second) } @@ -901,10 +907,12 @@ func (f *fakeBuilder) Recv() (*protos.BuilderToPlanner, error) { } var errMsg string - if f.returnErrorMsg { + if f.returnErrorMsg.Load() { errMsg = fmt.Sprintf("fake error from %s", f.id) } + f.mx.Lock() + defer f.mx.Unlock() return &protos.BuilderToPlanner{ BuilderID: f.id, Result: protos.ProtoTaskResult{ diff --git a/pkg/bloombuild/planner/task.go b/pkg/bloombuild/planner/task.go index 8580dd12a655..3080ec47a171 100644 --- a/pkg/bloombuild/planner/task.go +++ b/pkg/bloombuild/planner/task.go @@ -4,6 +4,8 @@ import ( "context" "time" + "go.uber.org/atomic" + "github.com/grafana/loki/v3/pkg/bloombuild/protos" ) @@ -13,7 +15,7 @@ type QueueTask struct { resultsChannel chan *protos.TaskResult // Tracking - timesEnqueued int + timesEnqueued atomic.Int64 queueTime time.Time ctx context.Context } diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index d16e833fc437..9250ec91ff86 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -140,8 +140,8 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { t.Run("request fails when providing invalid block", func(t *testing.T) { now := mktime("2023-10-03 10:00") - _, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff) - mockStore := newMockBloomStore(queriers, metas) + refs, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff) + mockStore := newMockBloomStore(refs, queriers, metas) reg := prometheus.NewRegistry() gw, err := New(cfg, mockStore, logger, reg) @@ -176,7 +176,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { now := mktime("2023-10-03 10:00") refs, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff) - mockStore := newMockBloomStore(queriers, metas) + mockStore := newMockBloomStore(refs, queriers, metas) mockStore.err = errors.New("request failed") reg := prometheus.NewRegistry() @@ -220,7 +220,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { // replace store implementation and re-initialize workers and sub-services refs, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff) - mockStore := newMockBloomStore(queriers, metas) + mockStore := newMockBloomStore(refs, queriers, metas) mockStore.delay = 2000 * time.Millisecond reg := prometheus.NewRegistry() @@ -263,7 +263,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { now := mktime("2023-10-03 10:00") reg := prometheus.NewRegistry() - gw, err := New(cfg, newMockBloomStore(nil, nil), logger, reg) + gw, err := New(cfg, newMockBloomStore(nil, nil, nil), logger, reg) require.NoError(t, err) err = services.StartAndAwaitRunning(context.Background(), gw) @@ -309,7 +309,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { now := mktime("2023-10-03 10:00") reg := prometheus.NewRegistry() - gw, err := New(cfg, newMockBloomStore(nil, nil), logger, reg) + gw, err := New(cfg, newMockBloomStore(nil, nil, nil), logger, reg) require.NoError(t, err) err = services.StartAndAwaitRunning(context.Background(), gw) @@ -363,7 +363,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { refs, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff) reg := prometheus.NewRegistry() - store := newMockBloomStore(queriers, metas) + store := newMockBloomStore(refs, queriers, metas) gw, err := New(cfg, store, logger, reg) require.NoError(t, err) diff --git a/pkg/bloomgateway/processor_test.go b/pkg/bloomgateway/processor_test.go index f9dc847f588b..9d2d6c6d0642 100644 --- a/pkg/bloomgateway/processor_test.go +++ b/pkg/bloomgateway/processor_test.go @@ -24,16 +24,19 @@ import ( var _ bloomshipper.Store = &dummyStore{} -func newMockBloomStore(bqs []*bloomshipper.CloseableBlockQuerier, metas []bloomshipper.Meta) *dummyStore { +// refs and blocks must be in 1-1 correspondence. +func newMockBloomStore(refs []bloomshipper.BlockRef, blocks []*v1.Block, metas []bloomshipper.Meta) *dummyStore { return &dummyStore{ - querieres: bqs, - metas: metas, + refs: refs, + blocks: blocks, + metas: metas, } } type dummyStore struct { - metas []bloomshipper.Meta - querieres []*bloomshipper.CloseableBlockQuerier + metas []bloomshipper.Meta + refs []bloomshipper.BlockRef + blocks []*v1.Block // mock how long it takes to serve block queriers delay time.Duration @@ -77,7 +80,7 @@ func (s *dummyStore) Stop() { } func (s *dummyStore) FetchBlocks(_ context.Context, refs []bloomshipper.BlockRef, _ ...bloomshipper.FetchOption) ([]*bloomshipper.CloseableBlockQuerier, error) { - result := make([]*bloomshipper.CloseableBlockQuerier, 0, len(s.querieres)) + result := make([]*bloomshipper.CloseableBlockQuerier, 0, len(s.blocks)) if s.err != nil { time.Sleep(s.delay) @@ -85,8 +88,13 @@ func (s *dummyStore) FetchBlocks(_ context.Context, refs []bloomshipper.BlockRef } for _, ref := range refs { - for _, bq := range s.querieres { - if ref.Bounds.Equal(bq.Bounds) { + for i, block := range s.blocks { + if ref.Bounds.Equal(s.refs[i].Bounds) { + blockCopy := *block + bq := &bloomshipper.CloseableBlockQuerier{ + BlockQuerier: v1.NewBlockQuerier(&blockCopy, false, v1.DefaultMaxPageSize), + BlockRef: s.refs[i], + } result = append(result, bq) } } @@ -107,9 +115,9 @@ func TestProcessor(t *testing.T) { metrics := newWorkerMetrics(prometheus.NewPedanticRegistry(), constants.Loki, "bloom_gatway") t.Run("success case - without blocks", func(t *testing.T) { - _, metas, queriers, data := createBlocks(t, tenant, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff) + refs, metas, queriers, data := createBlocks(t, tenant, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff) - mockStore := newMockBloomStore(queriers, metas) + mockStore := newMockBloomStore(refs, queriers, metas) p := newProcessor("worker", 1, mockStore, log.NewNopLogger(), metrics) chunkRefs := createQueryInputFromBlockData(t, tenant, data, 10) @@ -154,14 +162,14 @@ func TestProcessor(t *testing.T) { }) t.Run("success case - with blocks", func(t *testing.T) { - _, metas, queriers, data := createBlocks(t, tenant, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff) + refs, metas, queriers, data := createBlocks(t, tenant, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff) blocks := make([]bloomshipper.BlockRef, 0, len(metas)) for _, meta := range metas { // we can safely append all block refs from the meta, because it only contains a single one blocks = append(blocks, meta.Blocks...) } - mockStore := newMockBloomStore(queriers, metas) + mockStore := newMockBloomStore(refs, queriers, metas) p := newProcessor("worker", 1, mockStore, log.NewNopLogger(), metrics) chunkRefs := createQueryInputFromBlockData(t, tenant, data, 10) @@ -206,9 +214,9 @@ func TestProcessor(t *testing.T) { }) t.Run("failure case", func(t *testing.T) { - _, metas, queriers, data := createBlocks(t, tenant, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff) + refs, metas, queriers, data := createBlocks(t, tenant, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff) - mockStore := newMockBloomStore(queriers, metas) + mockStore := newMockBloomStore(refs, queriers, metas) mockStore.err = errors.New("store failed") p := newProcessor("worker", 1, mockStore, log.NewNopLogger(), metrics) diff --git a/pkg/bloomgateway/util_test.go b/pkg/bloomgateway/util_test.go index 849f3a30bbfc..ed47d46456d9 100644 --- a/pkg/bloomgateway/util_test.go +++ b/pkg/bloomgateway/util_test.go @@ -392,12 +392,12 @@ func TestPartitionRequest(t *testing.T) { } } -func createBlocks(t *testing.T, tenant string, n int, from, through model.Time, minFp, maxFp model.Fingerprint) ([]bloomshipper.BlockRef, []bloomshipper.Meta, []*bloomshipper.CloseableBlockQuerier, [][]v1.SeriesWithBlooms) { +func createBlocks(t *testing.T, tenant string, n int, from, through model.Time, minFp, maxFp model.Fingerprint) ([]bloomshipper.BlockRef, []bloomshipper.Meta, []*v1.Block, [][]v1.SeriesWithBlooms) { t.Helper() blockRefs := make([]bloomshipper.BlockRef, 0, n) metas := make([]bloomshipper.Meta, 0, n) - queriers := make([]*bloomshipper.CloseableBlockQuerier, 0, n) + blocks := make([]*v1.Block, 0, n) series := make([][]v1.SeriesWithBlooms, 0, n) step := (maxFp - minFp) / model.Fingerprint(n) @@ -432,16 +432,12 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time, // t.Log(i, j, string(keys[i][j])) // } // } - querier := &bloomshipper.CloseableBlockQuerier{ - BlockQuerier: v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize), - BlockRef: blockRef, - } - queriers = append(queriers, querier) + blocks = append(blocks, block) metas = append(metas, meta) blockRefs = append(blockRefs, blockRef) series = append(series, data) } - return blockRefs, metas, queriers, series + return blockRefs, metas, blocks, series } func createQueryInputFromBlockData(t *testing.T, tenant string, data [][]v1.SeriesWithBlooms, nthSeries int) []*logproto.ChunkRef { diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index 09eab22f74be..f7ed66b7c890 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -470,7 +470,7 @@ func TestSerialization(t *testing.T) { } require.NoError(t, it.Error()) - countExtractor = func() log.StreamSampleExtractor { + extractor := func() log.StreamSampleExtractor { ex, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false) if err != nil { panic(err) @@ -478,7 +478,7 @@ func TestSerialization(t *testing.T) { return ex.ForStream(labels.Labels{}) }() - sampleIt := bc.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), countExtractor) + sampleIt := bc.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), extractor) for i := 0; i < numSamples; i++ { require.True(t, sampleIt.Next(), i)