[enhancement] querier index cache: cacheStore should be off the query path, fixes issue #4862, like PR #5083 — #5198
Conversation
…h ,fix:4862 issue like 5083 pr
Appreciate all the PRs that you have been doing!
Just some minor nits suggested but other than that it looks good to me.
Please let me know if you face any issues with suggested changes in tests.
@@ -5,9 +5,11 @@ import (
	"sync"
	"time"

	util_log "github.com/cortexproject/cortex/pkg/util/log"
We have forked the log package from Cortex, which is what we are using now. Please use util_log "github.com/grafana/loki/pkg/util/log".
Done, thanks.
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)
Instead of putting a sleep, I think you should check for the channel length to be 0 like:
assert.Eventually(t, func() bool {
return len(client.asyncQueue) == 0
}, time.Second, 10*time.Millisecond)
It would check if the async queue got flushed every 10ms for 1s. If the queue length doesn't get to 0 within 1s, the test would fail.
assert.Eventually(t, func() bool {
return len(client.asyncQueue) == 0
}, time.Second, 10*time.Millisecond)
OK, cool 👍
Done, thanks.
assert.EqualValues(t, 1, len(store.queries))

// If we do the query to the cache again, the underlying store shouldn't see it.
err = client.QueryPages(ctx, queries, func(_ chunk.IndexQuery, _ chunk.ReadBatch) bool {
	return true
})
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)
If I am not wrong, I think we do not need this sleep, the above suggested check for write back queue to be 0 should suffice.
If I am not wrong, I think we do not need this sleep, the above suggested check for write back queue to be 0 should suffice.
yes
@@ -92,6 +95,7 @@ func TestTempCachingStorageClient(t *testing.T) {
	return true
})
require.NoError(t, err)
time.Sleep(1000 * time.Millisecond)
Same suggestion as above to do assert.Eventually. I think it would apply to most of the cases below. We just want to make sure the write-back queue is cleared since it is async now.
Done, thanks.
@@ -105,11 +109,12 @@ func TestTempCachingStorageClient(t *testing.T) {
	return true
})
require.NoError(t, err)
time.Sleep(1000 * time.Millisecond)
Same suggestion to drop this and some of the similar instances below.
Same suggestion to drop this and some of the similar instances below.
Done, thanks; used assert.Eventually.
pkg/storage/chunk/storage/factory.go
Outdated
@@ -202,7 +202,7 @@ func NewStore(
	if err != nil {
		return nil, errors.Wrap(err, "error creating index client")
	}
-	index = newCachingIndexClient(index, indexReadCache, cfg.IndexCacheValidity, limits, logger, cfg.DisableBroadIndexQueries)
+	index = newCachingIndexClient(index, indexReadCache, cfg.IndexCacheValidity, limits, logger, cfg.DisableBroadIndexQueries, chunkCacheCfg.AsyncCacheWriteBackConcurrency, chunkCacheCfg.AsyncCacheWriteBackBufferSize)
It would be good to break it into two lines for better readability.
Done, thanks.
Co-authored-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
…h ,fix:4862 issue like 5083 pr grafana#5198
Done.
@@ -113,6 +113,10 @@ type dynamoDBStorageClient struct {
	metrics *dynamoDBMetrics
}

func (a dynamoDBStorageClient) AsyncQueueLength() int {
I would suggest not adding this method, which is only relevant for the caching index client and has no use beyond tests.
Done, fixed. Moved AsyncQueueLength() out of the IndexClient interface into a separate IndexAsyncClient interface, and used it in the test through interface conversion:
assert.Eventually(t, func() bool {
if asyncClient, ok := client.(IndexAsyncClient); ok {
return asyncClient.AsyncQueueLength() == 0
}
return true
}, time.Second, 10*time.Millisecond)
type IndexAsyncClient interface { AsyncQueueLength() int }
…h ,fix:4862 issue like 5083 pr grafana#5198
Done.
Added some minor suggestions, but other than that it LGTM.
assert.Eventually(t, func() bool {
	if asyncClient, ok := client.(IndexAsyncClient); ok {
		return asyncClient.AsyncQueueLength() == 0
	}
	return true
}, time.Second, 10*time.Millisecond)
I don't think we need this check since we anyways have the results cached
I don't think we need this check since we anyways have the results cached
Done, thanks.
assert.Eventually(t, func() bool {
	if asyncClient, ok := client.(IndexAsyncClient); ok {
		return asyncClient.AsyncQueueLength() == 0
	}
	return true
}, time.Second, 10*time.Millisecond)
same here
Done, thanks.
assert.Eventually(t, func() bool {
	if asyncClient, ok := client.(IndexAsyncClient); ok {
		return asyncClient.AsyncQueueLength() == 0
	}
	return true
}, time.Second, 10*time.Millisecond)
same
Done, thanks.
assert.Eventually(t, func() bool {
	if asyncClient, ok := client.(IndexAsyncClient); ok {
		return asyncClient.AsyncQueueLength() == 0
	}
	return true
}, time.Second, 10*time.Millisecond)
same
Done, thanks.
assert.Eventually(t, func() bool {
	if asyncClient, ok := client.(IndexAsyncClient); ok {
		return asyncClient.AsyncQueueLength() == 0
	}
	return true
}, time.Second, 10*time.Millisecond)
same
Done, thanks.
assert.Eventually(t, func() bool {
	if asyncClient, ok := client.(IndexAsyncClient); ok {
		return asyncClient.AsyncQueueLength() == 0
	}
	return true
}, time.Second, 10*time.Millisecond)
Not a blocking thing, but since this is repeated so often, we could move it into a function: awaitAsyncQueueFlush(*testing.T, *cachingIndexClient).
Done, thanks. This is a great suggestion; thanks for the guidance.
…h ,fix:4862 issue like 5083 pr grafana#5198
Should we use a different set of configuration options for the index, or are we OK with the chunk ones?

OK.
…h ,fix:4862 issue like 5083 pr grafana#5198
Done. Added AsyncIndexCacheWriteBackConcurrency and AsyncIndexCacheWriteBackBufferSize.
LGTM! Thanks for taking care of all the feedback!
Hi, please help review this PR.
Our team is doing a performance stress test for LogQL (here: #5378). At present, we need any PR that improves read performance. The poor read performance is due to the poor performance of our S3, Cassandra, and Redis dependent middleware.
Good improvement! Please also add documentation for the new settings and a changelog entry.
Also, please make the PR description more descriptive: instead of just linking other issues, describe what behaviour changed and why.
PS: Thanks for all the contributions 🚀
// AsyncIndexCacheWriteBackConcurrency specifies the number of goroutines to use when asynchronously writing index entries fetched from the store to the index cache.
AsyncIndexCacheWriteBackConcurrency int `yaml:"async_index_cache_write_back_concurrency"`
// AsyncIndexCacheWriteBackBufferSize specifies the maximum number of fetched index entries to buffer for writing back to the index cache.
AsyncIndexCacheWriteBackBufferSize int `yaml:"async_index_cache_write_back_buffer_size"`
These new settings should also be documented in the configuration reference.
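For illustration, a sketch of how these settings might look in a config file, derived only from the yaml struct tags above. The enclosing section name and the values shown are assumptions, not taken from the PR or the Loki configuration reference.

```yaml
# Hypothetical placement; the real section depends on where this cache
# config struct is embedded, which is not shown in this diff.
index_queries_cache_config:
  async_index_cache_write_back_concurrency: 16
  async_index_cache_write_back_buffer_size: 500
```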
	"github.com/grafana/loki/pkg/util/spanlogger"
)

var (
	cacheCorruptErrs = promauto.NewCounter(prometheus.CounterOpts{
	errAsyncBufferFull = errors.New("the async buffer is full")
IMO the error could be a bit more descriptive:
- errAsyncBufferFull = errors.New("the async buffer is full")
+ errAsyncBufferFull = errors.New("the async buffer of the caching index client is full")
cacheClientQueueDequeue.Add(float64(len(cacheEntry.batches)))
cacheErr := s.cacheStore(context.Background(), cacheEntry.keys, cacheEntry.batches)
if cacheErr != nil {
	level.Warn(util_log.Logger).Log("msg", "could not write fetched index from storage into index cache", "err", cacheErr)
- level.Warn(util_log.Logger).Log("msg", "could not write fetched index from storage into index cache", "err", cacheErr)
+ level.Warn(s.logger).Log("msg", "could not write fetched index from storage into index cache", "err", cacheErr)
if cardinalityErr != nil {
	return cardinalityErr
}
if cacheErr != nil {
	level.Warn(util_log.Logger).Log("msg", "could not write fetched index from storage into index cache", "err", cacheErr)
- level.Warn(util_log.Logger).Log("msg", "could not write fetched index from storage into index cache", "err", cacheErr)
+ level.Warn(s.logger).Log("msg", "could not write fetched index from storage into index cache", "err", cacheErr)
}

func (s *cachingIndexClient) Stop() {
	s.cache.Stop()
	s.IndexClient.Stop()
	close(s.stop)
After closing, should we wait for the queue to be emptied before Stop() returns?
Hi! This issue has been automatically marked as stale because it has not had any recent activity. We use a stalebot among other tools to help manage the state of issues in this project. Stalebots are also emotionless and cruel and can close issues which are still very relevant.

If this issue is important to you, please add a comment to keep it open. More importantly, please add a thumbs-up to the original issue entry. We regularly sort for closed issues which have a stale label, sorted by thumbs up.

We are doing our best to respond, organize, and prioritize all issues, but it can be a challenging task.
Fixes issue #4862:
<<[bug] querier: timeout, failed to put to redis, store.index-cache-read.redis #4862>>
[enhancement] querier index cache: cacheStore should be off the query path
Like: #5083 (comment)
<<[enhancement] querier cache: WriteBackCache should be off query path #5083>>