Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[enhancement] querier index cache: cacheStore should be off the query path; fixes issue #4862, similar to PR #5083 #5198

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/storage/chunk/aws/dynamodb_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ type dynamoDBStorageClient struct {
metrics *dynamoDBMetrics
}

func (a dynamoDBStorageClient) AsyncQueueLength() int {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest not adding this method which is only relevant for caching index client and has no use beyond just test.

Copy link
Contributor Author

@liguozhong liguozhong Jan 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done,fix. Move AsyncQueueLength() Interface from IndexClient Interface and use it in the test through interface conversion

		assert.Eventually(t, func() bool {
			if asyncClient, ok := client.(IndexAsyncClient); ok {
				return asyncClient.AsyncQueueLength() == 0
			}
			return true
		}, time.Second, 10*time.Millisecond)

type IndexAsyncClient interface { AsyncQueueLength() int }

return 0
}

// NewDynamoDBIndexClient makes a new DynamoDB-backed IndexClient.
func NewDynamoDBIndexClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig, reg prometheus.Registerer) (chunk.IndexClient, error) {
return newDynamoDBStorageClient(cfg, schemaCfg, reg)
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/chunk/cassandra/storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,10 @@ type StorageClient struct {
querySemaphore *semaphore.Weighted
}

// AsyncQueueLength reports the number of pending asynchronous cache
// write-backs. The Cassandra client performs no asynchronous cache writes,
// so this is always zero; it exists only to satisfy the async-queue
// interface used by the caching index client.
func (s *StorageClient) AsyncQueueLength() int {
	return 0
}

// NewStorageClient returns a new StorageClient.
func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig, registerer prometheus.Registerer) (*StorageClient, error) {
readSession, err := cfg.session("index-read", registerer)
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/chunk/gcp/bigtable_index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ type storageClientColumnKey struct {
keysFn keysFn
}

// AsyncQueueLength reports the number of pending asynchronous cache
// write-backs. The Bigtable client performs no asynchronous cache writes,
// so this is always zero; it exists only to satisfy the async-queue
// interface used by the caching index client.
func (s *storageClientColumnKey) AsyncQueueLength() int {
	return 0
}

// storageClientV1 implements chunk.storageClient for GCP.
type storageClientV1 struct {
storageClientColumnKey
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/chunk/grpc/storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ type StorageClient struct {
connection *grpc.ClientConn
}

// AsyncQueueLength reports the number of pending asynchronous cache
// write-backs. The gRPC storage client performs no asynchronous cache
// writes, so this is always zero; it exists only to satisfy the
// async-queue interface used by the caching index client.
func (s *StorageClient) AsyncQueueLength() int {
	return 0
}

// NewStorageClient returns a new StorageClient.
func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig) (*StorageClient, error) {
grpcClient, conn, err := connectToGrpcServer(cfg.Address)
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/chunk/inmemory_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ type MockStorage struct {
mode MockStorageMode
}

// AsyncQueueLength reports the number of pending asynchronous cache
// write-backs. The in-memory mock performs no asynchronous cache writes,
// so this is always zero; it exists only to satisfy the async-queue
// interface used by the caching index client.
func (m *MockStorage) AsyncQueueLength() int {
	return 0
}

type mockTable struct {
items map[string][]mockItem
write, read int64
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/chunk/local/boltdb_index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ type BoltIndexClient struct {
wait sync.WaitGroup
}

// AsyncQueueLength reports the number of pending asynchronous cache
// write-backs. The BoltDB client performs no asynchronous cache writes,
// so this is always zero; it exists only to satisfy the async-queue
// interface used by the caching index client.
func (b *BoltIndexClient) AsyncQueueLength() int {
	return 0
}

// NewBoltDBIndexClient creates a new IndexClient that used BoltDB.
func NewBoltDBIndexClient(cfg BoltDBConfig) (*BoltIndexClient, error) {
if err := chunk_util.EnsureDirectory(cfg.Directory); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/chunk/storage/caching_fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (f fixture) Clients() (chunk.IndexClient, chunk.Client, chunk.TableClient,
indexClient = newCachingIndexClient(indexClient, cache.NewFifoCache("index-fifo", cache.FifoCacheConfig{
MaxSizeItems: 500,
Validity: 5 * time.Minute,
}, reg, logger), 5*time.Minute, limits, logger, false)
}, reg, logger), 5*time.Minute, limits, logger, false, 10, 100)
return indexClient, chunkClient, tableClient, schemaConfig, closer, err
}

Expand Down
72 changes: 68 additions & 4 deletions pkg/storage/chunk/storage/caching_index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,21 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/cache"
chunk_util "github.com/grafana/loki/pkg/storage/chunk/util"
"github.com/grafana/loki/pkg/tenant"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/spanlogger"
)

var (
cacheCorruptErrs = promauto.NewCounter(prometheus.CounterOpts{
errAsyncBufferFull = errors.New("the async buffer is full")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO the error could be a bit more descriptive:

Suggested change
errAsyncBufferFull = errors.New("the async buffer is full")
errAsyncBufferFull = errors.New("the async buffer of the caching index client is full")

cacheCorruptErrs = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "loki",
Name: "querier_index_cache_corruptions_total",
Help: "The number of cache corruptions for the index cache.",
Expand All @@ -44,6 +47,16 @@ var (
Name: "querier_index_cache_encode_errors_total",
Help: "The number of errors for the index cache while encoding the body.",
})
cacheClientQueueEnqueue = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "loki",
Name: "querier_index_client_cache_enqueued_total",
Help: "Total number of index enqueued to a buffer to be asynchronously written back to the index cache.",
})
cacheClientQueueDequeue = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "loki",
Name: "querier_index_client_cache_dequeued_total",
Help: "Total number of index dequeued to a buffer to be asynchronously written back to the index cache.",
})
)

