-
Notifications
You must be signed in to change notification settings - Fork 3.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[enhancement] querier index cache: cacheStore should be off the query path (fixes issue 4862, similar to PR 5083) #5198
[enhancement] querier index cache: cacheStore should be off the query path (fixes issue 4862, similar to PR 5083) #5198
Changes from 1 commit
aa6f8b0
d400a1b
c738be4
bf38105
2c49b50
b418102
8339f0b
d356701
0c27de7
640d031
2dd6c00
24d8ea7
06ff248
b0cedff
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -5,9 +5,11 @@ import ( | |||||
"sync" | ||||||
"time" | ||||||
|
||||||
util_log "github.com/cortexproject/cortex/pkg/util/log" | ||||||
"github.com/go-kit/log" | ||||||
"github.com/go-kit/log/level" | ||||||
"github.com/gogo/protobuf/proto" | ||||||
"github.com/pkg/errors" | ||||||
"github.com/prometheus/client_golang/prometheus" | ||||||
"github.com/prometheus/client_golang/prometheus/promauto" | ||||||
|
||||||
|
@@ -21,7 +23,8 @@ import ( | |||||
) | ||||||
|
||||||
var ( | ||||||
cacheCorruptErrs = promauto.NewCounter(prometheus.CounterOpts{ | ||||||
errAsyncBufferFull = errors.New("the async buffer is full") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. IMO the error could be a bit more descriptive:
Suggested change
|
||||||
cacheCorruptErrs = promauto.NewCounter(prometheus.CounterOpts{ | ||||||
Namespace: "loki", | ||||||
Name: "querier_index_cache_corruptions_total", | ||||||
Help: "The number of cache corruptions for the index cache.", | ||||||
|
@@ -46,6 +49,14 @@ var ( | |||||
Name: "querier_index_cache_encode_errors_total", | ||||||
Help: "The number of errors for the index cache while encoding the body.", | ||||||
}) | ||||||
cacheClientQueueEnqueue = promauto.NewCounter(prometheus.CounterOpts{ | ||||||
Name: "querier_index_client_cache_enqueued_total", | ||||||
Help: "Total number of index enqueued to a buffer to be asynchronously written back to the index cache.", | ||||||
}) | ||||||
cacheClientQueueDequeue = promauto.NewCounter(prometheus.CounterOpts{ | ||||||
Name: "querier_index_client_cache_dequeued_total", | ||||||
Help: "Total number of index dequeued to a buffer to be asynchronously written back to the index cache.", | ||||||
}) | ||||||
liguozhong marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
) | ||||||
|
||||||
const sep = "\xff" | ||||||
|
@@ -57,26 +68,45 @@ type cachingIndexClient struct { | |||||
limits StoreLimits | ||||||
logger log.Logger | ||||||
disableBroadQueries bool | ||||||
asyncQueue chan cacheEntry | ||||||
maxAsyncConcurrency int | ||||||
maxAsyncBufferSize int | ||||||
stop chan struct{} | ||||||
} | ||||||
|
||||||
func newCachingIndexClient(client chunk.IndexClient, c cache.Cache, validity time.Duration, limits StoreLimits, logger log.Logger, disableBroadQueries bool) chunk.IndexClient { | ||||||
func newCachingIndexClient(client chunk.IndexClient, c cache.Cache, validity time.Duration, limits StoreLimits, logger log.Logger, disableBroadQueries bool, maxAsyncConcurrency int, maxAsyncBufferSize int) chunk.IndexClient { | ||||||
if c == nil || cache.IsEmptyTieredCache(c) { | ||||||
return client | ||||||
} | ||||||
|
||||||
return &cachingIndexClient{ | ||||||
cacheClient := &cachingIndexClient{ | ||||||
IndexClient: client, | ||||||
cache: cache.NewSnappy(c, logger), | ||||||
validity: validity, | ||||||
limits: limits, | ||||||
logger: logger, | ||||||
disableBroadQueries: disableBroadQueries, | ||||||
maxAsyncConcurrency: maxAsyncConcurrency, | ||||||
maxAsyncBufferSize: maxAsyncBufferSize, | ||||||
stop: make(chan struct{}), | ||||||
} | ||||||
cacheClient.asyncQueue = make(chan cacheEntry, cacheClient.maxAsyncBufferSize) | ||||||
for i := 0; i < cacheClient.maxAsyncConcurrency; i++ { | ||||||
go cacheClient.asyncWriteBackCacheQueueProcessLoop() | ||||||
} | ||||||
|
||||||
return cacheClient | ||||||
} | ||||||
|
||||||
type cacheEntry struct { | ||||||
keys []string | ||||||
batches []ReadBatch | ||||||
} | ||||||
|
||||||
func (s *cachingIndexClient) Stop() { | ||||||
s.cache.Stop() | ||||||
s.IndexClient.Stop() | ||||||
close(s.stop) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. After closing, should we wait for the queue to be emptied before |
||||||
} | ||||||
|
||||||
func (s *cachingIndexClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error { | ||||||
|
@@ -201,7 +231,7 @@ func (s *cachingIndexClient) queryPages(ctx context.Context, queries []chunk.Ind | |||||
} | ||||||
} | ||||||
|
||||||
err := s.cacheStore(ctx, keys, batches) | ||||||
err := s.cacheStoreAsync(keys, batches) | ||||||
if cardinalityErr != nil { | ||||||
return cardinalityErr | ||||||
} | ||||||
|
@@ -275,6 +305,31 @@ func isChunksQuery(q chunk.IndexQuery) bool { | |||||
return len(q.RangeValueStart) != 0 | ||||||
} | ||||||
|
||||||
func (s *cachingIndexClient) cacheStoreAsync(keys []string, batches []ReadBatch) error { | ||||||
select { | ||||||
case s.asyncQueue <- cacheEntry{keys, batches}: | ||||||
cacheClientQueueEnqueue.Add(float64(len(batches))) | ||||||
return nil | ||||||
default: | ||||||
return errAsyncBufferFull | ||||||
} | ||||||
} | ||||||
|
||||||
func (s *cachingIndexClient) asyncWriteBackCacheQueueProcessLoop() { | ||||||
for { | ||||||
select { | ||||||
case cacheEntry := <-s.asyncQueue: | ||||||
cacheClientQueueDequeue.Add(float64(len(cacheEntry.batches))) | ||||||
cacheErr := s.cacheStore(context.Background(), cacheEntry.keys, cacheEntry.batches) | ||||||
if cacheErr != nil { | ||||||
level.Warn(util_log.Logger).Log("msg", "could not write fetched index from storage into index cache", "err", cacheErr) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} | ||||||
case <-s.stop: | ||||||
return | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
func (s *cachingIndexClient) cacheStore(ctx context.Context, keys []string, batches []ReadBatch) error { | ||||||
cachePuts.Add(float64(len(keys))) | ||||||
|
||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -45,22 +45,25 @@ func TestCachingStorageClientBasic(t *testing.T) { | |
require.NoError(t, err) | ||
logger := log.NewNopLogger() | ||
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 10, Validity: 10 * time.Second}, nil, logger) | ||
client := newCachingIndexClient(store, cache, 1*time.Second, limits, logger, false) | ||
client := newCachingIndexClient(store, cache, 1*time.Second, limits, logger, false, 10, 100) | ||
queries := []chunk.IndexQuery{{ | ||
TableName: "table", | ||
HashValue: "baz", | ||
}} | ||
err = client.QueryPages(ctx, queries, func(_ chunk.IndexQuery, _ chunk.ReadBatch) bool { | ||
return true | ||
}) | ||
|
||
require.NoError(t, err) | ||
time.Sleep(100 * time.Millisecond) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Instead of adding a sleep, I think you should check that the channel length is 0, like:
It would check whether the async queue got flushed every […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
OK, cool 👍 There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Done, thanks. |
||
assert.EqualValues(t, 1, len(store.queries)) | ||
|
||
// If we do the query to the cache again, the underlying store shouldn't see it. | ||
err = client.QueryPages(ctx, queries, func(_ chunk.IndexQuery, _ chunk.ReadBatch) bool { | ||
return true | ||
}) | ||
require.NoError(t, err) | ||
time.Sleep(100 * time.Millisecond) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If I am not wrong, we do not need this sleep; the check suggested above, that the write-back queue length is 0, should suffice. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
yes |
||
assert.EqualValues(t, 1, len(store.queries)) | ||
} | ||
|
||
|
@@ -77,7 +80,7 @@ func TestTempCachingStorageClient(t *testing.T) { | |
require.NoError(t, err) | ||
logger := log.NewNopLogger() | ||
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 10, Validity: 10 * time.Second}, nil, logger) | ||
client := newCachingIndexClient(store, cache, 100*time.Millisecond, limits, logger, false) | ||
client := newCachingIndexClient(store, cache, 10000*time.Millisecond, limits, logger, false, 10, 100) | ||
queries := []chunk.IndexQuery{ | ||
{TableName: "table", HashValue: "foo"}, | ||
{TableName: "table", HashValue: "bar"}, | ||
|
@@ -92,6 +95,7 @@ func TestTempCachingStorageClient(t *testing.T) { | |
return true | ||
}) | ||
require.NoError(t, err) | ||
time.Sleep(1000 * time.Millisecond) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Same suggestion as above to do […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Done, thanks. |
||
assert.EqualValues(t, len(queries), len(store.queries)) | ||
assert.EqualValues(t, len(queries), results) | ||
|
||
|
@@ -105,11 +109,12 @@ func TestTempCachingStorageClient(t *testing.T) { | |
return true | ||
}) | ||
require.NoError(t, err) | ||
time.Sleep(1000 * time.Millisecond) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Same suggestion to drop this and some of the similar instances below. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Done, thanks; now using assert.Eventually. |
||
assert.EqualValues(t, len(queries), len(store.queries)) | ||
assert.EqualValues(t, len(queries), results) | ||
|
||
// If we do the query after validity, it should see the queries. | ||
time.Sleep(100 * time.Millisecond) | ||
time.Sleep(10000 * time.Millisecond) | ||
results = 0 | ||
err = client.QueryPages(ctx, queries, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool { | ||
iter := batch.Iterator() | ||
|
@@ -119,6 +124,7 @@ func TestTempCachingStorageClient(t *testing.T) { | |
return true | ||
}) | ||
require.NoError(t, err) | ||
time.Sleep(1000 * time.Millisecond) | ||
assert.EqualValues(t, 2*len(queries), len(store.queries)) | ||
assert.EqualValues(t, len(queries), results) | ||
} | ||
|
@@ -136,7 +142,7 @@ func TestPermCachingStorageClient(t *testing.T) { | |
require.NoError(t, err) | ||
logger := log.NewNopLogger() | ||
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 10, Validity: 10 * time.Second}, nil, logger) | ||
client := newCachingIndexClient(store, cache, 100*time.Millisecond, limits, logger, false) | ||
client := newCachingIndexClient(store, cache, 100*time.Millisecond, limits, logger, false, 10, 100) | ||
queries := []chunk.IndexQuery{ | ||
{TableName: "table", HashValue: "foo", Immutable: true}, | ||
{TableName: "table", HashValue: "bar", Immutable: true}, | ||
|
@@ -151,6 +157,7 @@ func TestPermCachingStorageClient(t *testing.T) { | |
return true | ||
}) | ||
require.NoError(t, err) | ||
time.Sleep(100 * time.Millisecond) | ||
assert.EqualValues(t, len(queries), len(store.queries)) | ||
assert.EqualValues(t, len(queries), results) | ||
|
||
|
@@ -164,6 +171,7 @@ func TestPermCachingStorageClient(t *testing.T) { | |
return true | ||
}) | ||
require.NoError(t, err) | ||
time.Sleep(100 * time.Millisecond) | ||
assert.EqualValues(t, len(queries), len(store.queries)) | ||
assert.EqualValues(t, len(queries), results) | ||
|
||
|
@@ -178,6 +186,7 @@ func TestPermCachingStorageClient(t *testing.T) { | |
return true | ||
}) | ||
require.NoError(t, err) | ||
time.Sleep(100 * time.Millisecond) | ||
assert.EqualValues(t, len(queries), len(store.queries)) | ||
assert.EqualValues(t, len(queries), results) | ||
} | ||
|
@@ -188,13 +197,14 @@ func TestCachingStorageClientEmptyResponse(t *testing.T) { | |
require.NoError(t, err) | ||
logger := log.NewNopLogger() | ||
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 10, Validity: 10 * time.Second}, nil, logger) | ||
client := newCachingIndexClient(store, cache, 1*time.Second, limits, logger, false) | ||
client := newCachingIndexClient(store, cache, 1*time.Second, limits, logger, false, 10, 100) | ||
queries := []chunk.IndexQuery{{TableName: "table", HashValue: "foo"}} | ||
err = client.QueryPages(ctx, queries, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool { | ||
assert.False(t, batch.Iterator().Next()) | ||
return true | ||
}) | ||
require.NoError(t, err) | ||
time.Sleep(100 * time.Millisecond) | ||
assert.EqualValues(t, 1, len(store.queries)) | ||
|
||
// If we do the query to the cache again, the underlying store shouldn't see it. | ||
|
@@ -203,6 +213,7 @@ func TestCachingStorageClientEmptyResponse(t *testing.T) { | |
return true | ||
}) | ||
require.NoError(t, err) | ||
time.Sleep(100 * time.Millisecond) | ||
assert.EqualValues(t, 1, len(store.queries)) | ||
} | ||
|
||
|
@@ -227,7 +238,7 @@ func TestCachingStorageClientCollision(t *testing.T) { | |
require.NoError(t, err) | ||
logger := log.NewNopLogger() | ||
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 10, Validity: 10 * time.Second}, nil, logger) | ||
client := newCachingIndexClient(store, cache, 1*time.Second, limits, logger, false) | ||
client := newCachingIndexClient(store, cache, 1*time.Second, limits, logger, false, 10, 100) | ||
queries := []chunk.IndexQuery{ | ||
{TableName: "table", HashValue: "foo", RangeValuePrefix: []byte("bar")}, | ||
{TableName: "table", HashValue: "foo", RangeValuePrefix: []byte("baz")}, | ||
|
@@ -245,6 +256,7 @@ func TestCachingStorageClientCollision(t *testing.T) { | |
return true | ||
}) | ||
require.NoError(t, err) | ||
time.Sleep(100 * time.Millisecond) | ||
assert.EqualValues(t, 1, len(store.queries)) | ||
assert.EqualValues(t, store.results, results) | ||
|
||
|
@@ -261,6 +273,7 @@ func TestCachingStorageClientCollision(t *testing.T) { | |
return true | ||
}) | ||
require.NoError(t, err) | ||
time.Sleep(100 * time.Millisecond) | ||
assert.EqualValues(t, 1, len(store.queries)) | ||
assert.EqualValues(t, store.results, results) | ||
} | ||
|
@@ -408,14 +421,15 @@ func TestCachingStorageClientStoreQueries(t *testing.T) { | |
cache := &mockCache{ | ||
Cache: cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 10, Validity: 10 * time.Second}, nil, logger), | ||
} | ||
client := newCachingIndexClient(store, cache, 1*time.Second, limits, logger, disableBroadQueries) | ||
client := newCachingIndexClient(store, cache, 1*time.Second, limits, logger, disableBroadQueries, 10, 100) | ||
var callbackQueries []chunk.IndexQuery | ||
|
||
err = client.QueryPages(ctx, tc.queries, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool { | ||
callbackQueries = append(callbackQueries, query) | ||
return true | ||
}) | ||
require.NoError(t, err) | ||
time.Sleep(100 * time.Millisecond) | ||
|
||
// we do a callback per query sent not per query done to the index store. See if we got as many callbacks as the number of actual queries. | ||
sort.Slice(tc.queries, func(i, j int) bool { | ||
|
@@ -442,6 +456,7 @@ func TestCachingStorageClientStoreQueries(t *testing.T) { | |
return true | ||
}) | ||
require.NoError(t, err) | ||
time.Sleep(100 * time.Millisecond) | ||
|
||
// verify the callback queries again | ||
sort.Slice(callbackQueries, func(i, j int) bool { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -202,7 +202,7 @@ func NewStore( | |
if err != nil { | ||
return nil, errors.Wrap(err, "error creating index client") | ||
} | ||
index = newCachingIndexClient(index, indexReadCache, cfg.IndexCacheValidity, limits, logger, cfg.DisableBroadIndexQueries) | ||
index = newCachingIndexClient(index, indexReadCache, cfg.IndexCacheValidity, limits, logger, cfg.DisableBroadIndexQueries, chunkCacheCfg.AsyncCacheWriteBackConcurrency, chunkCacheCfg.AsyncCacheWriteBackBufferSize) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It would be good to break it into two lines for better readability. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Done, thanks. |
||
|
||
objectStoreType := s.ObjectType | ||
if objectStoreType == "" { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have forked the `log` package from Cortex, which is what we are using now. Please use `util_log "github.com/grafana/loki/pkg/util/log"`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, thanks.