Skip to content

Commit

Permalink
Bloomgateway tests: avoid race in dummyStore
Browse files Browse the repository at this point in the history
We cannot share querier or block objects across goroutines, because
they have state that is updated. Instead, copy the blocks and create a
new querier each time the mock `FetchBlocks` is called.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
  • Loading branch information
bboreham committed Jun 7, 2024
1 parent 81c51d3 commit 9289833
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 29 deletions.
14 changes: 7 additions & 7 deletions pkg/bloomgateway/bloomgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
t.Run("request fails when providing invalid block", func(t *testing.T) {
now := mktime("2023-10-03 10:00")

_, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
mockStore := newMockBloomStore(queriers, metas)
refs, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
mockStore := newMockBloomStore(refs, queriers, metas)

reg := prometheus.NewRegistry()
gw, err := New(cfg, mockStore, logger, reg)
Expand Down Expand Up @@ -176,7 +176,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
now := mktime("2023-10-03 10:00")

refs, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
mockStore := newMockBloomStore(queriers, metas)
mockStore := newMockBloomStore(refs, queriers, metas)
mockStore.err = errors.New("request failed")

reg := prometheus.NewRegistry()
Expand Down Expand Up @@ -220,7 +220,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {

// replace store implementation and re-initialize workers and sub-services
refs, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
mockStore := newMockBloomStore(queriers, metas)
mockStore := newMockBloomStore(refs, queriers, metas)
mockStore.delay = 2000 * time.Millisecond

reg := prometheus.NewRegistry()
Expand Down Expand Up @@ -263,7 +263,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
now := mktime("2023-10-03 10:00")

reg := prometheus.NewRegistry()
gw, err := New(cfg, newMockBloomStore(nil, nil), logger, reg)
gw, err := New(cfg, newMockBloomStore(nil, nil, nil), logger, reg)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), gw)
Expand Down Expand Up @@ -309,7 +309,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
now := mktime("2023-10-03 10:00")

reg := prometheus.NewRegistry()
gw, err := New(cfg, newMockBloomStore(nil, nil), logger, reg)
gw, err := New(cfg, newMockBloomStore(nil, nil, nil), logger, reg)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), gw)
Expand Down Expand Up @@ -363,7 +363,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
refs, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)

reg := prometheus.NewRegistry()
store := newMockBloomStore(queriers, metas)
store := newMockBloomStore(refs, queriers, metas)

