Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[enhancement] querier cache: WriteBackCache should be off query path #5083

Merged
merged 10 commits into from
Jan 11, 2022
Merged
7 changes: 7 additions & 0 deletions pkg/storage/chunk/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ type Config struct {

// For tests to inject specific implementations.
Cache Cache `yaml:"-"`

// AsyncCacheWriteBackConcurrency specifies the number of goroutines to use when asynchronously writing chunks fetched from the store to the chunk cache.
AsyncCacheWriteBackConcurrency int `yaml:"async_cache_write_back_concurrency"`
// AsyncCacheWriteBackBufferSize specifies the maximum number of fetched chunks to buffer for writing back to the chunk cache.
AsyncCacheWriteBackBufferSize int `yaml:"async_cache_write_back_buffer_size"`
}

// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
Expand All @@ -50,6 +55,8 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, description string, f
cfg.MemcacheClient.RegisterFlagsWithPrefix(prefix, description, f)
cfg.Redis.RegisterFlagsWithPrefix(prefix, description, f)
cfg.Fifocache.RegisterFlagsWithPrefix(prefix, description, f)
f.IntVar(&cfg.AsyncCacheWriteBackConcurrency, prefix+"max-async-concurrency", 16, "The maximum number of concurrent asynchronous operations can occur.")
f.IntVar(&cfg.AsyncCacheWriteBackBufferSize, prefix+"max-async-buffer-size", 500, "The maximum number of enqueued asynchronous operations allowed.")
dannykopping marked this conversation as resolved.
Show resolved Hide resolved
f.DurationVar(&cfg.DefaultValidity, prefix+"default-validity", time.Hour, description+"The default validity of entries for caches unless overridden.")
f.BoolVar(&cfg.EnableFifoCache, prefix+"cache.enable-fifocache", false, description+"Enable in-memory cache (auto-enabled for the chunks & query results cache if no other cache is configured).")

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/chunk/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func testChunkFetcher(t *testing.T, c cache.Cache, keys []string, chunks []chunk
},
}

fetcher, err := chunk.NewChunkFetcher(c, false, s, nil)
fetcher, err := chunk.NewChunkFetcher(c, false, s, nil, 10, 100)
require.NoError(t, err)
defer fetcher.Stop()

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/chunk/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ type baseStore struct {
}

func newBaseStore(cfg StoreConfig, scfg SchemaConfig, schema BaseSchema, index IndexClient, chunks Client, limits StoreLimits, chunksCache cache.Cache) (baseStore, error) {
fetcher, err := NewChunkFetcher(chunksCache, cfg.chunkCacheStubs, scfg, chunks)
fetcher, err := NewChunkFetcher(chunksCache, cfg.chunkCacheStubs, scfg, chunks, cfg.ChunkCacheConfig.AsyncCacheWriteBackConcurrency, cfg.ChunkCacheConfig.AsyncCacheWriteBackBufferSize)
if err != nil {
return baseStore{}, err
}
Expand Down
79 changes: 72 additions & 7 deletions pkg/storage/chunk/chunk_store_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"sync"

"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
Expand All @@ -16,6 +19,23 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/cache"
)

var (
errAsyncBufferFull = errors.New("the async buffer is full")
reasonAsyncBufferFull = "async-buffer-full"
skipped = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "loki_chunk_fetcher_cache_skipped_buffer_full_total",
Help: "Total number of operations against cache that have been skipped.",
}, []string{"reason"})
dannykopping marked this conversation as resolved.
Show resolved Hide resolved
chunkFetcherCacheQueueEnqueue = promauto.NewCounter(prometheus.CounterOpts{
Name: "loki_chunk_fetcher_cache_enqueue_total",
Help: "Total number of chunk enqueue cache queue.",
})
chunkFetcherCacheQueueDequeue = promauto.NewCounter(prometheus.CounterOpts{
Name: "loki_chunk_fetcher_cache_dequeue_total",
Help: "Total number of chunk dequeue cache queue.",
})
liguozhong marked this conversation as resolved.
Show resolved Hide resolved
)

const chunkDecodeParallelism = 16

