diff --git a/pkg/storage/chunk/aws/dynamodb_storage_client.go b/pkg/storage/chunk/aws/dynamodb_storage_client.go index 7ced7315e3e8..ffce899255a6 100644 --- a/pkg/storage/chunk/aws/dynamodb_storage_client.go +++ b/pkg/storage/chunk/aws/dynamodb_storage_client.go @@ -247,11 +247,11 @@ func (a dynamoDBStorageClient) BatchWrite(ctx context.Context, input chunk.Write } // QueryPages implements chunk.IndexClient. -func (a dynamoDBStorageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) bool) error { +func (a dynamoDBStorageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error { return chunk_util.DoParallelQueries(ctx, a.query, queries, callback) } -func (a dynamoDBStorageClient) query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error { +func (a dynamoDBStorageClient) query(ctx context.Context, query chunk.IndexQuery, callback chunk.QueryPagesCallback) error { input := &dynamodb.QueryInput{ TableName: aws.String(query.TableName), KeyConditions: map[string]*dynamodb.Condition{ diff --git a/pkg/storage/chunk/cassandra/storage_client.go b/pkg/storage/chunk/cassandra/storage_client.go index ff9522f63d24..4b9bdbdb6451 100644 --- a/pkg/storage/chunk/cassandra/storage_client.go +++ b/pkg/storage/chunk/cassandra/storage_client.go @@ -345,11 +345,11 @@ func (s *StorageClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch) } // QueryPages implement chunk.IndexClient. -func (s *StorageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) bool) error { +func (s *StorageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error { return util.DoParallelQueries(ctx, s.query, queries, callback) } -func (s *StorageClient) query(ctx context.Context, query chunk.IndexQuery, callback util.Callback) error { +func (s *StorageClient) query(ctx context.Context, query chunk.IndexQuery, callback chunk.QueryPagesCallback) error { if s.querySemaphore != nil { if err := s.querySemaphore.Acquire(ctx, 1); err != nil { return err diff --git a/pkg/storage/chunk/gcp/bigtable_index_client.go b/pkg/storage/chunk/gcp/bigtable_index_client.go index 6a7861d3bf2b..1b4a3215b9ed 100644 --- a/pkg/storage/chunk/gcp/bigtable_index_client.go +++ b/pkg/storage/chunk/gcp/bigtable_index_client.go @@ -215,7 +215,7 @@ func (s *storageClientColumnKey) BatchWrite(ctx context.Context, batch chunk.Wri return nil } -func (s *storageClientColumnKey) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) bool) error { +func (s *storageClientColumnKey) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error { sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages") defer sp.Finish() @@ -323,11 +323,11 @@ func (c *columnKeyIterator) Value() []byte { return c.items[c.i].Value } -func (s *storageClientV1) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) bool) error { +func (s *storageClientV1) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error { return chunk_util.DoParallelQueries(ctx, s.query, queries, callback) } -func (s *storageClientV1) query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error { +func (s *storageClientV1) query(ctx context.Context, query chunk.IndexQuery, callback chunk.QueryPagesCallback) error { const null = string('\xff') log, ctx := spanlogger.New(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue}) diff --git a/pkg/storage/chunk/grpc/index_client.go b/pkg/storage/chunk/grpc/index_client.go index 146eb4236fa0..88d4429dc432 100644 --- a/pkg/storage/chunk/grpc/index_client.go +++ b/pkg/storage/chunk/grpc/index_client.go @@ -48,11 +48,11 @@ func (s *StorageClient) BatchWrite(c context.Context, batch chunk.WriteBatch) er return nil } -func (s *StorageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error { +func (s *StorageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error { return util.DoParallelQueries(ctx, s.query, queries, callback) } -func (s *StorageClient) query(ctx context.Context, query chunk.IndexQuery, callback util.Callback) error { +func (s *StorageClient) query(ctx context.Context, query chunk.IndexQuery, callback chunk.QueryPagesCallback) error { indexQuery := &QueryIndexRequest{ TableName: query.TableName, HashValue: query.HashValue, diff --git a/pkg/storage/chunk/inmemory_storage_client.go b/pkg/storage/chunk/inmemory_storage_client.go index a72eb7e3ba30..6d7ce2a2a5f0 100644 --- a/pkg/storage/chunk/inmemory_storage_client.go +++ b/pkg/storage/chunk/inmemory_storage_client.go @@ -258,7 +258,7 @@ func (m *MockStorage) BatchWrite(ctx context.Context, batch WriteBatch) error { } // QueryPages implements StorageClient. -func (m *MockStorage) QueryPages(ctx context.Context, queries []IndexQuery, callback func(IndexQuery, ReadBatch) (shouldContinue bool)) error { +func (m *MockStorage) QueryPages(ctx context.Context, queries []IndexQuery, callback QueryPagesCallback) error { m.mtx.RLock() defer m.mtx.RUnlock() diff --git a/pkg/storage/chunk/local/boltdb_index_client.go b/pkg/storage/chunk/local/boltdb_index_client.go index 78befb790504..9a45c784dbbf 100644 --- a/pkg/storage/chunk/local/boltdb_index_client.go +++ b/pkg/storage/chunk/local/boltdb_index_client.go @@ -224,11 +224,11 @@ func (b *BoltIndexClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch return nil } -func (b *BoltIndexClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error { +func (b *BoltIndexClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error { return chunk_util.DoParallelQueries(ctx, b.query, queries, callback) } -func (b *BoltIndexClient) query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error { +func (b *BoltIndexClient) query(ctx context.Context, query chunk.IndexQuery, callback chunk.QueryPagesCallback) error { db, err := b.GetDB(query.TableName, DBOperationRead) if err != nil { if err == ErrUnexistentBoltDB { @@ -242,7 +242,7 @@ func (b *BoltIndexClient) query(ctx context.Context, query chunk.IndexQuery, cal } func (b *BoltIndexClient) QueryDB(ctx context.Context, db *bbolt.DB, bucketName []byte, query chunk.IndexQuery, - callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error { + callback chunk.QueryPagesCallback) error { return db.View(func(tx *bbolt.Tx) error { if len(bucketName) == 0 { return ErrEmptyIndexBucketName @@ -256,7 +256,7 @@ func (b *BoltIndexClient) QueryDB(ctx context.Context, db *bbolt.DB, bucketName }) } -func (b *BoltIndexClient) QueryWithCursor(_ context.Context, c *bbolt.Cursor, query chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error { +func (b *BoltIndexClient) QueryWithCursor(_ context.Context, c *bbolt.Cursor, query chunk.IndexQuery, callback chunk.QueryPagesCallback) error { var start []byte if len(query.RangeValuePrefix) > 0 { start = []byte(query.HashValue + separator + string(query.RangeValuePrefix)) diff --git a/pkg/storage/chunk/storage/caching_index_client.go b/pkg/storage/chunk/storage/caching_index_client.go index 6c61bed94425..63f85b2282f2 100644 --- a/pkg/storage/chunk/storage/caching_index_client.go +++ b/pkg/storage/chunk/storage/caching_index_client.go @@ -77,7 +77,7 @@ func (s *cachingIndexClient) Stop() { s.IndexClient.Stop() } -func (s *cachingIndexClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error { +func (s *cachingIndexClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error { if len(queries) == 0 { return nil } @@ -89,7 +89,7 @@ func (s *cachingIndexClient) QueryPages(ctx context.Context, queries []chunk.Ind return s.doQueries(ctx, queries, callback) } -func (s *cachingIndexClient) queryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback, +func (s *cachingIndexClient) queryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback, buildIndexQuery func(query chunk.IndexQuery) chunk.IndexQuery, buildQueryKey func(query chunk.IndexQuery) string) error { if len(queries) == 0 { return nil @@ -210,7 +210,7 @@ func (s *cachingIndexClient) queryPages(ctx context.Context, queries []chunk.Ind // doBroadQueries does broad queries on the store by using just TableName and HashValue. // This is useful for chunks queries or when we need to reduce QPS on index store at the expense of higher cache requirement. // All the results from the index store are cached and the responses are filtered based on the actual queries. -func (s *cachingIndexClient) doBroadQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error { +func (s *cachingIndexClient) doBroadQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error { // We cache all the entries for queries looking for Chunk IDs, so filter client side. callback = chunk_util.QueryFilter(callback) return s.queryPages(ctx, queries, callback, func(query chunk.IndexQuery) chunk.IndexQuery { @@ -221,7 +221,7 @@ func (s *cachingIndexClient) doBroadQueries(ctx context.Context, queries []chunk } // doQueries does the exact same queries as opposed to doBroadQueries doing broad queries with limited query params. -func (s *cachingIndexClient) doQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error { +func (s *cachingIndexClient) doQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error { return s.queryPages(ctx, queries, callback, func(query chunk.IndexQuery) chunk.IndexQuery { return query }, func(q chunk.IndexQuery) string { diff --git a/pkg/storage/chunk/storage/caching_index_client_test.go b/pkg/storage/chunk/storage/caching_index_client_test.go index d2a3497da444..897e0b9dc2f3 100644 --- a/pkg/storage/chunk/storage/caching_index_client_test.go +++ b/pkg/storage/chunk/storage/caching_index_client_test.go @@ -24,7 +24,7 @@ type mockStore struct { results ReadBatch } -func (m *mockStore) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error { +func (m *mockStore) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error { for _, query := range queries { callback(query, m.results) } diff --git a/pkg/storage/chunk/storage_client.go b/pkg/storage/chunk/storage_client.go index 4468a2df890d..da540f0d55d2 100644 --- a/pkg/storage/chunk/storage_client.go +++ b/pkg/storage/chunk/storage_client.go @@ -14,6 +14,9 @@ var ( ErrStorageObjectNotFound = errors.New("object not found in storage") ) +// QueryPagesCallback from an IndexQuery. +type QueryPagesCallback func(IndexQuery, ReadBatch) bool + // IndexClient is a client for the storage of the index (e.g. DynamoDB or Bigtable). type IndexClient interface { Stop() @@ -23,7 +26,7 @@ type IndexClient interface { BatchWrite(context.Context, WriteBatch) error // For the read path. - QueryPages(ctx context.Context, queries []IndexQuery, callback func(IndexQuery, ReadBatch) (shouldContinue bool)) error + QueryPages(ctx context.Context, queries []IndexQuery, callback QueryPagesCallback) error } // Client is for storing and retrieving chunks. diff --git a/pkg/storage/chunk/util/util.go b/pkg/storage/chunk/util/util.go index 7e19393dd0c6..28c008121e6d 100644 --- a/pkg/storage/chunk/util/util.go +++ b/pkg/storage/chunk/util/util.go @@ -13,11 +13,8 @@ import ( "github.com/grafana/loki/pkg/util/math" ) -// Callback from an IndexQuery. -type Callback func(chunk.IndexQuery, chunk.ReadBatch) bool - // DoSingleQuery is the interface for indexes that don't support batching yet. -type DoSingleQuery func(context.Context, chunk.IndexQuery, Callback) error +type DoSingleQuery func(context.Context, chunk.IndexQuery, chunk.QueryPagesCallback) error // QueryParallelism is the maximum number of subqueries run in // parallel per higher-level query @@ -27,7 +24,7 @@ var QueryParallelism = 100 // and indexes that don't yet support batching. func DoParallelQueries( ctx context.Context, doSingleQuery DoSingleQuery, queries []chunk.IndexQuery, - callback Callback, + callback chunk.QueryPagesCallback, ) error { if len(queries) == 1 { return doSingleQuery(ctx, queries[0], callback) @@ -109,7 +106,7 @@ func (f *filteringBatchIter) Next() bool { // QueryFilter wraps a callback to ensure the results are filtered correctly; // useful for the cache and Bigtable backend, which only ever fetches the whole // row. -func QueryFilter(callback Callback) Callback { +func QueryFilter(callback chunk.QueryPagesCallback) chunk.QueryPagesCallback { return func(query chunk.IndexQuery, batch chunk.ReadBatch) bool { return callback(query, &filteringBatch{query, batch}) } diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table.go b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table.go index f700b6a0b9fa..58d7aca80608 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table.go @@ -180,7 +180,7 @@ func (t *deleteRequestsTable) BatchWrite(ctx context.Context, batch chunk.WriteB return nil } -func (t *deleteRequestsTable) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error { +func (t *deleteRequestsTable) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error { for _, query := range queries { if err := t.boltdbIndexClient.QueryDB(ctx, t.db, local.IndexBucketName, query, callback); err != nil { return err diff --git a/pkg/storage/stores/shipper/downloads/index_set.go b/pkg/storage/stores/shipper/downloads/index_set.go index 36c9d1b7ff46..f7be41b6099b 100644 --- a/pkg/storage/stores/shipper/downloads/index_set.go +++ b/pkg/storage/stores/shipper/downloads/index_set.go @@ -28,7 +28,7 @@ import ( type IndexSet interface { Init() error Close() - MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error + MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error DropAllDBs() error Err() error LastUsedAt() time.Time @@ -178,7 +178,7 @@ func (t *indexSet) Close() { } // MultiQueries runs multiple queries without having to take lock multiple times for each query. -func (t *indexSet) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error { +func (t *indexSet) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error { userID, err := tenant.TenantID(ctx) if err != nil { return err diff --git a/pkg/storage/stores/shipper/downloads/table.go b/pkg/storage/stores/shipper/downloads/table.go index b2ab47418988..7c53ce036768 100644 --- a/pkg/storage/stores/shipper/downloads/table.go +++ b/pkg/storage/stores/shipper/downloads/table.go @@ -29,7 +29,7 @@ const ( ) type BoltDBIndexClient interface { - QueryWithCursor(_ context.Context, c *bbolt.Cursor, query chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error + QueryWithCursor(_ context.Context, c *bbolt.Cursor, query chunk.IndexQuery, callback chunk.QueryPagesCallback) error } type StorageClient interface { @@ -151,7 +151,7 @@ func (t *Table) Close() { } // MultiQueries runs multiple queries without having to take lock multiple times for each query. -func (t *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error { +func (t *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error { userID, err := tenant.TenantID(ctx) if err != nil { return err diff --git a/pkg/storage/stores/shipper/downloads/table_manager.go b/pkg/storage/stores/shipper/downloads/table_manager.go index 5dca22c47dec..90dbe52fbf83 100644 --- a/pkg/storage/stores/shipper/downloads/table_manager.go +++ b/pkg/storage/stores/shipper/downloads/table_manager.go @@ -128,7 +128,7 @@ func (tm *TableManager) Stop() { } } -func (tm *TableManager) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error { +func (tm *TableManager) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error { queriesByTable := util.QueriesByTable(queries) for tableName, queries := range queriesByTable { err := tm.query(ctx, tableName, queries, callback) @@ -140,7 +140,7 @@ func (tm *TableManager) QueryPages(ctx context.Context, queries []chunk.IndexQue return nil } -func (tm *TableManager) query(ctx context.Context, tableName string, queries []chunk.IndexQuery, callback chunk_util.Callback) error { +func (tm *TableManager) query(ctx context.Context, tableName string, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error { logger := util_log.WithContext(ctx, util_log.Logger) level.Debug(logger).Log("table-name", tableName) diff --git a/pkg/storage/stores/shipper/downloads/table_test.go b/pkg/storage/stores/shipper/downloads/table_test.go index c80ee948f918..3b9cc650fd05 100644 --- a/pkg/storage/stores/shipper/downloads/table_test.go +++ b/pkg/storage/stores/shipper/downloads/table_test.go @@ -17,7 +17,6 @@ import ( "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/chunk/local" - chunk_util "github.com/grafana/loki/pkg/storage/chunk/util" "github.com/grafana/loki/pkg/storage/stores/shipper/storage" "github.com/grafana/loki/pkg/storage/stores/shipper/testutil" util_log "github.com/grafana/loki/pkg/util/log" @@ -88,7 +87,7 @@ type mockIndexSet struct { lastUsedAt time.Time } -func (m *mockIndexSet) MultiQueries(_ context.Context, queries []chunk.IndexQuery, _ chunk_util.Callback) error { +func (m *mockIndexSet) MultiQueries(_ context.Context, queries []chunk.IndexQuery, _ chunk.QueryPagesCallback) error { m.queriesDone = append(m.queriesDone, queries...) return nil } diff --git a/pkg/storage/stores/shipper/gateway_client.go b/pkg/storage/stores/shipper/gateway_client.go index 88366d0aa77f..3a196f098a8d 100644 --- a/pkg/storage/stores/shipper/gateway_client.go +++ b/pkg/storage/stores/shipper/gateway_client.go @@ -15,7 +15,6 @@ import ( "google.golang.org/grpc" "github.com/grafana/loki/pkg/storage/chunk" - "github.com/grafana/loki/pkg/storage/chunk/util" "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb" shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" util_log "github.com/grafana/loki/pkg/util/log" @@ -78,7 +77,7 @@ func (s *GatewayClient) Stop() { s.conn.Close() } -func (s *GatewayClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error { +func (s *GatewayClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error { errs := make(chan error) for i := 0; i < len(queries); i += maxQueriesPerGoroutine { @@ -99,7 +98,7 @@ func (s *GatewayClient) QueryPages(ctx context.Context, queries []chunk.IndexQue return lastErr } -func (s *GatewayClient) doQueries(ctx context.Context, queries []chunk.IndexQuery, callback util.Callback) error { +func (s *GatewayClient) doQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error { queryKeyQueryMap := make(map[string]chunk.IndexQuery, len(queries)) gatewayQueries := make([]*indexgatewaypb.IndexQuery, 0, len(queries)) diff --git a/pkg/storage/stores/shipper/indexgateway/gateway_test.go b/pkg/storage/stores/shipper/indexgateway/gateway_test.go index 220a4c823785..4bbc1cf85a1c 100644 --- a/pkg/storage/stores/shipper/indexgateway/gateway_test.go +++ b/pkg/storage/stores/shipper/indexgateway/gateway_test.go @@ -74,7 +74,7 @@ type mockIndexClient struct { response *mockBatch } -func (m mockIndexClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error { +func (m mockIndexClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error { for _, query := range queries { callback(query, m.response) } diff --git a/pkg/storage/stores/shipper/shipper_index_client.go b/pkg/storage/stores/shipper/shipper_index_client.go index 0b8604108e17..5df7e9482a2d 100644 --- a/pkg/storage/stores/shipper/shipper_index_client.go +++ b/pkg/storage/stores/shipper/shipper_index_client.go @@ -47,7 +47,7 @@ const ( ) type boltDBIndexClient interface { - QueryWithCursor(_ context.Context, c *bbolt.Cursor, query chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error + QueryWithCursor(_ context.Context, c *bbolt.Cursor, query chunk.IndexQuery, callback chunk.QueryPagesCallback) error NewWriteBatch() chunk.WriteBatch WriteToDB(ctx context.Context, db *bbolt.DB, bucketName []byte, writes local.TableWrites) error Stop() @@ -224,7 +224,7 @@ func (s *Shipper) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error }) } -func (s *Shipper) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error { +func (s *Shipper) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error { return instrument.CollectedRequest(ctx, "Shipper.Query", instrument.NewHistogramCollector(s.metrics.requestDurationSeconds), instrument.ErrorCode, func(ctx context.Context) error { spanLogger := spanlogger.FromContext(ctx) diff --git a/pkg/storage/stores/shipper/testutil/testutil.go b/pkg/storage/stores/shipper/testutil/testutil.go index 8d54f0768500..dabee0039761 100644 --- a/pkg/storage/stores/shipper/testutil/testutil.go +++ b/pkg/storage/stores/shipper/testutil/testutil.go @@ -46,7 +46,7 @@ func AddRecordsToBatch(batch chunk.WriteBatch, tableName string, start, numRecor } type SingleTableQuerier interface { - MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error + MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error } func TestSingleTableQuery(t *testing.T, userID string, queries []chunk.IndexQuery, querier SingleTableQuerier, start, numRecords int) { @@ -62,7 +62,7 @@ func TestSingleTableQuery(t *testing.T, userID string, queries []chunk.IndexQuer } type SingleDBQuerier interface { - QueryDB(ctx context.Context, db *bbolt.DB, bucketName []byte, query chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error + QueryDB(ctx context.Context, db *bbolt.DB, bucketName []byte, query chunk.IndexQuery, callback chunk.QueryPagesCallback) error } func TestSingleDBQuery(t *testing.T, query chunk.IndexQuery, db *bbolt.DB, bucketName []byte, querier SingleDBQuerier, start, numRecords int) { @@ -78,7 +78,7 @@ func TestSingleDBQuery(t *testing.T, query chunk.IndexQuery, db *bbolt.DB, bucke } type MultiTableQuerier interface { - QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error + QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error } func TestMultiTableQuery(t *testing.T, userID string, queries []chunk.IndexQuery, querier MultiTableQuerier, start, numRecords int) { @@ -93,7 +93,7 @@ func TestMultiTableQuery(t *testing.T, userID string, queries []chunk.IndexQuery require.Len(t, fetchedRecords, numRecords) } -func makeTestCallback(t *testing.T, minValue, maxValue int, records map[string]string) func(query chunk.IndexQuery, batch chunk.ReadBatch) (shouldContinue bool) { +func makeTestCallback(t *testing.T, minValue, maxValue int, records map[string]string) chunk.QueryPagesCallback { t.Helper() recordsMtx := sync.Mutex{} return func(query chunk.IndexQuery, batch chunk.ReadBatch) (shouldContinue bool) { diff --git a/pkg/storage/stores/shipper/uploads/table.go b/pkg/storage/stores/shipper/uploads/table.go index 38d7a8ef98d9..e250da6104b0 100644 --- a/pkg/storage/stores/shipper/uploads/table.go +++ b/pkg/storage/stores/shipper/uploads/table.go @@ -37,7 +37,7 @@ const ( ) type BoltDBIndexClient interface { - QueryWithCursor(_ context.Context, c *bbolt.Cursor, query chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error + QueryWithCursor(_ context.Context, c *bbolt.Cursor, query chunk.IndexQuery, callback chunk.QueryPagesCallback) error WriteToDB(ctx context.Context, db *bbolt.DB, bucketName []byte, writes local.TableWrites) error } @@ -188,7 +188,7 @@ func (lt *Table) Snapshot() error { } // MultiQueries runs multiple queries without having to take lock multiple times for each query. -func (lt *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error { +func (lt *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error { lt.dbSnapshotsMtx.RLock() defer lt.dbSnapshotsMtx.RUnlock() diff --git a/pkg/storage/stores/shipper/uploads/table_manager.go b/pkg/storage/stores/shipper/uploads/table_manager.go index 7f0a87a4d226..b787db9a9035 100644 --- a/pkg/storage/stores/shipper/uploads/table_manager.go +++ b/pkg/storage/stores/shipper/uploads/table_manager.go @@ -92,7 +92,7 @@ func (tm *TableManager) Stop() { tm.uploadTables(context.Background(), true) } -func (tm *TableManager) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error { +func (tm *TableManager) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error { queriesByTable := util.QueriesByTable(queries) for tableName, queries := range queriesByTable { err := tm.query(ctx, tableName, queries, callback) @@ -104,7 +104,7 @@ func (tm *TableManager) QueryPages(ctx context.Context, queries []chunk.IndexQue return nil } -func (tm *TableManager) query(ctx context.Context, tableName string, queries []chunk.IndexQuery, callback chunk_util.Callback) error { +func (tm *TableManager) query(ctx context.Context, tableName string, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error { tm.tablesMtx.RLock() defer tm.tablesMtx.RUnlock() diff --git a/pkg/storage/stores/shipper/util/queries.go b/pkg/storage/stores/shipper/util/queries.go index 788bf2785763..d74498a2049e 100644 --- a/pkg/storage/stores/shipper/util/queries.go +++ b/pkg/storage/stores/shipper/util/queries.go @@ -5,14 +5,13 @@ import ( "sync" "github.com/grafana/loki/pkg/storage/chunk" - chunk_util "github.com/grafana/loki/pkg/storage/chunk/util" util_math "github.com/grafana/loki/pkg/util/math" ) const maxQueriesPerGoroutine = 100 type TableQuerier interface { - MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error + MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error } // QueriesByTable groups and returns queries by tables. @@ -29,7 +28,7 @@ func QueriesByTable(queries []chunk.IndexQuery) map[string][]chunk.IndexQuery { return queriesByTable } -func DoParallelQueries(ctx context.Context, tableQuerier TableQuerier, queries []chunk.IndexQuery, callback chunk_util.Callback) error { +func DoParallelQueries(ctx context.Context, tableQuerier TableQuerier, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error { errs := make(chan error) id := NewIndexDeduper(callback) @@ -59,12 +58,12 @@ func DoParallelQueries(ctx context.Context, tableQuerier TableQuerier, queries [ // IndexDeduper should always be used on table level not the whole query level because it just looks at range values which can be repeated across tables // Cortex anyways dedupes entries across tables type IndexDeduper struct { - callback chunk_util.Callback + callback chunk.QueryPagesCallback seenRangeValues map[string]map[string]struct{} mtx sync.RWMutex } -func NewIndexDeduper(callback chunk_util.Callback) *IndexDeduper { +func NewIndexDeduper(callback chunk.QueryPagesCallback) *IndexDeduper { return &IndexDeduper{ callback: callback, seenRangeValues: map[string]map[string]struct{}{}, diff --git a/pkg/storage/stores/shipper/util/queries_test.go b/pkg/storage/stores/shipper/util/queries_test.go index ed5ab80cf240..611c77b8fe70 100644 --- a/pkg/storage/stores/shipper/util/queries_test.go +++ b/pkg/storage/stores/shipper/util/queries_test.go @@ -9,7 +9,6 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/storage/chunk" - chunk_util "github.com/grafana/loki/pkg/storage/chunk/util" ) type mockTableQuerier struct { @@ -17,7 +16,7 @@ type mockTableQuerier struct { queries map[string]chunk.IndexQuery } -func (m *mockTableQuerier) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error { +func (m *mockTableQuerier) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error { m.Lock() defer m.Unlock()