const sep = "\xff"
Expand All @@ -55,26 +68,45 @@ type cachingIndexClient struct {
limits StoreLimits
logger log.Logger
disableBroadQueries bool
asyncQueue chan cacheEntry
maxAsyncConcurrency int
maxAsyncBufferSize int
stop chan struct{}
}

func newCachingIndexClient(client chunk.IndexClient, c cache.Cache, validity time.Duration, limits StoreLimits, logger log.Logger, disableBroadQueries bool) chunk.IndexClient {
func newCachingIndexClient(client chunk.IndexClient, c cache.Cache, validity time.Duration, limits StoreLimits, logger log.Logger, disableBroadQueries bool, maxAsyncConcurrency int, maxAsyncBufferSize int) chunk.IndexClient {
if c == nil || cache.IsEmptyTieredCache(c) {
return client
}

return &cachingIndexClient{
cacheClient := &cachingIndexClient{
IndexClient: client,
cache: cache.NewSnappy(c, logger),
validity: validity,
limits: limits,
logger: logger,
disableBroadQueries: disableBroadQueries,
maxAsyncConcurrency: maxAsyncConcurrency,
maxAsyncBufferSize: maxAsyncBufferSize,
stop: make(chan struct{}),
}
cacheClient.asyncQueue = make(chan cacheEntry, cacheClient.maxAsyncBufferSize)
for i := 0; i < cacheClient.maxAsyncConcurrency; i++ {
go cacheClient.asyncWriteBackCacheQueueProcessLoop()
}

return cacheClient
}

// cacheEntry is one unit of work on the asynchronous write-back queue: the
// cache keys and the read batches to store under them. keys and batches are
// parallel slices produced by queryPages.
type cacheEntry struct {
	keys    []string
	batches []ReadBatch
}

func (s *cachingIndexClient) Stop() {
s.cache.Stop()
s.IndexClient.Stop()
close(s.stop)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After closing, should we wait for the queue to be emptied before Stop() returns?

}

func (s *cachingIndexClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error {
Expand All @@ -89,6 +121,10 @@ func (s *cachingIndexClient) QueryPages(ctx context.Context, queries []chunk.Ind
return s.doQueries(ctx, queries, callback)
}

// AsyncQueueLength reports how many cache write-back entries are currently
// buffered in asyncQueue awaiting processing by the background workers.
// Used by tests to wait for the queue to drain.
func (s *cachingIndexClient) AsyncQueueLength() int {
	return len(s.asyncQueue)
}

func (s *cachingIndexClient) queryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback,
buildIndexQuery func(query chunk.IndexQuery) chunk.IndexQuery, buildQueryKey func(query chunk.IndexQuery) string) error {
if len(queries) == 0 {
Expand Down Expand Up @@ -199,10 +235,13 @@ func (s *cachingIndexClient) queryPages(ctx context.Context, queries []chunk.Ind
}
}

err := s.cacheStore(ctx, keys, batches)
cacheErr := s.cacheStoreAsync(keys, batches)
if cardinalityErr != nil {
return cardinalityErr
}
if cacheErr != nil {
level.Warn(util_log.Logger).Log("msg", "could not write fetched index from storage into index cache", "err", cacheErr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
level.Warn(util_log.Logger).Log("msg", "could not write fetched index from storage into index cache", "err", cacheErr)
level.Warn(s.logger).Log("msg", "could not write fetched index from storage into index cache", "err", cacheErr)

}
return err
}
}
Expand Down Expand Up @@ -273,6 +312,31 @@ func isChunksQuery(q chunk.IndexQuery) bool {
return len(q.RangeValueStart) != 0
}

// cacheStoreAsync enqueues keys and their read batches for background
// write-back to the index cache. It never blocks the query path: when the
// buffer is full the entry is dropped and errAsyncBufferFull is returned.
func (s *cachingIndexClient) cacheStoreAsync(keys []string, batches []ReadBatch) error {
	entry := cacheEntry{keys: keys, batches: batches}
	select {
	case s.asyncQueue <- entry:
		cacheClientQueueEnqueue.Add(float64(len(batches)))
		return nil
	default:
	}
	return errAsyncBufferFull
}

func (s *cachingIndexClient) asyncWriteBackCacheQueueProcessLoop() {
for {
select {
case cacheEntry := <-s.asyncQueue:
cacheClientQueueDequeue.Add(float64(len(cacheEntry.batches)))
cacheErr := s.cacheStore(context.Background(), cacheEntry.keys, cacheEntry.batches)
if cacheErr != nil {
level.Warn(util_log.Logger).Log("msg", "could not write fetched index from storage into index cache", "err", cacheErr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
level.Warn(util_log.Logger).Log("msg", "could not write fetched index from storage into index cache", "err", cacheErr)
level.Warn(s.logger).Log("msg", "could not write fetched index from storage into index cache", "err", cacheErr)

}
case <-s.stop:
return
}
}
}

func (s *cachingIndexClient) cacheStore(ctx context.Context, keys []string, batches []ReadBatch) error {
cachePuts.Add(float64(len(keys)))

Expand Down
Loading