From 4901a5c452fa6822a645f56e20e704db9366182a Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko <82767850+vlad-diachenko@users.noreply.github.com> Date: Fri, 24 May 2024 23:53:29 +0300 Subject: [PATCH] fix: not owned stream count (#13030) Signed-off-by: Vladyslav Diachenko --- pkg/ingester/limiter_test.go | 4 ++-- pkg/ingester/owned_streams.go | 33 ++++++++++++++++++++++-------- pkg/ingester/owned_streams_test.go | 32 ++++++++++++++++++++++++++++- 3 files changed, 57 insertions(+), 12 deletions(-) diff --git a/pkg/ingester/limiter_test.go b/pkg/ingester/limiter_test.go index 9d4d3b3037c6..b00bede10417 100644 --- a/pkg/ingester/limiter_test.go +++ b/pkg/ingester/limiter_test.go @@ -24,7 +24,7 @@ func TestStreamCountLimiter_AssertNewStreamAllowed(t *testing.T) { expected error useOwnedStreamService bool fixedLimit int32 - ownedStreamCount int64 + ownedStreamCount int }{ "both local and global limit are disabled": { maxLocalStreamsPerUser: 0, @@ -147,7 +147,7 @@ func TestStreamCountLimiter_AssertNewStreamAllowed(t *testing.T) { ownedStreamSvc := &ownedStreamService{ fixedLimit: atomic.NewInt32(testData.fixedLimit), - ownedStreamCount: atomic.NewInt64(testData.ownedStreamCount), + ownedStreamCount: testData.ownedStreamCount, } limiter := NewLimiter(limits, NilMetrics, ring, testData.ringReplicationFactor) defaultCountSupplier := func() int { diff --git a/pkg/ingester/owned_streams.go b/pkg/ingester/owned_streams.go index 01cb8235f9b1..3be6fb40fdd8 100644 --- a/pkg/ingester/owned_streams.go +++ b/pkg/ingester/owned_streams.go @@ -1,6 +1,10 @@ package ingester -import "go.uber.org/atomic" +import ( + "sync" + + "go.uber.org/atomic" +) type ownedStreamService struct { tenantID string @@ -8,22 +12,25 @@ type ownedStreamService struct { fixedLimit *atomic.Int32 //todo: implement job to recalculate it - ownedStreamCount *atomic.Int64 + ownedStreamCount int + notOwnedStreamCount int + lock sync.RWMutex } func newOwnedStreamService(tenantID string, limiter *Limiter) *ownedStreamService { svc := &ownedStreamService{ - tenantID: tenantID, - limiter: limiter, - ownedStreamCount: atomic.NewInt64(0), - fixedLimit: atomic.NewInt32(0), + tenantID: tenantID, + limiter: limiter, + fixedLimit: atomic.NewInt32(0), } svc.updateFixedLimit() return svc } func (s *ownedStreamService) getOwnedStreamCount() int { - return int(s.ownedStreamCount.Load()) + s.lock.RLock() + defer s.lock.RUnlock() + return s.ownedStreamCount } func (s *ownedStreamService) updateFixedLimit() { @@ -36,9 +43,17 @@ func (s *ownedStreamService) getFixedLimit() int { } func (s *ownedStreamService) incOwnedStreamCount() { - s.ownedStreamCount.Inc() + s.lock.Lock() + defer s.lock.Unlock() + s.ownedStreamCount++ } func (s *ownedStreamService) decOwnedStreamCount() { - s.ownedStreamCount.Dec() + s.lock.Lock() + defer s.lock.Unlock() + if s.notOwnedStreamCount > 0 { + s.notOwnedStreamCount-- + return + } + s.ownedStreamCount-- } diff --git a/pkg/ingester/owned_streams_test.go b/pkg/ingester/owned_streams_test.go index c7ddd9d87f29..759927a1d0cf 100644 --- a/pkg/ingester/owned_streams_test.go +++ b/pkg/ingester/owned_streams_test.go @@ -1,6 +1,7 @@ package ingester import ( + "sync" "testing" "github.com/stretchr/testify/require" @@ -29,8 +30,37 @@ func Test_OwnedStreamService(t *testing.T) { service.incOwnedStreamCount() service.incOwnedStreamCount() - require.Equal(t, 2, service.getOwnedStreamCount()) + service.incOwnedStreamCount() + require.Equal(t, 3, service.getOwnedStreamCount()) + + // simulate the effect from the recalculation job + service.notOwnedStreamCount = 1 + service.ownedStreamCount = 2 + + service.decOwnedStreamCount() + require.Equal(t, 2, service.getOwnedStreamCount(), "owned stream count must be decremented only when notOwnedStreamCount is set to 0") + require.Equal(t, 0, service.notOwnedStreamCount) service.decOwnedStreamCount() require.Equal(t, 1, service.getOwnedStreamCount()) + require.Equal(t, 0, service.notOwnedStreamCount, "notOwnedStreamCount must not be decremented lower than 0") + + group := sync.WaitGroup{} + group.Add(200) + for i := 0; i < 100; i++ { + go func() { + defer group.Done() + service.incOwnedStreamCount() + }() + } + + for i := 0; i < 100; i++ { + go func() { + defer group.Done() + service.decOwnedStreamCount() + }() + } + group.Wait() + + require.Equal(t, 1, service.getOwnedStreamCount(), "owned stream count must not be changed") }