-
Notifications
You must be signed in to change notification settings - Fork 3.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[enhancement] querier Index cache: cacheStore should be off the query path; fixes issue 4862, similar to PR 5083 #5198
[enhancement] querier Index cache: cacheStore should be off the query path; fixes issue 4862, similar to PR 5083 #5198
Changes from 8 commits
aa6f8b0
d400a1b
c738be4
bf38105
2c49b50
b418102
8339f0b
d356701
0c27de7
640d031
2dd6c00
24d8ea7
06ff248
b0cedff
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -8,18 +8,21 @@ import ( | |||||
"github.com/go-kit/log" | ||||||
"github.com/go-kit/log/level" | ||||||
"github.com/gogo/protobuf/proto" | ||||||
"github.com/pkg/errors" | ||||||
"github.com/prometheus/client_golang/prometheus" | ||||||
"github.com/prometheus/client_golang/prometheus/promauto" | ||||||
|
||||||
"github.com/grafana/loki/pkg/storage/chunk" | ||||||
"github.com/grafana/loki/pkg/storage/chunk/cache" | ||||||
chunk_util "github.com/grafana/loki/pkg/storage/chunk/util" | ||||||
"github.com/grafana/loki/pkg/tenant" | ||||||
util_log "github.com/grafana/loki/pkg/util/log" | ||||||
"github.com/grafana/loki/pkg/util/spanlogger" | ||||||
) | ||||||
|
||||||
var ( | ||||||
cacheCorruptErrs = promauto.NewCounter(prometheus.CounterOpts{ | ||||||
errAsyncBufferFull = errors.New("the async buffer is full") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMO the error could be a bit more descriptive:
Suggested change
|
||||||
cacheCorruptErrs = promauto.NewCounter(prometheus.CounterOpts{ | ||||||
Namespace: "loki", | ||||||
Name: "querier_index_cache_corruptions_total", | ||||||
Help: "The number of cache corruptions for the index cache.", | ||||||
|
@@ -44,6 +47,16 @@ var ( | |||||
Name: "querier_index_cache_encode_errors_total", | ||||||
Help: "The number of errors for the index cache while encoding the body.", | ||||||
}) | ||||||
cacheClientQueueEnqueue = promauto.NewCounter(prometheus.CounterOpts{ | ||||||
Namespace: "loki", | ||||||
Name: "querier_index_client_cache_enqueued_total", | ||||||
Help: "Total number of index enqueued to a buffer to be asynchronously written back to the index cache.", | ||||||
}) | ||||||
cacheClientQueueDequeue = promauto.NewCounter(prometheus.CounterOpts{ | ||||||
Namespace: "loki", | ||||||
Name: "querier_index_client_cache_dequeued_total", | ||||||
Help: "Total number of index dequeued to a buffer to be asynchronously written back to the index cache.", | ||||||
}) | ||||||
) | ||||||
|
||||||
const sep = "\xff" | ||||||
|
@@ -55,26 +68,45 @@ type cachingIndexClient struct { | |||||
limits StoreLimits | ||||||
logger log.Logger | ||||||
disableBroadQueries bool | ||||||
asyncQueue chan cacheEntry | ||||||
maxAsyncConcurrency int | ||||||
maxAsyncBufferSize int | ||||||
stop chan struct{} | ||||||
} | ||||||
|
||||||
func newCachingIndexClient(client chunk.IndexClient, c cache.Cache, validity time.Duration, limits StoreLimits, logger log.Logger, disableBroadQueries bool) chunk.IndexClient { | ||||||
func newCachingIndexClient(client chunk.IndexClient, c cache.Cache, validity time.Duration, limits StoreLimits, logger log.Logger, disableBroadQueries bool, maxAsyncConcurrency int, maxAsyncBufferSize int) chunk.IndexClient { | ||||||
if c == nil || cache.IsEmptyTieredCache(c) { | ||||||
return client | ||||||
} | ||||||
|
||||||
return &cachingIndexClient{ | ||||||
cacheClient := &cachingIndexClient{ | ||||||
IndexClient: client, | ||||||
cache: cache.NewSnappy(c, logger), | ||||||
validity: validity, | ||||||
limits: limits, | ||||||
logger: logger, | ||||||
disableBroadQueries: disableBroadQueries, | ||||||
maxAsyncConcurrency: maxAsyncConcurrency, | ||||||
maxAsyncBufferSize: maxAsyncBufferSize, | ||||||
stop: make(chan struct{}), | ||||||
} | ||||||
cacheClient.asyncQueue = make(chan cacheEntry, cacheClient.maxAsyncBufferSize) | ||||||
for i := 0; i < cacheClient.maxAsyncConcurrency; i++ { | ||||||
go cacheClient.asyncWriteBackCacheQueueProcessLoop() | ||||||
} | ||||||
|
||||||
return cacheClient | ||||||
} | ||||||
|
||||||
type cacheEntry struct { | ||||||
keys []string | ||||||
batches []ReadBatch | ||||||
} | ||||||
|
||||||
func (s *cachingIndexClient) Stop() { | ||||||
s.cache.Stop() | ||||||
s.IndexClient.Stop() | ||||||
close(s.stop) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After closing, should we wait for the queue to be emptied before |
||||||
} | ||||||
|
||||||
func (s *cachingIndexClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error { | ||||||
|
@@ -89,6 +121,10 @@ func (s *cachingIndexClient) QueryPages(ctx context.Context, queries []chunk.Ind | |||||
return s.doQueries(ctx, queries, callback) | ||||||
} | ||||||
|
||||||
func (s *cachingIndexClient) AsyncQueueLength() int { | ||||||
return len(s.asyncQueue) | ||||||
} | ||||||
|
||||||
func (s *cachingIndexClient) queryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback, | ||||||
buildIndexQuery func(query chunk.IndexQuery) chunk.IndexQuery, buildQueryKey func(query chunk.IndexQuery) string) error { | ||||||
if len(queries) == 0 { | ||||||
|
@@ -199,10 +235,13 @@ func (s *cachingIndexClient) queryPages(ctx context.Context, queries []chunk.Ind | |||||
} | ||||||
} | ||||||
|
||||||
err := s.cacheStore(ctx, keys, batches) | ||||||
cacheErr := s.cacheStoreAsync(keys, batches) | ||||||
if cardinalityErr != nil { | ||||||
return cardinalityErr | ||||||
} | ||||||
if cacheErr != nil { | ||||||
level.Warn(util_log.Logger).Log("msg", "could not write fetched index from storage into index cache", "err", cacheErr) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} | ||||||
return err | ||||||
} | ||||||
} | ||||||
|
@@ -273,6 +312,31 @@ func isChunksQuery(q chunk.IndexQuery) bool { | |||||
return len(q.RangeValueStart) != 0 | ||||||
} | ||||||
|
||||||
func (s *cachingIndexClient) cacheStoreAsync(keys []string, batches []ReadBatch) error { | ||||||
select { | ||||||
case s.asyncQueue <- cacheEntry{keys, batches}: | ||||||
cacheClientQueueEnqueue.Add(float64(len(batches))) | ||||||
return nil | ||||||
default: | ||||||
return errAsyncBufferFull | ||||||
} | ||||||
} | ||||||
|
||||||
func (s *cachingIndexClient) asyncWriteBackCacheQueueProcessLoop() { | ||||||
for { | ||||||
select { | ||||||
case cacheEntry := <-s.asyncQueue: | ||||||
cacheClientQueueDequeue.Add(float64(len(cacheEntry.batches))) | ||||||
cacheErr := s.cacheStore(context.Background(), cacheEntry.keys, cacheEntry.batches) | ||||||
if cacheErr != nil { | ||||||
level.Warn(util_log.Logger).Log("msg", "could not write fetched index from storage into index cache", "err", cacheErr) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} | ||||||
case <-s.stop: | ||||||
return | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
func (s *cachingIndexClient) cacheStore(ctx context.Context, keys []string, batches []ReadBatch) error { | ||||||
cachePuts.Add(float64(len(keys))) | ||||||
|
||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest not adding this method, since it is only relevant to the caching index client and has no use beyond tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, fixed. Moved AsyncQueueLength() out of the IndexClient interface and used it in the test through an interface conversion:
type IndexAsyncClient interface { AsyncQueueLength() int }