Skip to content

Commit

Permalink
dedupe index on all the queries for a table instead query batches
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepsukhani committed Feb 15, 2021
1 parent 76e713f commit 426b150
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 11 deletions.
6 changes: 1 addition & 5 deletions pkg/storage/stores/shipper/downloads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,6 @@ func (t *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, ca

level.Debug(log).Log("table-name", t.name, "query-count", len(queries))

id := shipper_util.NewIndexDeduper(callback)

for name, db := range t.dbs {
err := db.View(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(bucketName)
Expand All @@ -292,9 +290,7 @@ func (t *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, ca
}

for _, query := range queries {
if err := t.boltDBIndexClient.QueryWithCursor(ctx, bucket.Cursor(), query, func(query chunk.IndexQuery, batch chunk.ReadBatch) (shouldContinue bool) {
return id.Callback(query, batch)
}); err != nil {
if err := t.boltDBIndexClient.QueryWithCursor(ctx, bucket.Cursor(), query, callback); err != nil {
return err
}
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/storage/stores/shipper/uploads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,6 @@ func (lt *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, c
lt.dbSnapshotsMtx.RLock()
defer lt.dbSnapshotsMtx.RUnlock()

id := shipper_util.NewIndexDeduper(callback)

for _, db := range lt.dbSnapshots {
err := db.boltdb.View(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(bucketName)
Expand All @@ -199,9 +197,7 @@ func (lt *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, c
}

for _, query := range queries {
if err := lt.boltdbIndexClient.QueryWithCursor(ctx, bucket.Cursor(), query, func(query chunk.IndexQuery, batch chunk.ReadBatch) (shouldContinue bool) {
return id.Callback(query, batch)
}); err != nil {
if err := lt.boltdbIndexClient.QueryWithCursor(ctx, bucket.Cursor(), query, callback); err != nil {
return err
}
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/stores/shipper/util/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ func QueriesByTable(queries []chunk.IndexQuery) map[string][]chunk.IndexQuery {
func DoParallelQueries(ctx context.Context, tableQuerier TableQuerier, queries []chunk.IndexQuery, callback chunk_util.Callback) error {
errs := make(chan error)

id := NewIndexDeduper(callback)

for i := 0; i < len(queries); i += maxQueriesPerGoroutine {
q := queries[i:util_math.Min(i+maxQueriesPerGoroutine, len(queries))]
go func(queries []chunk.IndexQuery) {
errs <- tableQuerier.MultiQueries(ctx, queries, callback)
errs <- tableQuerier.MultiQueries(ctx, queries, id.Callback)
}(q)
}

Expand Down

0 comments on commit 426b150

Please sign in to comment.