Skip to content

Commit

Permalink
Adds a tool for converting boltdb->tsdb and benchmarking queries (#5430)
Browse files: browse the repository at this point in the history.
* exports NewChunkIndexIterator

* starts building tsdb-bench

* adds query benchmark for tsdb prototyping

* lint
  • Loading branch information
owen-d authored Feb 22, 2022
1 parent 00ca1aa commit a839716
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 10 deletions.
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/compactor/retention/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type chunkIndexIterator struct {
labelsMapper *seriesLabelsMapper
}

func newChunkIndexIterator(bucket *bbolt.Bucket, config chunk.PeriodConfig) (*chunkIndexIterator, error) {
func NewChunkIndexIterator(bucket *bbolt.Bucket, config chunk.PeriodConfig) (ChunkEntryIterator, error) {
labelsMapper, err := newSeriesLabelsMapper(bucket, config)
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func Test_ChunkIterator(t *testing.T) {
require.Len(t, tables, 1)
var actual []ChunkEntry
err := tables[0].DB.Update(func(tx *bbolt.Tx) error {
it, err := newChunkIndexIterator(tx.Bucket(local.IndexBucketName), tt.config)
it, err := NewChunkIndexIterator(tx.Bucket(local.IndexBucketName), tt.config)
require.NoError(t, err)
for it.Next() {
require.NoError(t, it.Err())
Expand All @@ -57,7 +57,7 @@ func Test_ChunkIterator(t *testing.T) {
// second pass we delete c2
actual = actual[:0]
err = tables[0].DB.Update(func(tx *bbolt.Tx) error {
it, err := newChunkIndexIterator(tx.Bucket(local.IndexBucketName), tt.config)
it, err := NewChunkIndexIterator(tx.Bucket(local.IndexBucketName), tt.config)
require.NoError(t, err)
for it.Next() {
actual = append(actual, it.Entry())
Expand Down Expand Up @@ -94,7 +94,7 @@ func Test_SeriesCleaner(t *testing.T) {
require.Len(t, tables, 1)
// remove c1, c2 chunk
err := tables[0].DB.Update(func(tx *bbolt.Tx) error {
it, err := newChunkIndexIterator(tx.Bucket(local.IndexBucketName), tt.config)
it, err := NewChunkIndexIterator(tx.Bucket(local.IndexBucketName), tt.config)
require.NoError(t, err)
for it.Next() {
require.NoError(t, it.Err())
Expand Down Expand Up @@ -179,7 +179,7 @@ func Benchmark_ChunkIterator(b *testing.B) {
_ = store.indexTables()[0].Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(local.IndexBucketName)
for n := 0; n < b.N; n++ {
it, err := newChunkIndexIterator(bucket, allSchemas[0].config)
it, err := NewChunkIndexIterator(bucket, allSchemas[0].config)
require.NoError(b, err)
for it.Next() {
chunkEntry = it.Entry()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (t *Marker) markTable(ctx context.Context, tableName, userID string, db *bb
return nil
}

chunkIt, err := newChunkIndexIterator(bucket, schemaCfg)
chunkIt, err := NewChunkIndexIterator(bucket, schemaCfg)
if err != nil {
return fmt.Errorf("failed to create chunk index iterator: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func Test_EmptyTable(t *testing.T) {
tables := store.indexTables()
require.Len(t, tables, 1)
err := tables[0].DB.Update(func(tx *bbolt.Tx) error {
it, err := newChunkIndexIterator(tx.Bucket(local.IndexBucketName), schema.config)
it, err := NewChunkIndexIterator(tx.Bucket(local.IndexBucketName), schema.config)
require.NoError(t, err)
empty, _, err := markforDelete(context.Background(), tables[0].name, noopWriter{}, it, noopCleaner{},
NewExpirationChecker(&fakeLimits{perTenant: map[string]retentionLimit{"1": {retentionPeriod: 0}, "2": {retentionPeriod: 0}}}), nil)
Expand All @@ -231,7 +231,7 @@ func Test_EmptyTable(t *testing.T) {
bucket, err := tx.CreateBucket(local.IndexBucketName)
require.NoError(t, err)

it, err := newChunkIndexIterator(bucket, schema.config)
it, err := NewChunkIndexIterator(bucket, schema.config)
require.NoError(t, err)
_, _, err = markforDelete(context.Background(), tables[0].name, noopWriter{}, it, noopCleaner{},
NewExpirationChecker(&fakeLimits{}), nil)
Expand Down Expand Up @@ -674,7 +674,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
for i, table := range tables {
seriesCleanRecorder := newSeriesCleanRecorder()
err := table.DB.Update(func(tx *bbolt.Tx) error {
it, err := newChunkIndexIterator(tx.Bucket(local.IndexBucketName), schema.config)
it, err := NewChunkIndexIterator(tx.Bucket(local.IndexBucketName), schema.config)
require.NoError(t, err)

cr, err := newChunkRewriter(chunkClient, schema.config, table.name, tx.Bucket(local.IndexBucketName))
Expand Down Expand Up @@ -723,7 +723,7 @@ func TestMarkForDelete_DropChunkFromIndex(t *testing.T) {

for i, table := range tables {
err := table.DB.Update(func(tx *bbolt.Tx) error {
it, err := newChunkIndexIterator(tx.Bucket(local.IndexBucketName), schema.config)
it, err := NewChunkIndexIterator(tx.Bucket(local.IndexBucketName), schema.config)
require.NoError(t, err)
empty, _, err := markforDelete(context.Background(), table.name, noopWriter{}, it, noopCleaner{},
NewExpirationChecker(fakeLimits{perTenant: map[string]retentionLimit{"1": {retentionPeriod: retentionPeriod}}}), nil)
Expand Down
101 changes: 101 additions & 0 deletions tools/tsdb/tsdb-map/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package main

import (
"bytes"
"context"
"flag"
"log"
"strconv"

"go.etcd.io/bbolt"
"gopkg.in/yaml.v2"

"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
"github.com/grafana/loki/pkg/storage/tsdb/index"
)

var (
	// source is the path of the boltdb index file to read.
	source = flag.String("source", "", "the source boltdb file")
	// dest is the path the resulting tsdb index file is written to.
	dest = flag.String("dest", "", "the dest tsdb file")

	// periodConfig is a hardcoded period config, kept here for convenience
	// because the boltdb chunk index iterator needs one.
	// NB: must match the index file you're reading from.
	periodConfig = func() chunk.PeriodConfig {
		// period and prefix are nested under index so they describe the index
		// tables; as top-level keys they would not populate the index section.
		// NOTE(review): assumes chunk.PeriodConfig maps "index" to a struct with
		// "period" and "prefix" keys — confirm against its yaml tags.
		input := `
from: "2022-01-01"
index:
  period: 24h
  prefix: loki_index_
object_store: gcs
schema: v13
store: boltdb-shipper
`
		var cfg chunk.PeriodConfig
		// The literal is static, so a parse failure is a programmer error.
		if err := yaml.Unmarshal([]byte(input), &cfg); err != nil {
			panic(err)
		}
		return cfg
	}()
)

// extractChecksumFromChunkID parses everything after the last ':' in b as a
// hexadecimal 32-bit unsigned integer and returns it. When b contains no ':'
// the whole input is parsed. It panics if the text is not valid hex or does
// not fit in 32 bits.
func extractChecksumFromChunkID(b []byte) uint32 {
	// LastIndexByte yields -1 when there is no ':', so the slice below then
	// starts at 0 and covers the entire input.
	hexSuffix := b[bytes.LastIndexByte(b, ':')+1:]

	checksum, err := strconv.ParseUint(string(hexSuffix), 16, 32)
	if err != nil {
		panic(err)
	}
	return uint32(checksum)
}

// main converts the boltdb index file given by -source into a tsdb index
// file written to -dest. Every chunk entry of the "index" bucket is loaded
// into an in-memory builder before the tsdb file is written. It panics on
// any failure.
func main() {
	flag.Parse()

	// flag.String never returns nil, so only the values need checking.
	if *source == "" {
		panic("source is required")
	}
	if *dest == "" {
		panic("dest is required")
	}

	db, err := shipper_util.SafeOpenBoltdbFile(*source)
	if err != nil {
		panic(err)
	}
	// Release the boltdb file on every exit path; deferred calls also run
	// while a panic unwinds.
	defer func() {
		if err := db.Close(); err != nil {
			log.Println("closing source boltdb file:", err)
		}
	}()

	builder := index.NewBuilder()

	log.Println("Loading index into memory")

	// loads everything into memory.
	if err := db.View(func(t *bbolt.Tx) error {
		// Tx.Bucket returns nil for a bucket that does not exist; report that
		// instead of handing a nil bucket to the iterator.
		bucket := t.Bucket([]byte("index"))
		if bucket == nil {
			return bbolt.ErrBucketNotFound
		}

		it, err := retention.NewChunkIndexIterator(bucket, periodConfig)
		if err != nil {
			return err
		}

		for it.Next() {
			if err := it.Err(); err != nil {
				return err
			}
			entry := it.Entry()
			// Size and entry count are not available from the boltdb index, so
			// fixed guesses are recorded for every chunk.
			builder.AddSeries(entry.Labels, []index.ChunkMeta{{
				Checksum: extractChecksumFromChunkID(entry.ChunkID),
				MinTime:  int64(entry.From),
				MaxTime:  int64(entry.Through),
				Bytes:    (3 << 20) / 4, // guess: 0.75mb, 1/2 of the max size
				Entries:  10000,         // guess: 10k entries
			}})
		}

		// An iterator may end the loop by returning false from Next after a
		// failure; surface that rather than silently building a truncated index.
		return it.Err()
	}); err != nil {
		panic(err)
	}

	log.Println("writing index")
	if err := builder.Build(context.Background(), *dest); err != nil {
		panic(err)
	}
}
55 changes: 55 additions & 0 deletions tools/tsdb/tsdb-map/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package main

import (
"fmt"
"math/rand"
"os"
"testing"

"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/storage/tsdb"
"github.com/grafana/loki/pkg/storage/tsdb/index"
)

func TestExtractChecksum(t *testing.T) {
x := rand.Uint32()
s := fmt.Sprintf("a/b/c:d:e:%x", x)
require.Equal(t, x, extractChecksumFromChunkID([]byte(s)))
}

// Requires LOKI_TSDB_PATH to be set or this will short-circuit
func BenchmarkQuery(b *testing.B) {
for _, bm := range []struct {
name string
matchers []*labels.Matcher
}{
{
name: "match ns",
matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "namespace", "loki-ops")},
},
{
name: "match ns regexp",
matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "namespace", "loki-ops")},
},
} {
indexPath := os.Getenv("LOKI_TSDB_PATH")
if indexPath == "" {
return
}

reader, err := index.NewFileReader(indexPath)
if err != nil {
panic(err)
}
b.Run(bm.name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
p, _ := tsdb.PostingsForMatchers(reader, bm.matchers...)

for p.Next() {
}
}
})
}
}

0 comments on commit a839716

Please sign in to comment.