From a8397161dd067698d1e4a6b1088eb2a4390083be Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Tue, 22 Feb 2022 14:55:39 -0500 Subject: [PATCH] Adds a tool for converting boltdb->tsdb and benchmarking queries (#5430) * exports NewChunkIndexIterator * starts building tsdb-bench * adds query benchmark for tsdb prototyping * lint --- .../shipper/compactor/retention/iterator.go | 2 +- .../compactor/retention/iterator_test.go | 8 +- .../shipper/compactor/retention/retention.go | 2 +- .../compactor/retention/retention_test.go | 8 +- tools/tsdb/tsdb-map/main.go | 101 ++++++++++++++++++ tools/tsdb/tsdb-map/main_test.go | 55 ++++++++++ 6 files changed, 166 insertions(+), 10 deletions(-) create mode 100644 tools/tsdb/tsdb-map/main.go create mode 100644 tools/tsdb/tsdb-map/main_test.go diff --git a/pkg/storage/stores/shipper/compactor/retention/iterator.go b/pkg/storage/stores/shipper/compactor/retention/iterator.go index b7d03c19b4e9..5c3bf75ffdda 100644 --- a/pkg/storage/stores/shipper/compactor/retention/iterator.go +++ b/pkg/storage/stores/shipper/compactor/retention/iterator.go @@ -37,7 +37,7 @@ type chunkIndexIterator struct { labelsMapper *seriesLabelsMapper } -func newChunkIndexIterator(bucket *bbolt.Bucket, config chunk.PeriodConfig) (*chunkIndexIterator, error) { +func NewChunkIndexIterator(bucket *bbolt.Bucket, config chunk.PeriodConfig) (ChunkEntryIterator, error) { labelsMapper, err := newSeriesLabelsMapper(bucket, config) if err != nil { return nil, err diff --git a/pkg/storage/stores/shipper/compactor/retention/iterator_test.go b/pkg/storage/stores/shipper/compactor/retention/iterator_test.go index 7966c24187c1..f7cf59b4e252 100644 --- a/pkg/storage/stores/shipper/compactor/retention/iterator_test.go +++ b/pkg/storage/stores/shipper/compactor/retention/iterator_test.go @@ -36,7 +36,7 @@ func Test_ChunkIterator(t *testing.T) { require.Len(t, tables, 1) var actual []ChunkEntry err := tables[0].DB.Update(func(tx *bbolt.Tx) error { - it, err := 
newChunkIndexIterator(tx.Bucket(local.IndexBucketName), tt.config) + it, err := NewChunkIndexIterator(tx.Bucket(local.IndexBucketName), tt.config) require.NoError(t, err) for it.Next() { require.NoError(t, it.Err()) @@ -57,7 +57,7 @@ func Test_ChunkIterator(t *testing.T) { // second pass we delete c2 actual = actual[:0] err = tables[0].DB.Update(func(tx *bbolt.Tx) error { - it, err := newChunkIndexIterator(tx.Bucket(local.IndexBucketName), tt.config) + it, err := NewChunkIndexIterator(tx.Bucket(local.IndexBucketName), tt.config) require.NoError(t, err) for it.Next() { actual = append(actual, it.Entry()) @@ -94,7 +94,7 @@ func Test_SeriesCleaner(t *testing.T) { require.Len(t, tables, 1) // remove c1, c2 chunk err := tables[0].DB.Update(func(tx *bbolt.Tx) error { - it, err := newChunkIndexIterator(tx.Bucket(local.IndexBucketName), tt.config) + it, err := NewChunkIndexIterator(tx.Bucket(local.IndexBucketName), tt.config) require.NoError(t, err) for it.Next() { require.NoError(t, it.Err()) @@ -179,7 +179,7 @@ func Benchmark_ChunkIterator(b *testing.B) { _ = store.indexTables()[0].Update(func(tx *bbolt.Tx) error { bucket := tx.Bucket(local.IndexBucketName) for n := 0; n < b.N; n++ { - it, err := newChunkIndexIterator(bucket, allSchemas[0].config) + it, err := NewChunkIndexIterator(bucket, allSchemas[0].config) require.NoError(b, err) for it.Next() { chunkEntry = it.Entry() diff --git a/pkg/storage/stores/shipper/compactor/retention/retention.go b/pkg/storage/stores/shipper/compactor/retention/retention.go index b242e3b0a127..af85b35d2329 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention.go @@ -94,7 +94,7 @@ func (t *Marker) markTable(ctx context.Context, tableName, userID string, db *bb return nil } - chunkIt, err := newChunkIndexIterator(bucket, schemaCfg) + chunkIt, err := NewChunkIndexIterator(bucket, schemaCfg) if err != nil { return fmt.Errorf("failed to create chunk index 
iterator: %w", err) } diff --git a/pkg/storage/stores/shipper/compactor/retention/retention_test.go b/pkg/storage/stores/shipper/compactor/retention/retention_test.go index 28424e0d14be..0f43bc9fb4e9 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention_test.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention_test.go @@ -213,7 +213,7 @@ func Test_EmptyTable(t *testing.T) { tables := store.indexTables() require.Len(t, tables, 1) err := tables[0].DB.Update(func(tx *bbolt.Tx) error { - it, err := newChunkIndexIterator(tx.Bucket(local.IndexBucketName), schema.config) + it, err := NewChunkIndexIterator(tx.Bucket(local.IndexBucketName), schema.config) require.NoError(t, err) empty, _, err := markforDelete(context.Background(), tables[0].name, noopWriter{}, it, noopCleaner{}, NewExpirationChecker(&fakeLimits{perTenant: map[string]retentionLimit{"1": {retentionPeriod: 0}, "2": {retentionPeriod: 0}}}), nil) @@ -231,7 +231,7 @@ func Test_EmptyTable(t *testing.T) { bucket, err := tx.CreateBucket(local.IndexBucketName) require.NoError(t, err) - it, err := newChunkIndexIterator(bucket, schema.config) + it, err := NewChunkIndexIterator(bucket, schema.config) require.NoError(t, err) _, _, err = markforDelete(context.Background(), tables[0].name, noopWriter{}, it, noopCleaner{}, NewExpirationChecker(&fakeLimits{}), nil) @@ -674,7 +674,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { for i, table := range tables { seriesCleanRecorder := newSeriesCleanRecorder() err := table.DB.Update(func(tx *bbolt.Tx) error { - it, err := newChunkIndexIterator(tx.Bucket(local.IndexBucketName), schema.config) + it, err := NewChunkIndexIterator(tx.Bucket(local.IndexBucketName), schema.config) require.NoError(t, err) cr, err := newChunkRewriter(chunkClient, schema.config, table.name, tx.Bucket(local.IndexBucketName)) @@ -723,7 +723,7 @@ func TestMarkForDelete_DropChunkFromIndex(t *testing.T) { for i, table := range tables { err := table.DB.Update(func(tx 
*bbolt.Tx) error { - it, err := newChunkIndexIterator(tx.Bucket(local.IndexBucketName), schema.config) + it, err := NewChunkIndexIterator(tx.Bucket(local.IndexBucketName), schema.config) require.NoError(t, err) empty, _, err := markforDelete(context.Background(), table.name, noopWriter{}, it, noopCleaner{}, NewExpirationChecker(fakeLimits{perTenant: map[string]retentionLimit{"1": {retentionPeriod: retentionPeriod}}}), nil) diff --git a/tools/tsdb/tsdb-map/main.go b/tools/tsdb/tsdb-map/main.go new file mode 100644 index 000000000000..e84f58ac3190 --- /dev/null +++ b/tools/tsdb/tsdb-map/main.go @@ -0,0 +1,101 @@ +package main + +import ( + "bytes" + "context" + "flag" + "log" + "strconv" + + "go.etcd.io/bbolt" + "gopkg.in/yaml.v2" + + "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" + shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" + "github.com/grafana/loki/pkg/storage/tsdb/index" +) + +var ( + source = flag.String("source", "", "the source boltdb file") + dest = flag.String("dest", "", "the dest tsdb file") + // Hardcode a periodconfig for convenience as the boltdb iterator needs one + // NB: must match the index file you're reading from + periodConfig = func() chunk.PeriodConfig { + input := ` +from: "2022-01-01" +index: + period: 24h + prefix: loki_index_ +object_store: gcs +schema: v13 +store: boltdb-shipper +` + var cfg chunk.PeriodConfig + if err := yaml.Unmarshal([]byte(input), &cfg); err != nil { + panic(err) + } + return cfg + }() +) + +func extractChecksumFromChunkID(b []byte) uint32 { + i := bytes.LastIndexByte(b, ':') + x, err := strconv.ParseUint(string(b[i+1:]), 16, 32) + if err != nil { + panic(err) + } + return uint32(x) +} + +func main() { + flag.Parse() + + if source == nil || *source == "" { + panic("source is required") + } + + if dest == nil || *dest == "" { + panic("dest is required") + } + + db, err := shipper_util.SafeOpenBoltdbFile(*source) + if err 
!= nil { + panic(err) + } + + builder := index.NewBuilder() + + log.Println("Loading index into memory") + + // loads everything into memory. + if err := db.View(func(t *bbolt.Tx) error { + it, err := retention.NewChunkIndexIterator(t.Bucket([]byte("index")), periodConfig) + if err != nil { + return err + } + + for it.Next() { + if it.Err() != nil { + return it.Err() + } + entry := it.Entry() + builder.AddSeries(entry.Labels, []index.ChunkMeta{{ + Checksum: extractChecksumFromChunkID(entry.ChunkID), + MinTime: int64(entry.From), + MaxTime: int64(entry.Through), + Bytes: (3 << 20) / 4, // guess: 0.75mb, 1/2 of the max size + Entries: 10000, // guess: 10k entries + }}) + } + + return nil + }); err != nil { + panic(err) + } + + log.Println("writing index") + if err := builder.Build(context.Background(), *dest); err != nil { + panic(err) + } +} diff --git a/tools/tsdb/tsdb-map/main_test.go b/tools/tsdb/tsdb-map/main_test.go new file mode 100644 index 000000000000..f3c432dd164f --- /dev/null +++ b/tools/tsdb/tsdb-map/main_test.go @@ -0,0 +1,55 @@ +package main + +import ( + "fmt" + "math/rand" + "os" + "testing" + + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/storage/tsdb" + "github.com/grafana/loki/pkg/storage/tsdb/index" +) + +func TestExtractChecksum(t *testing.T) { + x := rand.Uint32() + s := fmt.Sprintf("a/b/c:d:e:%x", x) + require.Equal(t, x, extractChecksumFromChunkID([]byte(s))) +} + +// Requires LOKI_TSDB_PATH to be set or this will short-circuit +func BenchmarkQuery(b *testing.B) { + for _, bm := range []struct { + name string + matchers []*labels.Matcher + }{ + { + name: "match ns", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "namespace", "loki-ops")}, + }, + { + name: "match ns regexp", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "namespace", "loki-ops")}, + }, + } { + indexPath := os.Getenv("LOKI_TSDB_PATH") + if indexPath == ""
{ + return + } + + reader, err := index.NewFileReader(indexPath) + if err != nil { + panic(err) + } + b.Run(bm.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + p, _ := tsdb.PostingsForMatchers(reader, bm.matchers...) + + for p.Next() { + } + } + }) + } +}