gw, err := New(cfg, store, logger, reg)
require.NoError(t, err)
Expand Down
36 changes: 22 additions & 14 deletions pkg/bloomgateway/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,19 @@ import (

var _ bloomshipper.Store = &dummyStore{}

func newMockBloomStore(bqs []*bloomshipper.CloseableBlockQuerier, metas []bloomshipper.Meta) *dummyStore {
// refs and blocks must be in 1-1 correspondence.
func newMockBloomStore(refs []bloomshipper.BlockRef, blocks []*v1.Block, metas []bloomshipper.Meta) *dummyStore {
return &dummyStore{
querieres: bqs,
metas: metas,
refs: refs,
blocks: blocks,
metas: metas,
}
}

type dummyStore struct {
metas []bloomshipper.Meta
querieres []*bloomshipper.CloseableBlockQuerier
metas []bloomshipper.Meta
refs []bloomshipper.BlockRef
blocks []*v1.Block

// mock how long it takes to serve block queriers
delay time.Duration
Expand Down Expand Up @@ -77,16 +80,21 @@ func (s *dummyStore) Stop() {
}

func (s *dummyStore) FetchBlocks(_ context.Context, refs []bloomshipper.BlockRef, _ ...bloomshipper.FetchOption) ([]*bloomshipper.CloseableBlockQuerier, error) {
result := make([]*bloomshipper.CloseableBlockQuerier, 0, len(s.querieres))
result := make([]*bloomshipper.CloseableBlockQuerier, 0, len(s.blocks))

if s.err != nil {
time.Sleep(s.delay)
return result, s.err
}

for _, ref := range refs {
for _, bq := range s.querieres {
if ref.Bounds.Equal(bq.Bounds) {
for i, block := range s.blocks {
if ref.Bounds.Equal(s.refs[i].Bounds) {
blockCopy := *block
bq := &bloomshipper.CloseableBlockQuerier{
BlockQuerier: v1.NewBlockQuerier(&blockCopy, false, v1.DefaultMaxPageSize),
BlockRef: s.refs[i],
}
result = append(result, bq)
}
}
Expand All @@ -107,9 +115,9 @@ func TestProcessor(t *testing.T) {
metrics := newWorkerMetrics(prometheus.NewPedanticRegistry(), constants.Loki, "bloom_gatway")

t.Run("success case - without blocks", func(t *testing.T) {
_, metas, queriers, data := createBlocks(t, tenant, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
refs, metas, queriers, data := createBlocks(t, tenant, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)

mockStore := newMockBloomStore(queriers, metas)
mockStore := newMockBloomStore(refs, queriers, metas)
p := newProcessor("worker", 1, mockStore, log.NewNopLogger(), metrics)

chunkRefs := createQueryInputFromBlockData(t, tenant, data, 10)
Expand Down Expand Up @@ -154,14 +162,14 @@ func TestProcessor(t *testing.T) {
})

t.Run("success case - with blocks", func(t *testing.T) {
_, metas, queriers, data := createBlocks(t, tenant, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
refs, metas, queriers, data := createBlocks(t, tenant, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
blocks := make([]bloomshipper.BlockRef, 0, len(metas))
for _, meta := range metas {
// we can safely append all block refs from the meta, because it only contains a single one
blocks = append(blocks, meta.Blocks...)
}

mockStore := newMockBloomStore(queriers, metas)
mockStore := newMockBloomStore(refs, queriers, metas)
p := newProcessor("worker", 1, mockStore, log.NewNopLogger(), metrics)

chunkRefs := createQueryInputFromBlockData(t, tenant, data, 10)
Expand Down Expand Up @@ -206,9 +214,9 @@ func TestProcessor(t *testing.T) {
})

t.Run("failure case", func(t *testing.T) {
_, metas, queriers, data := createBlocks(t, tenant, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
refs, metas, queriers, data := createBlocks(t, tenant, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)

mockStore := newMockBloomStore(queriers, metas)
mockStore := newMockBloomStore(refs, queriers, metas)
mockStore.err = errors.New("store failed")

p := newProcessor("worker", 1, mockStore, log.NewNopLogger(), metrics)
Expand Down
12 changes: 4 additions & 8 deletions pkg/bloomgateway/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,12 +392,12 @@ func TestPartitionRequest(t *testing.T) {
}
}

func createBlocks(t *testing.T, tenant string, n int, from, through model.Time, minFp, maxFp model.Fingerprint) ([]bloomshipper.BlockRef, []bloomshipper.Meta, []*bloomshipper.CloseableBlockQuerier, [][]v1.SeriesWithBlooms) {
func createBlocks(t *testing.T, tenant string, n int, from, through model.Time, minFp, maxFp model.Fingerprint) ([]bloomshipper.BlockRef, []bloomshipper.Meta, []*v1.Block, [][]v1.SeriesWithBlooms) {
t.Helper()

blockRefs := make([]bloomshipper.BlockRef, 0, n)
metas := make([]bloomshipper.Meta, 0, n)
queriers := make([]*bloomshipper.CloseableBlockQuerier, 0, n)
blocks := make([]*v1.Block, 0, n)
series := make([][]v1.SeriesWithBlooms, 0, n)

step := (maxFp - minFp) / model.Fingerprint(n)
Expand Down Expand Up @@ -432,16 +432,12 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time,
// t.Log(i, j, string(keys[i][j]))
// }
// }
querier := &bloomshipper.CloseableBlockQuerier{
BlockQuerier: v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize),
BlockRef: blockRef,
}
queriers = append(queriers, querier)
blocks = append(blocks, block)
metas = append(metas, meta)
blockRefs = append(blockRefs, blockRef)
series = append(series, data)
}
return blockRefs, metas, queriers, series
return blockRefs, metas, blocks, series
}

func createQueryInputFromBlockData(t *testing.T, tenant string, data [][]v1.SeriesWithBlooms, nthSeries int) []*logproto.ChunkRef {
Expand Down

0 comments on commit 9289833

Please sign in to comment.