[enhancement] querier index cache: cacheStore should be off the query path, fixes issue #4862 (like PR #5083) #5198

Closed · wants to merge 14 commits
Changes from 1 commit
2 changes: 1 addition & 1 deletion pkg/storage/chunk/storage/caching_fixtures.go
@@ -32,7 +32,7 @@ func (f fixture) Clients() (chunk.IndexClient, chunk.Client, chunk.TableClient,
indexClient = newCachingIndexClient(indexClient, cache.NewFifoCache("index-fifo", cache.FifoCacheConfig{
MaxSizeItems: 500,
Validity: 5 * time.Minute,
}, reg, logger), 5*time.Minute, limits, logger, false)
}, reg, logger), 5*time.Minute, limits, logger, false, 10, 100)
return indexClient, chunkClient, tableClient, schemaConfig, closer, err
}

63 changes: 59 additions & 4 deletions pkg/storage/chunk/storage/caching_index_client.go
@@ -5,9 +5,11 @@ import (
"sync"
"time"

util_log "github.com/cortexproject/cortex/pkg/util/log"
Contributor:
We have forked the log package from Cortex, which is what we are using now. Please use util_log "github.com/grafana/loki/pkg/util/log".

Contributor Author:
done, thanks

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

@@ -21,7 +23,8 @@ import (
)

var (
cacheCorruptErrs = promauto.NewCounter(prometheus.CounterOpts{
errAsyncBufferFull = errors.New("the async buffer is full")
Contributor:
IMO the error could be a bit more descriptive:

Suggested change:
-	errAsyncBufferFull = errors.New("the async buffer is full")
+	errAsyncBufferFull = errors.New("the async buffer of the caching index client is full")

cacheCorruptErrs = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "loki",
Name: "querier_index_cache_corruptions_total",
Help: "The number of cache corruptions for the index cache.",
Expand All @@ -46,6 +49,14 @@ var (
Name: "querier_index_cache_encode_errors_total",
Help: "The number of errors for the index cache while encoding the body.",
})
cacheClientQueueEnqueue = promauto.NewCounter(prometheus.CounterOpts{
Name: "querier_index_client_cache_enqueued_total",
Help: "Total number of index enqueued to a buffer to be asynchronously written back to the index cache.",
})
cacheClientQueueDequeue = promauto.NewCounter(prometheus.CounterOpts{
Name: "querier_index_client_cache_dequeued_total",
Help: "Total number of index dequeued to a buffer to be asynchronously written back to the index cache.",
})
liguozhong marked this conversation as resolved.
)

const sep = "\xff"
@@ -57,26 +68,45 @@ type cachingIndexClient struct {
limits StoreLimits
logger log.Logger
disableBroadQueries bool
asyncQueue chan cacheEntry
maxAsyncConcurrency int
maxAsyncBufferSize int
stop chan struct{}
}

func newCachingIndexClient(client chunk.IndexClient, c cache.Cache, validity time.Duration, limits StoreLimits, logger log.Logger, disableBroadQueries bool) chunk.IndexClient {
func newCachingIndexClient(client chunk.IndexClient, c cache.Cache, validity time.Duration, limits StoreLimits, logger log.Logger, disableBroadQueries bool, maxAsyncConcurrency int, maxAsyncBufferSize int) chunk.IndexClient {
if c == nil || cache.IsEmptyTieredCache(c) {
return client
}

return &cachingIndexClient{
cacheClient := &cachingIndexClient{
IndexClient: client,
cache: cache.NewSnappy(c, logger),
validity: validity,
limits: limits,
logger: logger,
disableBroadQueries: disableBroadQueries,
maxAsyncConcurrency: maxAsyncConcurrency,
maxAsyncBufferSize: maxAsyncBufferSize,
stop: make(chan struct{}),
}
cacheClient.asyncQueue = make(chan cacheEntry, cacheClient.maxAsyncBufferSize)
for i := 0; i < cacheClient.maxAsyncConcurrency; i++ {
go cacheClient.asyncWriteBackCacheQueueProcessLoop()
}

return cacheClient
}

type cacheEntry struct {
keys []string
batches []ReadBatch
}

func (s *cachingIndexClient) Stop() {
s.cache.Stop()
s.IndexClient.Stop()
close(s.stop)
Contributor:
After closing, should we wait for the queue to be emptied before Stop() returns?

}
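
A minimal sketch of one way to do that drain, assuming the client gains a sync.WaitGroup field (workers below) and that the constructor calls workers.Add(1) before starting each loop; writeBack stands in for the existing cacheStore call. None of these names exist in this PR:

	// Hypothetical drain-on-stop variant; workers (a sync.WaitGroup) and
	// writeBack are assumed names that this PR does not define.
	func (s *cachingIndexClient) asyncWriteBackCacheQueueProcessLoop() {
		defer s.workers.Done()
		for {
			select {
			case entry := <-s.asyncQueue:
				s.writeBack(entry)
			case <-s.stop:
				// Flush whatever is still buffered before returning.
				for {
					select {
					case entry := <-s.asyncQueue:
						s.writeBack(entry)
					default:
						return
					}
				}
			}
		}
	}

	func (s *cachingIndexClient) Stop() {
		close(s.stop)
		s.workers.Wait() // returns only after the workers have emptied the queue
		s.cache.Stop()
		s.IndexClient.Stop()
	}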

func (s *cachingIndexClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error {
@@ -201,7 +231,7 @@ func (s *cachingIndexClient) queryPages(ctx context.Context, queries []chunk.Ind
}
}

err := s.cacheStore(ctx, keys, batches)
err := s.cacheStoreAsync(keys, batches)
if cardinalityErr != nil {
return cardinalityErr
}
@@ -275,6 +305,31 @@ func isChunksQuery(q chunk.IndexQuery) bool {
return len(q.RangeValueStart) != 0
}

func (s *cachingIndexClient) cacheStoreAsync(keys []string, batches []ReadBatch) error {
select {
case s.asyncQueue <- cacheEntry{keys, batches}:
cacheClientQueueEnqueue.Add(float64(len(batches)))
return nil
default:
return errAsyncBufferFull
}
}
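
The select with a default case is what keeps cache writes off the query path: a full buffer returns errAsyncBufferFull immediately instead of blocking the caller. A standalone toy illustration of that non-blocking send (names and sizes here are illustrative, not from the PR):

	package main

	import (
		"errors"
		"fmt"
	)

	var errBufferFull = errors.New("the async buffer is full")

	func main() {
		queue := make(chan int, 2) // toy bounded buffer

		enqueue := func(v int) error {
			select {
			case queue <- v: // space available: hand off without blocking
				return nil
			default: // buffer full: fail fast, never block the query path
				return errBufferFull
			}
		}

		for i := 0; i < 3; i++ {
			if err := enqueue(i); err != nil {
				fmt.Println("dropped write-back:", err) // the third send is dropped
			}
		}
	}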

func (s *cachingIndexClient) asyncWriteBackCacheQueueProcessLoop() {
for {
select {
case cacheEntry := <-s.asyncQueue:
cacheClientQueueDequeue.Add(float64(len(cacheEntry.batches)))
cacheErr := s.cacheStore(context.Background(), cacheEntry.keys, cacheEntry.batches)
if cacheErr != nil {
level.Warn(util_log.Logger).Log("msg", "could not write fetched index from storage into index cache", "err", cacheErr)
Contributor:
Suggested change:
-	level.Warn(util_log.Logger).Log("msg", "could not write fetched index from storage into index cache", "err", cacheErr)
+	level.Warn(s.logger).Log("msg", "could not write fetched index from storage into index cache", "err", cacheErr)

}
case <-s.stop:
return
}
}
}

func (s *cachingIndexClient) cacheStore(ctx context.Context, keys []string, batches []ReadBatch) error {
cachePuts.Add(float64(len(keys)))

29 changes: 22 additions & 7 deletions pkg/storage/chunk/storage/caching_index_client_test.go
@@ -45,22 +45,25 @@ func TestCachingStorageClientBasic(t *testing.T) {
require.NoError(t, err)
logger := log.NewNopLogger()
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 10, Validity: 10 * time.Second}, nil, logger)
client := newCachingIndexClient(store, cache, 1*time.Second, limits, logger, false)
client := newCachingIndexClient(store, cache, 1*time.Second, limits, logger, false, 10, 100)
queries := []chunk.IndexQuery{{
TableName: "table",
HashValue: "baz",
}}
err = client.QueryPages(ctx, queries, func(_ chunk.IndexQuery, _ chunk.ReadBatch) bool {
return true
})

require.NoError(t, err)
time.Sleep(100 * time.Millisecond)
Contributor:
Instead of putting in a sleep, I think you should check for the channel length to be 0, like:

	assert.Eventually(t, func() bool {
		return len(client.asyncQueue) == 0
	}, time.Second, 10*time.Millisecond)

It checks every 10ms, for up to 1s, whether the async queue has been flushed. If the queue length doesn't reach 0 within 1s, the test fails.

Contributor Author:
ok, cool 👍

Contributor Author:
done, thanks

assert.EqualValues(t, 1, len(store.queries))

// If we do the query to the cache again, the underlying store shouldn't see it.
err = client.QueryPages(ctx, queries, func(_ chunk.IndexQuery, _ chunk.ReadBatch) bool {
return true
})
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)
Contributor:
If I am not wrong, I think we do not need this sleep; the check suggested above, that the write-back queue reaches 0, should suffice.

Contributor Author:
yes

assert.EqualValues(t, 1, len(store.queries))
}

@@ -77,7 +80,7 @@ func TestTempCachingStorageClient(t *testing.T) {
require.NoError(t, err)
logger := log.NewNopLogger()
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 10, Validity: 10 * time.Second}, nil, logger)
client := newCachingIndexClient(store, cache, 100*time.Millisecond, limits, logger, false)
client := newCachingIndexClient(store, cache, 10000*time.Millisecond, limits, logger, false, 10, 100)
queries := []chunk.IndexQuery{
{TableName: "table", HashValue: "foo"},
{TableName: "table", HashValue: "bar"},
Expand All @@ -92,6 +95,7 @@ func TestTempCachingStorageClient(t *testing.T) {
return true
})
require.NoError(t, err)
time.Sleep(1000 * time.Millisecond)
Contributor:
Same suggestion as above to do assert.Eventually; I think it would apply to most of the cases below. We just want to make sure the write-back queue is cleared, since it is async now.

Contributor Author:
done, thanks

assert.EqualValues(t, len(queries), len(store.queries))
assert.EqualValues(t, len(queries), results)

Expand All @@ -105,11 +109,12 @@ func TestTempCachingStorageClient(t *testing.T) {
return true
})
require.NoError(t, err)
time.Sleep(1000 * time.Millisecond)
Contributor:
Same suggestion to drop this and some of the similar instances below.

Contributor Author:
done, thanks; switched to assert.Eventually

assert.EqualValues(t, len(queries), len(store.queries))
assert.EqualValues(t, len(queries), results)

// If we do the query after validity, it should see the queries.
time.Sleep(100 * time.Millisecond)
time.Sleep(10000 * time.Millisecond)
results = 0
err = client.QueryPages(ctx, queries, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool {
iter := batch.Iterator()
Expand All @@ -119,6 +124,7 @@ func TestTempCachingStorageClient(t *testing.T) {
return true
})
require.NoError(t, err)
time.Sleep(1000 * time.Millisecond)
assert.EqualValues(t, 2*len(queries), len(store.queries))
assert.EqualValues(t, len(queries), results)
}
@@ -136,7 +142,7 @@ func TestPermCachingStorageClient(t *testing.T) {
require.NoError(t, err)
logger := log.NewNopLogger()
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 10, Validity: 10 * time.Second}, nil, logger)
client := newCachingIndexClient(store, cache, 100*time.Millisecond, limits, logger, false)
client := newCachingIndexClient(store, cache, 100*time.Millisecond, limits, logger, false, 10, 100)
queries := []chunk.IndexQuery{
{TableName: "table", HashValue: "foo", Immutable: true},
{TableName: "table", HashValue: "bar", Immutable: true},
Expand All @@ -151,6 +157,7 @@ func TestPermCachingStorageClient(t *testing.T) {
return true
})
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)
assert.EqualValues(t, len(queries), len(store.queries))
assert.EqualValues(t, len(queries), results)

Expand All @@ -164,6 +171,7 @@ func TestPermCachingStorageClient(t *testing.T) {
return true
})
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)
assert.EqualValues(t, len(queries), len(store.queries))
assert.EqualValues(t, len(queries), results)

Expand All @@ -178,6 +186,7 @@ func TestPermCachingStorageClient(t *testing.T) {
return true
})
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)
assert.EqualValues(t, len(queries), len(store.queries))
assert.EqualValues(t, len(queries), results)
}
@@ -188,13 +197,14 @@ func TestCachingStorageClientEmptyResponse(t *testing.T) {
require.NoError(t, err)
logger := log.NewNopLogger()
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 10, Validity: 10 * time.Second}, nil, logger)
client := newCachingIndexClient(store, cache, 1*time.Second, limits, logger, false)
client := newCachingIndexClient(store, cache, 1*time.Second, limits, logger, false, 10, 100)
queries := []chunk.IndexQuery{{TableName: "table", HashValue: "foo"}}
err = client.QueryPages(ctx, queries, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool {
assert.False(t, batch.Iterator().Next())
return true
})
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)
assert.EqualValues(t, 1, len(store.queries))

// If we do the query to the cache again, the underlying store shouldn't see it.
Expand All @@ -203,6 +213,7 @@ func TestCachingStorageClientEmptyResponse(t *testing.T) {
return true
})
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)
assert.EqualValues(t, 1, len(store.queries))
}

@@ -227,7 +238,7 @@ func TestCachingStorageClientCollision(t *testing.T) {
require.NoError(t, err)
logger := log.NewNopLogger()
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 10, Validity: 10 * time.Second}, nil, logger)
client := newCachingIndexClient(store, cache, 1*time.Second, limits, logger, false)
client := newCachingIndexClient(store, cache, 1*time.Second, limits, logger, false, 10, 100)
queries := []chunk.IndexQuery{
{TableName: "table", HashValue: "foo", RangeValuePrefix: []byte("bar")},
{TableName: "table", HashValue: "foo", RangeValuePrefix: []byte("baz")},
Expand All @@ -245,6 +256,7 @@ func TestCachingStorageClientCollision(t *testing.T) {
return true
})
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)
assert.EqualValues(t, 1, len(store.queries))
assert.EqualValues(t, store.results, results)

Expand All @@ -261,6 +273,7 @@ func TestCachingStorageClientCollision(t *testing.T) {
return true
})
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)
assert.EqualValues(t, 1, len(store.queries))
assert.EqualValues(t, store.results, results)
}
@@ -408,14 +421,15 @@ func TestCachingStorageClientStoreQueries(t *testing.T) {
cache := &mockCache{
Cache: cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 10, Validity: 10 * time.Second}, nil, logger),
}
client := newCachingIndexClient(store, cache, 1*time.Second, limits, logger, disableBroadQueries)
client := newCachingIndexClient(store, cache, 1*time.Second, limits, logger, disableBroadQueries, 10, 100)
var callbackQueries []chunk.IndexQuery

err = client.QueryPages(ctx, tc.queries, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool {
callbackQueries = append(callbackQueries, query)
return true
})
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)

// we do a callback per query sent not per query done to the index store. See if we got as many callbacks as the number of actual queries.
sort.Slice(tc.queries, func(i, j int) bool {
Expand All @@ -442,6 +456,7 @@ func TestCachingStorageClientStoreQueries(t *testing.T) {
return true
})
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)

// verify the callback queries again
sort.Slice(callbackQueries, func(i, j int) bool {
2 changes: 1 addition & 1 deletion pkg/storage/chunk/storage/factory.go
@@ -202,7 +202,7 @@ func NewStore(
if err != nil {
return nil, errors.Wrap(err, "error creating index client")
}
index = newCachingIndexClient(index, indexReadCache, cfg.IndexCacheValidity, limits, logger, cfg.DisableBroadIndexQueries)
index = newCachingIndexClient(index, indexReadCache, cfg.IndexCacheValidity, limits, logger, cfg.DisableBroadIndexQueries, chunkCacheCfg.AsyncCacheWriteBackConcurrency, chunkCacheCfg.AsyncCacheWriteBackBufferSize)
Contributor:
It would be good to break it into 2 lines for better readability.

Contributor Author:
done, thanks
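
For reference, the wrapped call might look roughly like this; the exact break point is a guess, not taken from the final PR:

	index = newCachingIndexClient(index, indexReadCache, cfg.IndexCacheValidity, limits, logger,
		cfg.DisableBroadIndexQueries, chunkCacheCfg.AsyncCacheWriteBackConcurrency, chunkCacheCfg.AsyncCacheWriteBackBufferSize)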


objectStoreType := s.ObjectType
if objectStoreType == "" {
2 changes: 1 addition & 1 deletion pkg/storage/chunk/storage/index_client_test.go
@@ -205,7 +205,7 @@ func TestCardinalityLimit(t *testing.T) {
limits, err := defaultLimits()
require.NoError(t, err)

client = newCachingIndexClient(client, cache.NewMockCache(), time.Minute, limits, log.NewNopLogger(), false)
client = newCachingIndexClient(client, cache.NewMockCache(), time.Minute, limits, log.NewNopLogger(), false, 10, 100)
batch := client.NewWriteBatch()
for i := 0; i < 10; i++ {
batch.Add(tableName, "bar", []byte(strconv.Itoa(i)), []byte(strconv.Itoa(i)))