diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index c2838e40bc89..487b87bac352 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -44,6 +44,7 @@ package bloomgateway import ( "context" "fmt" + "slices" "sort" "time" @@ -383,6 +384,10 @@ func (g *Gateway) consumeTask(ctx context.Context, task Task, tasksCh chan<- Tas case <-ctx.Done(): // do nothing default: + // chunks may not always be sorted + if !slices.IsSortedFunc(res.Removals, func(a, b v1.ChunkRef) int { return a.Cmp(b) }) { + slices.SortFunc(res.Removals, func(a, b v1.ChunkRef) int { return a.Cmp(b) }) + } task.responses = append(task.responses, res) } } @@ -413,13 +418,14 @@ func orderedResponsesByFP(responses [][]v1.Output) v1.Iterator[v1.Output] { itrs = append(itrs, v1.NewPeekingIter(v1.NewSliceIter(r))) } return v1.NewHeapIterator[v1.Output]( - func(o1, o2 v1.Output) bool { return o1.Fp <= o2.Fp }, + func(o1, o2 v1.Output) bool { return o1.Fp < o2.Fp }, itrs..., ) } // TODO(owen-d): improve perf. This can be faster with a more specialized impl // NB(owen-d): `req` is mutated in place for performance, but `responses` is not +// Removals of the outputs must be sorted. 
func filterChunkRefs(req *logproto.FilterChunkRefRequest, responses [][]v1.Output) []*logproto.GroupedChunkRefs { res := make([]*logproto.GroupedChunkRefs, 0, len(req.Refs)) @@ -433,6 +439,7 @@ func filterChunkRefs(req *logproto.FilterChunkRefRequest, responses [][]v1.Outpu // from v1.Identity[v1.Output], // merge two removal sets for the same series, deduping + // requires that the removals of the outputs are sorted func(o1, o2 v1.Output) v1.Output { res := v1.Output{Fp: o1.Fp} diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index 4e54ed81e552..15c9ca2be2d8 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -640,12 +640,12 @@ func TestFilterChunkRefs(t *testing.T) { { {fp: 0, checksums: []uint32{0, 1}}, {fp: 0, checksums: []uint32{0, 1, 2}}, - {fp: 1, checksums: []uint32{1}}, + {fp: 1, checksums: []uint32{0, 2}}, {fp: 2, checksums: []uint32{1}}, }, }, expected: mkResult([]instruction{ - {fp: 1, checksums: []uint32{0, 2}}, + {fp: 1, checksums: []uint32{1}}, {fp: 2, checksums: []uint32{0, 2}}, {fp: 3, checksums: []uint32{0, 1, 2}}, }), @@ -670,6 +670,27 @@ func TestFilterChunkRefs(t *testing.T) { {fp: 3, checksums: []uint32{0, 1, 2}}, }), }, + { + desc: "unordered fingerprints", + input: mkInput(4, 3), + removals: [][]instruction{ + { + {fp: 3, checksums: []uint32{2}}, + {fp: 0, checksums: []uint32{1, 2}}, + {fp: 2, checksums: []uint32{1, 2}}, + }, + { + {fp: 1, checksums: []uint32{1}}, + {fp: 2, checksums: []uint32{0, 1}}, + {fp: 3, checksums: []uint32{0}}, + }, + }, + expected: mkResult([]instruction{ + {fp: 0, checksums: []uint32{0}}, + {fp: 1, checksums: []uint32{0, 2}}, + {fp: 3, checksums: []uint32{1}}, + }), + }, } { t.Run(tc.desc, func(t *testing.T) { res := filterChunkRefs(tc.input, mkRemovals(tc.removals)) diff --git a/pkg/bloomgateway/cache_test.go b/pkg/bloomgateway/cache_test.go index 841c15548271..f85d366cc5aa 100644 --- a/pkg/bloomgateway/cache_test.go +++ 
b/pkg/bloomgateway/cache_test.go @@ -344,7 +344,10 @@ func TestMerge(t *testing.T) { m := newMerger() actual, err := m.MergeResponse(input...) require.NoError(t, err) - require.Equal(t, tc.expected, actual) + + resp, ok := actual.(*logproto.FilterChunkRefResponse) + require.True(t, ok) + require.Equal(t, tc.expected, resp) }) } } diff --git a/pkg/bloomgateway/client.go b/pkg/bloomgateway/client.go index a17b9ec0e8c6..af4b45388241 100644 --- a/pkg/bloomgateway/client.go +++ b/pkg/bloomgateway/client.go @@ -294,13 +294,12 @@ func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedCh iters := make([]v1.PeekingIterator[*logproto.GroupedChunkRefs], 0, len(input)) for _, inp := range input { + sort.Slice(inp, func(i, j int) bool { return inp[i].Fingerprint < inp[j].Fingerprint }) iters = append(iters, v1.NewPeekingIter(v1.NewSliceIter(inp))) } heapIter := v1.NewHeapIterator[*logproto.GroupedChunkRefs]( - func(a, b *logproto.GroupedChunkRefs) bool { - return a.Fingerprint < b.Fingerprint - }, + func(a, b *logproto.GroupedChunkRefs) bool { return a.Fingerprint < b.Fingerprint }, iters..., ) @@ -311,10 +310,17 @@ func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedCh v1.Identity[*logproto.GroupedChunkRefs], // merge func(a, b *logproto.GroupedChunkRefs) *logproto.GroupedChunkRefs { + // TODO(chaudum): Check if we can assume sorted shortrefs here + if !slices.IsSortedFunc(a.Refs, func(a, b *logproto.ShortRef) int { return a.Cmp(b) }) { + slices.SortFunc(a.Refs, func(a, b *logproto.ShortRef) int { return a.Cmp(b) }) + } + if !slices.IsSortedFunc(b.Refs, func(a, b *logproto.ShortRef) int { return a.Cmp(b) }) { + slices.SortFunc(b.Refs, func(a, b *logproto.ShortRef) int { return a.Cmp(b) }) + } return &logproto.GroupedChunkRefs{ Fingerprint: a.Fingerprint, Tenant: a.Tenant, - Refs: mergeChunks(a.Refs, b.Refs), + Refs: mergeChunkSets(a.Refs, b.Refs), } }, // iterator @@ -324,51 +330,37 @@ func mergeSeries(input 
[][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedCh return v1.CollectInto(dedupeIter, buf) } -func mergeChunks(inputs ...[]*logproto.ShortRef) []*logproto.ShortRef { - if len(inputs) == 0 { - return nil - } +// mergeChunkSets merges and deduplicates two sorted slices of shortRefs +func mergeChunkSets(s1, s2 []*logproto.ShortRef) (result []*logproto.ShortRef) { + var i, j int + for i < len(s1) && j < len(s2) { + a, b := s1[i], s2[j] + + if a.Equal(b) { + result = append(result, a) + i++ + j++ + continue + } + + if a.Less(b) { + result = append(result, a) + i++ + continue + } - if len(inputs) == 1 { - slices.SortFunc( - inputs[0], - func(a, b *logproto.ShortRef) int { - if a.Equal(b) { - return 0 - } - if a.From.Before(b.From) || (a.From.Equal(b.From) && a.Through.Before(b.Through)) { - return -1 - } - return 1 - }, - ) - return inputs[0] + result = append(result, b) + j++ } - iters := make([]v1.PeekingIterator[*logproto.ShortRef], 0, len(inputs)) - for _, inp := range inputs { - iters = append(iters, v1.NewPeekingIter(v1.NewSliceIter(inp))) + if i < len(s1) { + result = append(result, s1[i:]...) + } + if j < len(s2) { + result = append(result, s2[j:]...) 
} - chunkDedupe := v1.NewDedupingIter[*logproto.ShortRef, *logproto.ShortRef]( - // eq - func(a, b *logproto.ShortRef) bool { return a.Equal(b) }, - // from - v1.Identity[*logproto.ShortRef], - // merge - func(a, b *logproto.ShortRef) *logproto.ShortRef { return a }, - // iterator - v1.NewPeekingIter[*logproto.ShortRef]( - v1.NewHeapIterator[*logproto.ShortRef]( - func(a, b *logproto.ShortRef) bool { - return a.From.Before(b.From) || (a.From.Equal(b.From) && a.Through.Before(b.Through)) - }, - iters..., - ), - ), - ) - merged, _ := v1.Collect(chunkDedupe) - return merged + return result } // doForAddrs sequetially calls the provided callback function fn for each diff --git a/pkg/bloomgateway/client_test.go b/pkg/bloomgateway/client_test.go index db0a0adcff47..d46d881078da 100644 --- a/pkg/bloomgateway/client_test.go +++ b/pkg/bloomgateway/client_test.go @@ -70,3 +70,27 @@ func TestGatewayClient_MergeSeries(t *testing.T) { result, _ := mergeSeries(inputs, nil) require.Equal(t, expected, result) } + +func TestGatewayClient_MergeChunkSets(t *testing.T) { + inp1 := []*logproto.ShortRef{ + shortRef(1, 3, 1), + shortRef(2, 3, 2), + shortRef(4, 5, 3), + } + inp2 := []*logproto.ShortRef{ + shortRef(2, 3, 2), + shortRef(3, 4, 4), + shortRef(5, 6, 5), + } + + expected := []*logproto.ShortRef{ + shortRef(1, 3, 1), + shortRef(2, 3, 2), + shortRef(3, 4, 4), + shortRef(4, 5, 3), + shortRef(5, 6, 5), + } + + result := mergeChunkSets(inp1, inp2) + require.Equal(t, expected, result) +} diff --git a/pkg/bloomgateway/querier.go b/pkg/bloomgateway/querier.go index 6fc6e993be64..58415a1fd667 100644 --- a/pkg/bloomgateway/querier.go +++ b/pkg/bloomgateway/querier.go @@ -6,6 +6,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" @@ -99,9 +100,7 @@ func (bq 
*BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from preFilterChunks := len(chunkRefs) preFilterSeries := len(grouped) - result := make([]*logproto.ChunkRef, 0, len(chunkRefs)) - seriesSeen := make(map[uint64]struct{}, len(grouped)) - + responses := make([][]*logproto.GroupedChunkRefs, 0, 2) // We can perform requests sequentially, because most of the time the request // only covers a single day, and if not, it's at most two days. for _, s := range partitionSeriesByDay(from, through, grouped) { @@ -110,19 +109,6 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from if err != nil { return nil, err } - var chunks int - for i := range s.series { - chunks += len(s.series[i].Refs) - } - sp.LogKV( - "day", s.day.Time.Time(), - "from", s.interval.Start.Time(), - "through", s.interval.End.Time(), - "series", len(s.series), - "chunks", chunks, - "blocks", len(blocks), - "skipped", len(skipped), - ) refs, err := bq.c.FilterChunks(ctx, tenant, s.interval, blocks, queryPlan) if err != nil { @@ -130,33 +116,45 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from } // add chunk refs from series that were not mapped to any blocks - refs = append(refs, skipped...) 
+ responses = append(responses, refs, skipped) bq.metrics.seriesSkipped.Add(float64(len(skipped))) + } + + deduped, err := mergeSeries(responses, nil) + if err != nil { + return nil, errors.Wrap(err, "failed to dedupe results") + } - for i := range refs { - seriesSeen[refs[i].Fingerprint] = struct{}{} - for _, ref := range refs[i].Refs { - result = append(result, &logproto.ChunkRef{ - Fingerprint: refs[i].Fingerprint, - UserID: tenant, - From: ref.From, - Through: ref.Through, - Checksum: ref.Checksum, - }) - } + result := make([]*logproto.ChunkRef, 0, len(chunkRefs)) + for i := range deduped { + for _, ref := range deduped[i].Refs { + result = append(result, &logproto.ChunkRef{ + Fingerprint: deduped[i].Fingerprint, + UserID: tenant, + From: ref.From, + Through: ref.Through, + Checksum: ref.Checksum, + }) } } + postFilterChunks := len(result) + postFilterSeries := len(deduped) + level.Debug(bq.logger).Log( + "operation", "bloomquerier.FilterChunkRefs", + "tenant", tenant, + "from", from.Time(), + "through", through.Time(), + "responses", len(responses), "preFilterChunks", preFilterChunks, - "postFilterChunks", len(result), + "postFilterChunks", postFilterChunks, + "filteredChunks", preFilterChunks-postFilterChunks, "preFilterSeries", preFilterSeries, - "postFilterSeries", len(seriesSeen), + "postFilterSeries", postFilterSeries, + "filteredSeries", preFilterSeries-postFilterSeries, ) - postFilterChunks := len(result) - postFilterSeries := len(seriesSeen) - bq.metrics.chunksTotal.Add(float64(preFilterChunks)) bq.metrics.chunksFiltered.Add(float64(preFilterChunks - postFilterChunks)) bq.metrics.seriesTotal.Add(float64(preFilterSeries)) diff --git a/pkg/logproto/compat.go b/pkg/logproto/compat.go index cfb09e285836..a11467584b58 100644 --- a/pkg/logproto/compat.go +++ b/pkg/logproto/compat.go @@ -408,6 +408,65 @@ func (m *FilterChunkRefRequest) WithStartEndForCache(start, end time.Time) resul return &clone } +func (a *GroupedChunkRefs) Cmp(b *GroupedChunkRefs) int { + 
if b == nil { + if a == nil { + return 0 + } + return 1 + } + if a.Fingerprint < b.Fingerprint { + return -1 + } + if a.Fingerprint > b.Fingerprint { + return 1 + } + return 0 +} + +func (a *GroupedChunkRefs) Less(b *GroupedChunkRefs) bool { + if b == nil { + return false + } + return a.Fingerprint < b.Fingerprint +} + +// Cmp returns a positive number when a > b, a negative number when a < b, and 0 when a == b +func (a *ShortRef) Cmp(b *ShortRef) int { + if b == nil { + if a == nil { + return 0 + } + return 1 + } + + if a.From != b.From { + return int(a.From) - int(b.From) + } + + if a.Through != b.Through { + return int(a.Through) - int(b.Through) + } + + return int(a.Checksum) - int(b.Checksum) +} + +func (a *ShortRef) Less(b *ShortRef) bool { + if b == nil { + return false + } + + if a.From != b.From { + return a.From < b.From + } + + if a.Through != b.Through { + return a.Through < b.Through + } + + return a.Checksum < b.Checksum +} + func (m *ShardsRequest) GetCachingOptions() (res definitions.CachingOptions) { return } func (m *ShardsRequest) GetStart() time.Time { diff --git a/pkg/storage/bloom/v1/index.go b/pkg/storage/bloom/v1/index.go index ff9ecaffbac3..674a1a883dfb 100644 --- a/pkg/storage/bloom/v1/index.go +++ b/pkg/storage/bloom/v1/index.go @@ -425,6 +425,18 @@ func (r *ChunkRef) Less(other ChunkRef) bool { return r.Checksum < other.Checksum } +func (r *ChunkRef) Cmp(other ChunkRef) int { + if r.From != other.From { + return int(other.From) - int(r.From) + } + + if r.Through != other.Through { + return int(other.Through) - int(r.Through) + } + + return int(other.Checksum) - int(r.Checksum) +} + func (r *ChunkRef) Encode(enc *encoding.Encbuf, previousEnd model.Time) model.Time { // delta encode start time enc.PutVarint64(int64(r.From - previousEnd)) diff --git a/pkg/storage/bloom/v1/index_test.go b/pkg/storage/bloom/v1/index_test.go index eb61b1e2a2ab..95e35617b568 100644 --- a/pkg/storage/bloom/v1/index_test.go +++ 
b/pkg/storage/bloom/v1/index_test.go @@ -55,7 +55,72 @@ func TestSeriesEncoding(t *testing.T) { require.Equal(t, src, dst) } -func TestChunkRefCompare(t *testing.T) { +func TestChunkRefCmpLess(t *testing.T) { + t.Parallel() + for _, tc := range []struct { + desc string + left, right ChunkRef + expCmp int + expLess bool + }{ + { + desc: "From/Through/Checksum are equal", + left: ChunkRef{0, 0, 0}, + right: ChunkRef{0, 0, 0}, + expCmp: 0, + expLess: false, + }, + { + desc: "From is before", + left: ChunkRef{0, 1, 0}, + right: ChunkRef{1, 1, 0}, + expCmp: 1, + expLess: true, + }, + { + desc: "From is after", + left: ChunkRef{1, 1, 0}, + right: ChunkRef{0, 1, 0}, + expCmp: -1, + expLess: false, + }, + { + desc: "Through is before", + left: ChunkRef{0, 1, 0}, + right: ChunkRef{0, 2, 0}, + expCmp: 1, + expLess: true, + }, + { + desc: "Through is after", + left: ChunkRef{0, 2, 0}, + right: ChunkRef{0, 1, 0}, + expCmp: -1, + expLess: false, + }, + { + desc: "Checksum is smaller", + left: ChunkRef{0, 1, 0}, + right: ChunkRef{0, 1, 1}, + expCmp: 1, + expLess: true, + }, + { + desc: "Checksum is bigger", + left: ChunkRef{0, 0, 1}, + right: ChunkRef{0, 0, 0}, + expCmp: -1, + expLess: false, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + require.Equal(t, tc.expCmp, tc.left.Cmp(tc.right)) + require.Equal(t, tc.expLess, tc.left.Less(tc.right)) + }) + } +} + +func TestChunkRefsCompare(t *testing.T) { t.Parallel() for _, tc := range []struct { desc string