func filterChunksByTime(from, through model.Time, chunks []Chunk) []Chunk {
Expand Down Expand Up @@ -89,6 +109,12 @@ type Fetcher struct {

wait sync.WaitGroup
decodeRequests chan decodeRequest

maxAsyncConcurrency int
maxAsyncBufferSize int

asyncQueue chan []Chunk
stop chan struct{}
}

type decodeRequest struct {
Expand All @@ -103,28 +129,64 @@ type decodeResponse struct {
}

// NewChunkFetcher makes a new ChunkFetcher.
func NewChunkFetcher(cacher cache.Cache, cacheStubs bool, schema SchemaConfig, storage Client) (*Fetcher, error) {
func NewChunkFetcher(cacher cache.Cache, cacheStubs bool, schema SchemaConfig, storage Client, maxAsyncConcurrency int, maxAsyncBufferSize int) (*Fetcher, error) {
c := &Fetcher{
schema: schema,
storage: storage,
cache: cacher,
cacheStubs: cacheStubs,
decodeRequests: make(chan decodeRequest),
schema: schema,
storage: storage,
cache: cacher,
cacheStubs: cacheStubs,
decodeRequests: make(chan decodeRequest),
maxAsyncConcurrency: maxAsyncConcurrency,
maxAsyncBufferSize: maxAsyncBufferSize,
stop: make(chan struct{}, 1),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to make this a buffered channel since you are just using it to notify goroutines to stop by closing it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done,thanks .

}

c.wait.Add(chunkDecodeParallelism)
for i := 0; i < chunkDecodeParallelism; i++ {
go c.worker()
}

// Start a number of goroutines - processing async operations - equal
// to the max concurrency we have.
c.asyncQueue = make(chan []Chunk, c.maxAsyncBufferSize)
for i := 0; i < c.maxAsyncConcurrency; i++ {
go c.asyncWriteBackCacheQueueProcessLoop()
}

return c, nil
}

func (c *Fetcher) writeBackCacheAsync(fromStorage []Chunk) error {
select {
case c.asyncQueue <- fromStorage:
chunkFetcherCacheQueueEnqueue.Inc()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

chunkFetcherCacheQueueEnqueue.Add(len(fromStorage))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done,thanks .

return nil
default:
return errAsyncBufferFull
}
}

func (c *Fetcher) asyncWriteBackCacheQueueProcessLoop() {
for {
select {
case fromStorage := <-c.asyncQueue:
chunkFetcherCacheQueueDequeue.Inc()
Copy link
Contributor

@cyriltovena cyriltovena Jan 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

chunkFetcherCacheQueueDequeue.Add(-len(fromStorage))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done,thanks .

cacheErr := c.writeBackCache(context.Background(), fromStorage)
if cacheErr != nil {
level.Warn(util_log.Logger).Log("msg", "could not write fetched chunks from storage into chunk cache", "err", cacheErr)
}
case <-c.stop:
return
}
}
}

// Stop the ChunkFetcher.
func (c *Fetcher) Stop() {
close(c.decodeRequests)
c.wait.Wait()
c.cache.Stop()
close(c.stop)
}

func (c *Fetcher) worker() {
Expand Down Expand Up @@ -162,7 +224,10 @@ func (c *Fetcher) FetchChunks(ctx context.Context, chunks []Chunk, keys []string
}

// Always cache any chunks we did get
if cacheErr := c.writeBackCache(ctx, fromStorage); cacheErr != nil {
if cacheErr := c.writeBackCacheAsync(fromStorage); cacheErr != nil {
if cacheErr == errAsyncBufferFull {
skipped.WithLabelValues(reasonAsyncBufferFull).Inc()
}
level.Warn(log).Log("msg", "could not store chunks in chunk cache", "err", cacheErr)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (m *mockChunkStore) GetChunkRefs(ctx context.Context, userID string, from,
panic(err)
}

f, err := chunk.NewChunkFetcher(cache, false, m.schemas, m.client)
f, err := chunk.NewChunkFetcher(cache, false, m.schemas, m.client, 10, 100)
if err != nil {
panic(err)
}
Expand Down