
[enhancement] querier index cache: cacheStore should be off the query path, fixes #4862, similar to #5083 (PR #5198)

Closed
wants to merge 14 commits

Conversation

liguozhong
Contributor

@liguozhong liguozhong commented Jan 21, 2022

Fixes issue #4862 ([bug] querier: timeout, failed to put to redis, store.index-cache-read.redis).

[enhancement] querier index cache: cacheStore should be off the query path, similar to #5083 (comment) ([enhancement] querier cache: WriteBackCache should be off query path).

@liguozhong liguozhong requested a review from a team as a code owner January 21, 2022 03:36
Contributor

@sandeepsukhani sandeepsukhani left a comment


Appreciate all the PRs that you have been doing!
Just some minor nits suggested but other than that it looks good to me.
Please let me know if you face any issues with suggested changes in tests.

@@ -5,9 +5,11 @@ import (
"sync"
"time"

util_log "github.com/cortexproject/cortex/pkg/util/log"
Contributor

We have forked the log package from Cortex which is what we are using now. Please use util_log "github.com/grafana/loki/pkg/util/log".

Contributor Author

Done, thanks.

pkg/storage/chunk/storage/caching_index_client.go (outdated, resolved)
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)
Contributor

Instead of putting a sleep, I think you should check for the channel length to be 0 like:

	assert.Eventually(t, func() bool {
		return len(client.asyncQueue) == 0
	}, time.Second, 10*time.Millisecond)

It would check if the async queue got flushed every 10ms for 1s. If the queue length doesn't get to 0 in 1s then the test would fail.

Contributor Author

assert.Eventually(t, func() bool {
return len(client.asyncQueue) == 0
}, time.Second, 10*time.Millisecond)

OK, cool 👍

Contributor Author

Done, thanks.

assert.EqualValues(t, 1, len(store.queries))

// If we do the query to the cache again, the underlying store shouldn't see it.
err = client.QueryPages(ctx, queries, func(_ chunk.IndexQuery, _ chunk.ReadBatch) bool {
return true
})
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)
Contributor

If I am not wrong, I think we do not need this sleep, the above suggested check for write back queue to be 0 should suffice.

Contributor Author

> If I am not wrong, I think we do not need this sleep, the above suggested check for write back queue to be 0 should suffice.

Yes.

@@ -92,6 +95,7 @@ func TestTempCachingStorageClient(t *testing.T) {
return true
})
require.NoError(t, err)
time.Sleep(1000 * time.Millisecond)
Contributor

Same suggestion as above to do assert.Eventually. I think it would apply to most of the cases below.
We just want to make sure the write back queue is cleared since it is async now.

Contributor Author

Done, thanks.

@@ -105,11 +109,12 @@ func TestTempCachingStorageClient(t *testing.T) {
return true
})
require.NoError(t, err)
time.Sleep(1000 * time.Millisecond)
Contributor

Same suggestion to drop this and some of the similar instances below.

Contributor Author

> Same suggestion to drop this and some of the similar instances below.

Done, thanks. Switched to assert.Eventually.

@@ -202,7 +202,7 @@ func NewStore(
if err != nil {
return nil, errors.Wrap(err, "error creating index client")
}
index = newCachingIndexClient(index, indexReadCache, cfg.IndexCacheValidity, limits, logger, cfg.DisableBroadIndexQueries)
index = newCachingIndexClient(index, indexReadCache, cfg.IndexCacheValidity, limits, logger, cfg.DisableBroadIndexQueries, chunkCacheCfg.AsyncCacheWriteBackConcurrency, chunkCacheCfg.AsyncCacheWriteBackBufferSize)
Contributor

It would be good to break it into two lines for better readability.

Contributor Author

Done, thanks.

Co-authored-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
@pull-request-size pull-request-size bot added size/L and removed size/M labels Jan 21, 2022
@liguozhong liguozhong closed this Jan 24, 2022
@liguozhong liguozhong reopened this Jan 24, 2022
@liguozhong
Contributor Author

Done.

@@ -113,6 +113,10 @@ type dynamoDBStorageClient struct {
metrics *dynamoDBMetrics
}

func (a dynamoDBStorageClient) AsyncQueueLength() int {
Contributor

I would suggest not adding this method which is only relevant for caching index client and has no use beyond just test.

Contributor Author

@liguozhong liguozhong Jan 24, 2022

Done, fixed. Moved AsyncQueueLength() out of the IndexClient interface into a separate IndexAsyncClient interface and used it in the tests through interface conversion:

		assert.Eventually(t, func() bool {
			if asyncClient, ok := client.(IndexAsyncClient); ok {
				return asyncClient.AsyncQueueLength() == 0
			}
			return true
		}, time.Second, 10*time.Millisecond)

type IndexAsyncClient interface { AsyncQueueLength() int }

@liguozhong
Contributor Author

Done.

Contributor

@sandeepsukhani sandeepsukhani left a comment

Added some minor suggestions but other than that it LGTM

Comment on lines 71 to 76
assert.Eventually(t, func() bool {
if asyncClient, ok := client.(IndexAsyncClient); ok {
return asyncClient.AsyncQueueLength() == 0
}
return true
}, time.Second, 10*time.Millisecond)
Contributor

I don't think we need this check since we anyways have the results cached

Contributor Author

> I don't think we need this check since we anyways have the results cached

Done, thanks.

Comment on lines 127 to 132
assert.Eventually(t, func() bool {
if asyncClient, ok := client.(IndexAsyncClient); ok {
return asyncClient.AsyncQueueLength() == 0
}
return true
}, time.Second, 10*time.Millisecond)
Contributor

same here

Contributor Author

Done, thanks.

Comment on lines 204 to 209
assert.Eventually(t, func() bool {
if asyncClient, ok := client.(IndexAsyncClient); ok {
return asyncClient.AsyncQueueLength() == 0
}
return true
}, time.Second, 10*time.Millisecond)
Contributor

same

Contributor Author

Done, thanks.

Comment on lines 261 to 266
assert.Eventually(t, func() bool {
if asyncClient, ok := client.(IndexAsyncClient); ok {
return asyncClient.AsyncQueueLength() == 0
}
return true
}, time.Second, 10*time.Millisecond)
Contributor

same

Contributor Author

Done, thanks.

Comment on lines +331 to +336
assert.Eventually(t, func() bool {
if asyncClient, ok := client.(IndexAsyncClient); ok {
return asyncClient.AsyncQueueLength() == 0
}
return true
}, time.Second, 10*time.Millisecond)
Contributor

same

Contributor Author

Done, thanks.

Comment on lines 58 to 63
assert.Eventually(t, func() bool {
if asyncClient, ok := client.(IndexAsyncClient); ok {
return asyncClient.AsyncQueueLength() == 0
}
return true
}, time.Second, 10*time.Millisecond)
Contributor

Not a blocking thing, but since this is repeated so often, we could move it into a function awaitAsyncQueueFlush(*testing.T, *cachingIndexClient).

Contributor Author

Done, thanks. This is a great suggestion; thanks for the guidance.

@cyriltovena
Contributor

Should we use a different set of configuration for the index, or are we OK with the chunk one?

@liguozhong
Contributor Author

> Should we use a different set of configuration for the index or are we ok with the chunk one ?

OK.

@liguozhong
Contributor Author

> Should we use a different set of configuration for the index or are we ok with the chunk one ?

Done: added AsyncIndexCacheWriteBackConcurrency and AsyncIndexCacheWriteBackBufferSize.

Contributor

@sandeepsukhani sandeepsukhani left a comment

LGTM! Thanks for taking care of all the feedback!

@liguozhong
Contributor Author

Hi, could someone help review this PR?

@liguozhong
Contributor Author

liguozhong commented Feb 12, 2022

Our team is running a performance stress test for LogQL (see #5378). At present, a query over 102GB of logs spanning 6 hours takes Loki a total of 302s.

We need every PR that improves read performance.

The poor performance comes from our S3, Cassandra, and Redis middleware, not from Loki itself. We need to make the read path depend on that middleware as little as possible.

Contributor

@chaudum chaudum left a comment

Good improvement! Please also add documentation for the new settings and a changelog entry.
Also, please make the PR description more descriptive: instead of just linking other issues, describe what behaviour changed and why.

PS: Thanks for all the contributions 🚀

Comment on lines +49 to +52
// AsyncIndexCacheWriteBackConcurrency specifies the number of goroutines to use when asynchronously writing index fetched from the store to the index cache.
AsyncIndexCacheWriteBackConcurrency int `yaml:"async_index_cache_write_back_concurrency"`
// AsyncIndexCacheWriteBackBufferSize specifies the maximum number of fetched index to buffer for writing back to the index cache.
AsyncIndexCacheWriteBackBufferSize int `yaml:"async_index_cache_write_back_buffer_size"`
Contributor

These new settings should also be documented in the configuration reference.
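Based only on the yaml struct tags in the diff above, the settings would presumably surface in YAML along these lines. The enclosing section name is an assumption for illustration, not taken from the Loki configuration reference; the exact placement depends on where this config struct is wired in.

```yaml
# Hypothetical placement; only the two field names below come from the diff.
index_queries_cache_config:
  async_index_cache_write_back_concurrency: 16
  async_index_cache_write_back_buffer_size: 500
```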

"github.com/grafana/loki/pkg/util/spanlogger"
)

var (
cacheCorruptErrs = promauto.NewCounter(prometheus.CounterOpts{
errAsyncBufferFull = errors.New("the async buffer is full")
Contributor

IMO the error could be a bit more descriptive:

Suggested change
errAsyncBufferFull = errors.New("the async buffer is full")
errAsyncBufferFull = errors.New("the async buffer of the caching index client is full")

cacheClientQueueDequeue.Add(float64(len(cacheEntry.batches)))
cacheErr := s.cacheStore(context.Background(), cacheEntry.keys, cacheEntry.batches)
if cacheErr != nil {
level.Warn(util_log.Logger).Log("msg", "could not write fetched index from storage into index cache", "err", cacheErr)
Contributor

Suggested change
level.Warn(util_log.Logger).Log("msg", "could not write fetched index from storage into index cache", "err", cacheErr)
level.Warn(s.logger).Log("msg", "could not write fetched index from storage into index cache", "err", cacheErr)

if cardinalityErr != nil {
return cardinalityErr
}
if cacheErr != nil {
level.Warn(util_log.Logger).Log("msg", "could not write fetched index from storage into index cache", "err", cacheErr)
Contributor

Suggested change
level.Warn(util_log.Logger).Log("msg", "could not write fetched index from storage into index cache", "err", cacheErr)
level.Warn(s.logger).Log("msg", "could not write fetched index from storage into index cache", "err", cacheErr)

}

func (s *cachingIndexClient) Stop() {
s.cache.Stop()
s.IndexClient.Stop()
close(s.stop)
Contributor

After closing, should we wait for the queue to be emptied before Stop() returns?

@stale

stale bot commented Apr 16, 2022

Hi! This issue has been automatically marked as stale because it has not had any
activity in the past 30 days.

We use a stalebot among other tools to help manage the state of issues in this project.
A stalebot can be very useful in closing issues in a number of cases; the most common
is closing issues or PRs where the original reporter has not responded.

Stalebots are also emotionless and cruel and can close issues which are still very relevant.

If this issue is important to you, please add a comment to keep it open. More importantly, please add a thumbs-up to the original issue entry.

We regularly sort for closed issues which have a stale label sorted by thumbs up.

We may also:

  • Mark issues as revivable if we think it's a valid issue but isn't something we are likely
    to prioritize in the future (the issue will still remain closed).
  • Add a keepalive label to silence the stalebot if the issue is very common/popular/important.

We are doing our best to respond, organize, and prioritize all issues but it can be a challenging task,
our sincere apologies if you find yourself at the mercy of the stalebot.

@stale stale bot added the stale A stale issue or PR that will automatically be closed. label Apr 16, 2022
@stale stale bot closed this Apr 25, 2022
Labels: size/L, stale (a stale issue or PR that will automatically be closed)

4 participants