From a2fbaa8e09b6eebff2f7c20746e84f1365bd7433 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Tue, 10 Sep 2024 17:16:29 +0200 Subject: [PATCH] feat(blooms)!: Index structured metadata into blooms (#14061) Instead of indexing n-grams of the log line content, we index the plain values of the structured metadata keys and values. Resulting tokens are: * `name` * `chunkPrefix + name` * `value` * `chunkPrefix + value` * `name + '=' value` * `chunkPrefix + name + '=' + value` Indexed fields (metadata name) are also extracted into the series metadata. Indexed metadata values cannot be queried with the bloom gateways yet. This PR does not cleanup unused code used for ngram-tokenization, it is scope for a follow up. --- Signed-off-by: Christian Haudum --- pkg/storage/bloom/v1/bloom_tokenizer.go | 136 ++++++++----------- pkg/storage/bloom/v1/bloom_tokenizer_test.go | 119 ++++++++-------- pkg/storage/bloom/v1/builder.go | 52 ++++--- pkg/storage/bloom/v1/builder_test.go | 29 ++-- pkg/storage/bloom/v1/filter/scalable.go | 7 +- pkg/storage/bloom/v1/filter/scalable_test.go | 2 +- pkg/storage/bloom/v1/tokenizer.go | 26 ++++ pkg/storage/bloom/v1/tokenizer_test.go | 15 ++ pkg/storage/bloom/v1/versioned_builder.go | 5 +- tools/bloom/inspector/main.go | 50 ++++++- 10 files changed, 264 insertions(+), 177 deletions(-) diff --git a/pkg/storage/bloom/v1/bloom_tokenizer.go b/pkg/storage/bloom/v1/bloom_tokenizer.go index d35b3c13b6a2..333e2f22a37c 100644 --- a/pkg/storage/bloom/v1/bloom_tokenizer.go +++ b/pkg/storage/bloom/v1/bloom_tokenizer.go @@ -2,7 +2,6 @@ package v1 import ( "math" - "unsafe" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -92,19 +91,15 @@ func estimatedCount(m uint, p float64) uint { // Populates a bloom filter(s) with the tokens from the given chunks. // Called once per series -func (bt *BloomTokenizer) Populate( - blooms v2iter.SizedIterator[*Bloom], - chks v2iter.Iterator[ChunkRefWithIter], - ch chan *BloomCreation, -) { +func (bt *BloomTokenizer) Populate(blooms v2iter.SizedIterator[*Bloom], chks v2iter.Iterator[ChunkRefWithIter], ch chan *BloomCreation) { clear(bt.cache) // MUST always clear the cache before starting a new series var next bool // All but the last bloom are considered full -- send back unaltered for next = blooms.Next(); next && blooms.Remaining() > 0; next = blooms.Next() { ch <- &BloomCreation{ - Bloom: blooms.At(), - SourceBytesAdded: 0, + Bloom: blooms.At(), + Info: newIndexingInfo(), } } @@ -118,35 +113,29 @@ func (bt *BloomTokenizer) Populate( // We have the feeling that the empty blooms may be reused from old blocks. // Here we log an error if we find an empty bloom. 
if bloom.Count() == 0 { - level.Warn(bt.logger).Log( - "msg", "found existing empty bloom", - ) + level.Warn(bt.logger).Log("msg", "found existing empty bloom") } } else { bloom = NewBloom() } - var bytesAdded int + info := newIndexingInfo() for chks.Next() { chk := chks.At() itr := v2iter.NewPeekIter(chk.Itr) for { - full, newBytes := bt.addChunkToBloom( - bloom, - chk.Ref, - itr, - ) - bytesAdded += newBytes + full, chunkStats := bt.addChunkToBloom(bloom, chk.Ref, itr) + info = info.merge(chunkStats) // If a bloom is full, the chunk wasn't completely added // so we'll submit this bloom, start a new one, and continue indexing if full { - bt.sendBloom(ch, bloom, bytesAdded) + bt.sendBloom(ch, bloom, info) - // start a new bloom + reset bytesAdded counter - bytesAdded = 0 + // start a new bloom + reset stats + info = newIndexingInfo() bloom = NewBloom() // cache _MUST_ be cleared when a new bloom is created to ensure that all tokens from @@ -161,21 +150,15 @@ func (bt *BloomTokenizer) Populate( // TODO(salvacorts): Delete this once we solve the correctness bug if bloom.Count() == 0 { - level.Warn(bt.logger).Log( - "msg", "resulting bloom is empty", - ) + level.Warn(bt.logger).Log("msg", "resulting bloom is empty") } // Send the last bloom - bt.sendBloom(ch, bloom, bytesAdded) + bt.sendBloom(ch, bloom, info) close(ch) } -func (bt *BloomTokenizer) sendBloom( - ch chan<- *BloomCreation, - bloom *Bloom, - bytesAdded int, -) { +func (bt *BloomTokenizer) sendBloom(ch chan<- *BloomCreation, bloom *Bloom, info indexingInfo) { fillRatio := bloom.ScalableBloomFilter.FillRatio() bt.metrics.hammingWeightRatio.Observe(fillRatio) bt.metrics.estimatedCount.Observe( @@ -184,70 +167,57 @@ func (bt *BloomTokenizer) sendBloom( bt.metrics.bloomSize.Observe(float64(bloom.ScalableBloomFilter.Capacity() / eightBits)) bt.metrics.bloomsTotal.Inc() ch <- &BloomCreation{ - Bloom: bloom, - SourceBytesAdded: bytesAdded, + Bloom: bloom, + Info: info, } } -// addChunkToBloom adds the tokens from the given chunk to the given bloom. -// It continues until the chunk is exhausted or the bloom is full. -// NB(owen-d): We ensure the invariant that each line is indexed entirely into at least one bloom. -// This includes both raw ngrams and chunk-prefixed ngrams and is why we use a peeking iterator -- -// so we can advance the iterator only after we're sure the bloom has accepted the line. -// This is because the _line_ is the atom in Loki's data model and a query must either match (or not) an individual line. -// Therefore, we index entire lines into a bloom to ensure a lookups are accurate. -func (bt *BloomTokenizer) addChunkToBloom(bloom *Bloom, ref ChunkRef, entryIter v2iter.PeekIterator[push.Entry]) (full bool, bytesAdded int) { +func prefixForChunkRef(chk ChunkRef) []byte { + enc := encoding.EncWith(make([]byte, 0, 20)) + enc.PutBE64(uint64(chk.From)) // 8 bytes + enc.PutBE64(uint64(chk.Through)) // 8 bytes + enc.PutBE32(chk.Checksum) // 4 bytes + return enc.Get() +} + +// addChunkToBloom adds the values from structured metadata from the entries of the given chunk to the given bloom. +// addChunkToBloom returns true if the bloom has been completely filled, and may not have consumed the entire iterator. +// addChunkToBloom must be called multiple times until returning false with new blooms until the iterator has been fully consumed. 
+func (bt *BloomTokenizer) addChunkToBloom(bloom *Bloom, ref ChunkRef, entryIter v2iter.PeekIterator[push.Entry]) (bool, indexingInfo) { var ( - tokenBuf, prefixLn = prefixedToken(bt.lineTokenizer.N(), ref, nil) - tokens int - successfulInserts int - cachedInserts int - collisionInserts int - chunkBytes int - linesAdded int + tokens int + successfulInserts int + cachedInserts int + collisionInserts int + linesAdded int + + collision bool ) + // return values + full, info := false, newIndexingInfo() + + tokenizer := NewStructuredMetadataTokenizer(string(prefixForChunkRef(ref))) + // We use a peeking iterator to avoid advancing the iterator until we're sure the bloom has accepted the line. -outer: for entry, ok := entryIter.Peek(); ok; entry, ok = entryIter.Peek() { - line := entry.Line - chunkBytes += len(line) - - tokenItrs := []v2iter.Iterator[[]byte]{ - // two iterators, one for the raw tokens and one for the chunk prefixed tokens. - // Warning: the underlying line tokenizer (used in both iterators) uses the same buffer for tokens. - // They are NOT SAFE for concurrent use. - NewPrefixedTokenIter(tokenBuf, prefixLn, bt.lineTokenizer.Tokens(line)), - bt.lineTokenizer.Tokens(line), - } + for _, kv := range entry.StructuredMetadata { + info.sourceBytes += len(kv.Name) + len(kv.Value) + info.indexedFields.Add(Field(kv.Name)) - for _, itr := range tokenItrs { - for itr.Next() { - tok := itr.At() + tokenItr := tokenizer.Tokens(kv) + for tokenItr.Next() { + tok := tokenItr.At() tokens++ - // TODO[owen-d]: [n]byte this - // To avoid allocations, an unsafe string can be used to check ownership in cache. - str := unsafe.String(unsafe.SliceData(tok), len(tok)) // A cache is used ahead of the SBF, as it cuts out the costly operations of scaling bloom filters - if _, found := bt.cache[str]; found { + if _, found := bt.cache[tok]; found { cachedInserts++ continue } // maxBloomSize is in bytes, but blooms operate at the bit level; adjust - var collision bool - collision, full = bloom.ScalableBloomFilter.TestAndAddWithMaxSize(tok, bt.maxBloomSize*eightBits) - - if full { - // edge case: one line maxed out the bloom size -- retrying is futile - // (and will loop endlessly), so we'll just skip indexing it - if linesAdded == 0 { - _ = entryIter.Next() - } - - break outer - } + collision, full = bloom.ScalableBloomFilter.TestAndAddWithMaxSize([]byte(tok), bt.maxBloomSize*eightBits) if collision { collisionInserts++ @@ -257,8 +227,7 @@ outer: // only register the key in the cache if it was successfully added to the bloom // as can prevent us from trying subsequent copies - str = string(tok) - bt.cache[str] = nil + bt.cache[tok] = nil if len(bt.cache) >= cacheSize { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other clear(bt.cache) } @@ -268,6 +237,11 @@ outer: // Only advance the iterator once we're sure the bloom has accepted the line linesAdded++ _ = entryIter.Next() + + // Only break out of the loop if the bloom filter is full after indexing all structured metadata of an entry. 
+ if full { + break + } } // update metrics after each chunk added for more consistent reporting @@ -275,7 +249,7 @@ outer: bt.metrics.insertsTotal.WithLabelValues(collisionTypeFalse).Add(float64(successfulInserts)) bt.metrics.insertsTotal.WithLabelValues(collisionTypeCache).Add(float64(cachedInserts)) bt.metrics.insertsTotal.WithLabelValues(collisionTypeTrue).Add(float64(collisionInserts)) - bt.metrics.sourceBytesAdded.Add(float64(chunkBytes)) + bt.metrics.sourceBytesAdded.Add(float64(info.sourceBytes)) - return full, chunkBytes + return full, info } diff --git a/pkg/storage/bloom/v1/bloom_tokenizer_test.go b/pkg/storage/bloom/v1/bloom_tokenizer_test.go index 29deada6ab82..7e8f5c4c9993 100644 --- a/pkg/storage/bloom/v1/bloom_tokenizer_test.go +++ b/pkg/storage/bloom/v1/bloom_tokenizer_test.go @@ -100,12 +100,15 @@ func TestTokenizerPopulate(t *testing.T) { var testLine = "this is a log line" bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics, logger.NewNopLogger()) - sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8) - + metadata := push.LabelsAdapter{ + {Name: "pod", Value: "loki-1"}, + {Name: "trace_id", Value: "3bef3c91643bde73"}, + } memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) _, _ = memChunk.Append(&push.Entry{ - Timestamp: time.Unix(0, 1), - Line: testLine, + Timestamp: time.Unix(0, 1), + Line: testLine, + StructuredMetadata: metadata, }) itr, err := memChunk.Iterator( context.Background(), @@ -116,24 +119,25 @@ func TestTokenizerPopulate(t *testing.T) { ) require.Nil(t, err) - bloom := Bloom{ - ScalableBloomFilter: *sbf, - } + ref := ChunkRef{} + bloom := NewBloom() blooms, err := populateAndConsumeBloom( bt, - v2.NewSliceIter([]*Bloom{&bloom}), - v2.NewSliceIter([]ChunkRefWithIter{{Ref: ChunkRef{}, - Itr: itr}}), + v2.NewSliceIter([]*Bloom{bloom}), + v2.NewSliceIter([]ChunkRefWithIter{{Ref: ref, Itr: itr}}), ) require.NoError(t, err) require.Equal(t, 1, len(blooms)) - tokenizer := NewNGramTokenizer(DefaultNGramLength, DefaultNGramSkip) - toks := tokenizer.Tokens(testLine) - for toks.Next() { - token := toks.At() - require.True(t, blooms[0].Test(token)) + tokenizer := NewStructuredMetadataTokenizer(string(prefixForChunkRef(ref))) + + for _, kv := range metadata { + tokens := tokenizer.Tokens(kv) + for tokens.Next() { + token := tokens.At() + require.True(t, blooms[0].Test([]byte(token))) + } } } @@ -141,10 +145,15 @@ func TestBloomTokenizerPopulateWithoutPreexistingBloom(t *testing.T) { var testLine = "this is a log line" bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics, logger.NewNopLogger()) + metadata := push.LabelsAdapter{ + {Name: "pod", Value: "loki-1"}, + {Name: "trace_id", Value: "3bef3c91643bde73"}, + } memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) _, _ = memChunk.Append(&push.Entry{ - Timestamp: time.Unix(0, 1), - Line: testLine, + Timestamp: time.Unix(0, 1), + Line: testLine, + StructuredMetadata: metadata, }) itr, err := memChunk.Iterator( context.Background(), @@ -155,30 +164,34 @@ func TestBloomTokenizerPopulateWithoutPreexistingBloom(t *testing.T) { ) require.Nil(t, err) + ref := ChunkRef{} + blooms, err := populateAndConsumeBloom( bt, v2.NewEmptyIter[*Bloom](), - v2.NewSliceIter([]ChunkRefWithIter{{Ref: ChunkRef{}, - Itr: itr}}), + v2.NewSliceIter([]ChunkRefWithIter{{Ref: ref, Itr: itr}}), ) require.NoError(t, err) 
require.Equal(t, 1, len(blooms)) - tokenizer := NewNGramTokenizer(DefaultNGramLength, DefaultNGramSkip) - toks := tokenizer.Tokens(testLine) - for toks.Next() { - token := toks.At() - require.True(t, blooms[0].Test(token)) - } + tokenizer := NewStructuredMetadataTokenizer(string(prefixForChunkRef(ref))) + for _, kv := range metadata { + tokens := tokenizer.Tokens(kv) + for tokens.Next() { + token := tokens.At() + require.True(t, blooms[0].Test([]byte(token))) + } + } } -func chunkRefItrFromLines(lines ...string) (iter.EntryIterator, error) { +func chunkRefItrFromMetadata(metadata ...push.LabelsAdapter) (iter.EntryIterator, error) { memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) - for i, line := range lines { + for i, md := range metadata { if _, err := memChunk.Append(&push.Entry{ - Timestamp: time.Unix(0, int64(i)), - Line: line, + Timestamp: time.Unix(0, int64(i)), + Line: "line content", + StructuredMetadata: md, }); err != nil { return nil, err } @@ -195,7 +208,7 @@ func chunkRefItrFromLines(lines ...string) (iter.EntryIterator, error) { } func randomStr(ln int) string { - rng := rand.New(rand.NewSource(0)) + rng := rand.New(rand.NewSource(time.Now().UnixNano())) charset := []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_!@#$%^&*() ") res := make([]rune, ln) @@ -206,24 +219,20 @@ func randomStr(ln int) string { } func TestTokenizerPopulateWontExceedMaxSize(t *testing.T) { - maxSize := 2048 + maxSize := 4 << 10 bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, maxSize, NewMetrics(nil), logger.NewNopLogger()) ch := make(chan *BloomCreation) - line := randomStr(10e3) - itr, err := chunkRefItrFromLines(line) + + metadata := make([]push.LabelsAdapter, 0, 4<<10) + for i := 0; i < cap(metadata); i++ { + metadata = append(metadata, push.LabelsAdapter{{Name: "trace_id", Value: randomStr(12)}}) + } + + itr, err := chunkRefItrFromMetadata(metadata...) 
require.NoError(t, err) go bt.Populate( - v2.NewSliceIter([]*Bloom{ - { - *filter.NewScalableBloomFilter(1024, 0.01, 0.8), - }, - }), - v2.NewSliceIter([]ChunkRefWithIter{ - { - Ref: ChunkRef{}, - Itr: itr, - }, - }), + v2.NewEmptyIter[*Bloom](), + v2.NewSliceIter([]ChunkRefWithIter{{Ref: ChunkRef{}, Itr: itr}}), ch, ) @@ -231,10 +240,11 @@ func TestTokenizerPopulateWontExceedMaxSize(t *testing.T) { for created := range ch { ct++ capacity := created.Bloom.ScalableBloomFilter.Capacity() / 8 + t.Log(ct, int(capacity), maxSize) require.Less(t, int(capacity), maxSize) } // ensure we created two bloom filters from this dataset - require.Equal(t, 2, ct) + require.Greater(t, ct, 2) } func populateAndConsumeBloom( @@ -292,21 +302,19 @@ func BenchmarkPopulateSeriesWithBloom(b *testing.B) { func TestTokenizerClearsCacheBetweenPopulateCalls(t *testing.T) { bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, NewMetrics(nil), logger.NewNopLogger()) - line := "foobarbazz" + md := push.LabelsAdapter{ + {Name: "trace_id", Value: "3bef3c91643bde73"}, + } var blooms []*Bloom + ref := ChunkRef{} for i := 0; i < 2; i++ { ch := make(chan *BloomCreation) - itr, err := chunkRefItrFromLines(line) + itr, err := chunkRefItrFromMetadata(md) require.NoError(t, err) go bt.Populate( v2.NewEmptyIter[*Bloom](), - v2.NewSliceIter([]ChunkRefWithIter{ - { - Ref: ChunkRef{}, - Itr: itr, - }, - }), + v2.NewSliceIter([]ChunkRefWithIter{{Ref: ref, Itr: itr}}), ch, ) var ct int @@ -319,11 +327,12 @@ func TestTokenizerClearsCacheBetweenPopulateCalls(t *testing.T) { } + tokenizer := NewStructuredMetadataTokenizer(string(prefixForChunkRef(ref))) for _, bloom := range blooms { - toks := bt.lineTokenizer.Tokens(line) + toks := tokenizer.Tokens(md[0]) for toks.Next() { token := toks.At() - require.True(t, bloom.Test(token)) + require.True(t, bloom.Test([]byte(token))) } require.NoError(t, toks.Err()) } diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go index 3a61234a1b12..1b43ccadee2a 100644 --- a/pkg/storage/bloom/v1/builder.go +++ b/pkg/storage/bloom/v1/builder.go @@ -151,10 +151,29 @@ func (w *PageWriter) writePage(writer io.Writer, pool chunkenc.WriterPool, crc32 return decompressedLen, w.enc.Len(), nil } +// indexingInfo is a datastructure that holds information about the indexing operation. 
+type indexingInfo struct { + sourceBytes int + indexedFields Set[Field] +} + +func newIndexingInfo() indexingInfo { + return indexingInfo{ + sourceBytes: 0, + indexedFields: NewSet[Field](16), + } +} + +func (s indexingInfo) merge(other indexingInfo) indexingInfo { + s.sourceBytes += other.sourceBytes + s.indexedFields.Union(other.indexedFields) + return s +} + type BloomCreation struct { - Bloom *Bloom - SourceBytesAdded int - Err error + Bloom *Bloom + Info indexingInfo + Err error } // Simplistic implementation of a merge builder that builds a single block @@ -222,7 +241,8 @@ func (mb *MergeBuilder) processNextSeries( bool, // done building block error, // error ) { - var blockSeriesIterated, chunksIndexed, chunksCopied, bytesAdded int + var blockSeriesIterated, chunksIndexed, chunksCopied int + defer func() { mb.metrics.blockSeriesIterated.Add(float64(blockSeriesIterated)) mb.metrics.chunksIndexed.WithLabelValues(chunkIndexedTypeIterated).Add(float64(chunksIndexed)) @@ -260,6 +280,7 @@ func (mb *MergeBuilder) processNextSeries( offsets []BloomOffset chunksToAdd = nextInStore.Chunks preExistingBlooms iter.SizedIterator[*Bloom] = iter.NewEmptyIter[*Bloom]() + info = newIndexingInfo() ) if nextInBlocks != nil && nextInBlocks.Series.Fingerprint == nextInStore.Fingerprint { @@ -275,31 +296,26 @@ func (mb *MergeBuilder) processNextSeries( ch := make(chan *BloomCreation) go mb.populate(nextInStore, preExistingBlooms, chunksToAdd, ch) - for bloom := range ch { - if bloom.Err != nil { - return nil, bytesAdded, 0, false, false, errors.Wrap(bloom.Err, "populating bloom") + for creation := range ch { + if creation.Err != nil { + return nil, info.sourceBytes, 0, false, false, errors.Wrap(creation.Err, "populating bloom") } - offset, err := builder.AddBloom(bloom.Bloom) + offset, err := builder.AddBloom(creation.Bloom) if err != nil { - return nil, bytesAdded, 0, false, false, errors.Wrapf( + return nil, info.sourceBytes, 0, false, false, errors.Wrapf( err, "adding bloom to block for fp (%s)", nextInStore.Fingerprint, ) } offsets = append(offsets, offset) - bytesAdded += bloom.SourceBytesAdded + info.merge(creation.Info) } - // TODO(chaudum): Use the indexed fields from bloom creation, however, - // currently we still build blooms from log lines. 
- fields := NewSet[Field](1) - fields.Add("__line__") - - done, err := builder.AddSeries(*nextInStore, offsets, fields) + done, err := builder.AddSeries(*nextInStore, offsets, info.indexedFields) if err != nil { - return nil, bytesAdded, 0, false, false, errors.Wrap(err, "committing series") + return nil, info.sourceBytes, 0, false, false, errors.Wrap(err, "committing series") } - return nextInBlocks, bytesAdded, chunksIndexed + chunksCopied, blocksFinished, done, nil + return nextInBlocks, info.sourceBytes, chunksIndexed + chunksCopied, blocksFinished, done, nil } func (mb *MergeBuilder) Build(builder *BlockBuilder) (checksum uint32, totalBytes int, err error) { diff --git a/pkg/storage/bloom/v1/builder_test.go b/pkg/storage/bloom/v1/builder_test.go index cbe183e89045..5e56c3507e88 100644 --- a/pkg/storage/bloom/v1/builder_test.go +++ b/pkg/storage/bloom/v1/builder_test.go @@ -11,7 +11,6 @@ import ( "github.com/grafana/loki/v3/pkg/chunkenc" iter "github.com/grafana/loki/v3/pkg/iter/v2" - "github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter" "github.com/grafana/loki/v3/pkg/util/encoding" "github.com/grafana/loki/v3/pkg/util/mempool" ) @@ -248,9 +247,13 @@ func TestMergeBuilder(t *testing.T) { pop := func(_ *Series, srcBlooms iter.SizedIterator[*Bloom], _ ChunkRefs, ch chan *BloomCreation) { for srcBlooms.Next() { bloom := srcBlooms.At() + stats := indexingInfo{ + sourceBytes: int(bloom.Capacity()) / 8, + indexedFields: NewSetFromLiteral[Field]("__all__"), + } ch <- &BloomCreation{ - Bloom: bloom, - SourceBytesAdded: int(bloom.Capacity()) / 8, + Bloom: bloom, + Info: stats, } } close(ch) @@ -353,11 +356,15 @@ func TestMergeBuilderFingerprintCollision(t *testing.T) { } // We're not testing the ability to extend a bloom in this test - pop := func(_ *Series, _ iter.SizedIterator[*Bloom], _ ChunkRefs, ch chan *BloomCreation) { + pop := func(s *Series, _ iter.SizedIterator[*Bloom], _ ChunkRefs, ch chan *BloomCreation) { + bloom := NewBloom() + stats := indexingInfo{ + sourceBytes: int(bloom.Capacity()) / 8, + indexedFields: NewSetFromLiteral[Field]("__all__"), + } ch <- &BloomCreation{ - Bloom: &Bloom{ - ScalableBloomFilter: *filter.NewScalableBloomFilter(1024, 0.01, 0.8), - }, + Bloom: bloom, + Info: stats, } close(ch) } @@ -527,9 +534,13 @@ func TestMergeBuilder_Roundtrip(t *testing.T) { pop := func(_ *Series, srcBlooms iter.SizedIterator[*Bloom], _ ChunkRefs, ch chan *BloomCreation) { for srcBlooms.Next() { bloom := srcBlooms.At() + stats := indexingInfo{ + sourceBytes: int(bloom.Capacity()) / 8, + indexedFields: NewSetFromLiteral[Field]("__all__"), + } ch <- &BloomCreation{ - Bloom: bloom, - SourceBytesAdded: int(bloom.Capacity()) / 8, + Bloom: bloom, + Info: stats, } } close(ch) diff --git a/pkg/storage/bloom/v1/filter/scalable.go b/pkg/storage/bloom/v1/filter/scalable.go index b6078c6dad33..ca979632db1d 100644 --- a/pkg/storage/bloom/v1/filter/scalable.go +++ b/pkg/storage/bloom/v1/filter/scalable.go @@ -88,10 +88,9 @@ func NewScalableBloomFilter(hint uint, fpRate, r float64) *ScalableBloomFilter { return s } -// NewDefaultScalableBloomFilter creates a new Scalable Bloom Filter with the -// specified target false-positive rate and an optimal tightening ratio. -func NewDefaultScalableBloomFilter(fpRate float64) *ScalableBloomFilter { - return NewScalableBloomFilter(10000, fpRate, 0.8) +// NewDefaultScalableBloomFilter creates a new Scalable Bloom Filter. 
+func NewDefaultScalableBloomFilter() *ScalableBloomFilter { + return NewScalableBloomFilter(10e3, 0.1, 0.8) } // Capacity returns the current Scalable Bloom Filter capacity, which is the diff --git a/pkg/storage/bloom/v1/filter/scalable_test.go b/pkg/storage/bloom/v1/filter/scalable_test.go index a4f3d6d49ccb..2456277f2e93 100644 --- a/pkg/storage/bloom/v1/filter/scalable_test.go +++ b/pkg/storage/bloom/v1/filter/scalable_test.go @@ -20,7 +20,7 @@ import ( // Ensures that NewDefaultScalableBloomFilter creates a Scalable Bloom Filter // with hint = 10000 and r = 0.8. func TestNewDefaultScalableBloomFilter(t *testing.T) { - f := NewDefaultScalableBloomFilter(0.1) + f := NewDefaultScalableBloomFilter() if f.fp != 0.1 { t.Errorf("Expected 0.1, got %f", f.fp) diff --git a/pkg/storage/bloom/v1/tokenizer.go b/pkg/storage/bloom/v1/tokenizer.go index dcd7c2146869..bf7e12983b6b 100644 --- a/pkg/storage/bloom/v1/tokenizer.go +++ b/pkg/storage/bloom/v1/tokenizer.go @@ -1,8 +1,10 @@ package v1 import ( + "fmt" "unicode/utf8" + "github.com/grafana/loki/pkg/push" iter "github.com/grafana/loki/v3/pkg/iter/v2" ) @@ -10,6 +12,30 @@ const ( MaxRuneLen = 4 ) +type StructuredMetadataTokenizer struct { + // prefix to add to tokens, typically the encoded chunkref + prefix string + tokens []string +} + +func NewStructuredMetadataTokenizer(prefix string) *StructuredMetadataTokenizer { + return &StructuredMetadataTokenizer{ + prefix: prefix, + tokens: make([]string, 6), + } +} + +// Tokens implements the NGramBuilder interface +func (t *StructuredMetadataTokenizer) Tokens(kv push.LabelAdapter) iter.Iterator[string] { + combined := fmt.Sprintf("%s=%s", kv.Name, kv.Value) + t.tokens = append(t.tokens[:0], + kv.Name, t.prefix+kv.Name, + kv.Value, t.prefix+kv.Value, + combined, t.prefix+combined, + ) + return iter.NewSliceIter(t.tokens) +} + func reassemble(buf []rune, ln, pos int, result []byte) []byte { result = result[:0] // Reset the result slice for i := 0; i < ln; i++ { diff --git a/pkg/storage/bloom/v1/tokenizer_test.go b/pkg/storage/bloom/v1/tokenizer_test.go index c12788fe0f80..e95e4649bd3e 100644 --- a/pkg/storage/bloom/v1/tokenizer_test.go +++ b/pkg/storage/bloom/v1/tokenizer_test.go @@ -5,6 +5,9 @@ import ( "unicode/utf8" "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/push" + v2 "github.com/grafana/loki/v3/pkg/iter/v2" ) const BigFile = "../../../logql/sketch/testdata/war_peace.txt" @@ -230,3 +233,15 @@ func BenchmarkTokens(b *testing.B) { }) } } + +func TestStructuredMetadataTokenizer(t *testing.T) { + tokenizer := NewStructuredMetadataTokenizer("chunk") + + metadata := push.LabelAdapter{Name: "pod", Value: "loki-1"} + expected := []string{"pod", "chunkpod", "loki-1", "chunkloki-1", "pod=loki-1", "chunkpod=loki-1"} + + tokenIter := tokenizer.Tokens(metadata) + got, err := v2.Collect(tokenIter) + require.NoError(t, err) + require.Equal(t, expected, got) +} diff --git a/pkg/storage/bloom/v1/versioned_builder.go b/pkg/storage/bloom/v1/versioned_builder.go index 1dd133e210cb..9162fe5b5d95 100644 --- a/pkg/storage/bloom/v1/versioned_builder.go +++ b/pkg/storage/bloom/v1/versioned_builder.go @@ -63,6 +63,8 @@ func NewBlockBuilderV3(opts BlockOptions, writer BlockWriter) (*V3Builder, error }, nil } +// BuildFrom is only used in tests as helper function to create blocks +// It does not take indexed fields into account. 
func (b *V3Builder) BuildFrom(itr iter.Iterator[SeriesWithBlooms]) (uint32, error) { for itr.Next() { at := itr.At() @@ -79,8 +81,7 @@ func (b *V3Builder) BuildFrom(itr iter.Iterator[SeriesWithBlooms]) (uint32, erro return 0, errors.Wrap(err, "iterating blooms") } - // TODO(chaudum): Use the indexed fields from bloom creation, however, - // currently we still build blooms from log lines. + // TODO(chaudum): Use the indexed fields from bloom creation. fields := NewSet[Field](1) fields.Add("__line__") diff --git a/tools/bloom/inspector/main.go b/tools/bloom/inspector/main.go index 8f60422cd648..6d1be5b0beb6 100644 --- a/tools/bloom/inspector/main.go +++ b/tools/bloom/inspector/main.go @@ -9,29 +9,65 @@ import ( ) func main() { - if len(os.Args) < 2 { + if len(os.Args) < 2 || os.Args[1] == "-h" { fmt.Println("Usage: go run main.go BLOCK_DIRECTORY") os.Exit(2) } path := os.Args[1] - fmt.Printf("Block directory: %s\n", path) + fmt.Printf("Block: %s\n", path) r := v1.NewDirectoryBlockReader(path) b := v1.NewBlock(r, v1.NewMetrics(nil)) - q := v1.NewBlockQuerier(b, &mempool.SimpleHeapAllocator{}, v1.DefaultMaxPageSize) + q := v1.NewBlockQuerier(b, &mempool.SimpleHeapAllocator{}, 256<<20) + qIter := q.Iter() md, err := q.Metadata() if err != nil { panic(err) } - fmt.Printf("Metadata: %+v\n", md) + fmt.Printf("Checksum: 0x%x\n", md.Checksum) + fmt.Printf("Series: %+v\n", md.Series) + fmt.Printf("Options: %+v\n", md.Options) - for q.Next() { - swb := q.At() - fmt.Printf("%s (%d)\n", swb.Series.Fingerprint, swb.Series.Chunks.Len()) + fmt.Println("-----------------------------") + + count := 0 + for qIter.Next() { + swb := qIter.At() + series := swb.Series + p := 0 + for swb.Blooms.Next() { + bloom := swb.Blooms.At() + fmt.Printf( + "fp=%s page=%d chunks=%d size=%vB fill=%v count=%v\n", + series.Fingerprint, + p, + series.Chunks.Len(), + bloom.Capacity()/8, + bloom.FillRatio(), + bloom.Count(), + ) + p++ + } + count++ } + fmt.Printf("Stream count: %4d\n", count) + + // q.Reset() + + // fmt.Println("-----------------------------") + + // count = 0 + // for q.Next() { + // swb := q.At() + // series := swb.Series + // fmt.Printf("%s (%3d) %v\n", series.Fingerprint, series.Chunks.Len(), swb.Meta.Fields.Items()) + // count++ + // } + // fmt.Printf("Stream count: %4d\n", count) + if q.Err() != nil { fmt.Printf("error: %s\n", q.Err()) }
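
Usage note on the new token scheme: the sketch below is a minimal, standalone illustration of the six tokens emitted per structured-metadata key/value pair and of the 20-byte chunk prefix (From, Through, Checksum, big endian) that prefixForChunkRef builds in this patch. The names chunkRef and tokens here are simplified stand-ins, not the Loki v1.ChunkRef type or the StructuredMetadataTokenizer API; only the prefix layout and the token expansion mirror what the patch implements.

package main

import (
	"encoding/binary"
	"fmt"
)

// chunkRef is a simplified stand-in for the chunk reference used in the patch:
// from/through timestamps and a checksum.
type chunkRef struct {
	From, Through int64
	Checksum      uint32
}

// prefixForChunkRef mirrors the patch's encoding: 8 bytes From + 8 bytes Through +
// 4 bytes Checksum, all big endian, for a 20-byte prefix.
func prefixForChunkRef(ref chunkRef) []byte {
	buf := make([]byte, 0, 20)
	buf = binary.BigEndian.AppendUint64(buf, uint64(ref.From))
	buf = binary.BigEndian.AppendUint64(buf, uint64(ref.Through))
	buf = binary.BigEndian.AppendUint32(buf, ref.Checksum)
	return buf
}

// tokens expands one structured-metadata pair into the six tokens listed in the
// commit message: name, value, and name=value, each with and without the chunk prefix.
func tokens(prefix, name, value string) []string {
	combined := name + "=" + value
	return []string{
		name, prefix + name,
		value, prefix + value,
		combined, prefix + combined,
	}
}

func main() {
	prefix := string(prefixForChunkRef(chunkRef{From: 1, Through: 2, Checksum: 3}))
	for _, tok := range tokens(prefix, "pod", "loki-1") {
		fmt.Printf("%q\n", tok)
	}
}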