From fef533a99aa623645831d40c5cf5b1b2fe7eee81 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Wed, 3 Apr 2024 20:05:40 +0000 Subject: [PATCH] remove async writeback introduced in https://github.com/grafana/loki/pull/5083 Signed-off-by: Edward Welch --- pkg/storage/chunk/cache/cache.go | 7 --- pkg/storage/chunk/fetcher/fetcher.go | 79 ++++------------------------ pkg/storage/store.go | 2 +- 3 files changed, 12 insertions(+), 76 deletions(-) diff --git a/pkg/storage/chunk/cache/cache.go b/pkg/storage/chunk/cache/cache.go index 6e1565fcaa3e8..9239fe88d7f05 100644 --- a/pkg/storage/chunk/cache/cache.go +++ b/pkg/storage/chunk/cache/cache.go @@ -38,11 +38,6 @@ type Config struct { // For tests to inject specific implementations. Cache Cache `yaml:"-"` - - // AsyncCacheWriteBackConcurrency specifies the number of goroutines to use when asynchronously writing chunks fetched from the store to the chunk cache. - AsyncCacheWriteBackConcurrency int `yaml:"async_cache_write_back_concurrency"` - // AsyncCacheWriteBackBufferSize specifies the maximum number of fetched chunks to buffer for writing back to the chunk cache. 
- AsyncCacheWriteBackBufferSize int `yaml:"async_cache_write_back_buffer_size"` } // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet @@ -52,8 +47,6 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, description string, f cfg.MemcacheClient.RegisterFlagsWithPrefix(prefix, description, f) cfg.Redis.RegisterFlagsWithPrefix(prefix, description, f) cfg.EmbeddedCache.RegisterFlagsWithPrefix(prefix+"embedded-cache.", description, f) - f.IntVar(&cfg.AsyncCacheWriteBackConcurrency, prefix+"max-async-cache-write-back-concurrency", 16, "The maximum number of concurrent asynchronous writeback cache can occur.") - f.IntVar(&cfg.AsyncCacheWriteBackBufferSize, prefix+"max-async-cache-write-back-buffer-size", 500, "The maximum number of enqueued asynchronous writeback cache allowed.") f.DurationVar(&cfg.DefaultValidity, prefix+"default-validity", time.Hour, description+"The default validity of entries for caches unless overridden.") cfg.Prefix = prefix diff --git a/pkg/storage/chunk/fetcher/fetcher.go b/pkg/storage/chunk/fetcher/fetcher.go index 7801143932842..cf763b9cbedc9 100644 --- a/pkg/storage/chunk/fetcher/fetcher.go +++ b/pkg/storage/chunk/fetcher/fetcher.go @@ -2,7 +2,6 @@ package fetcher import ( "context" - "errors" "sync" "time" @@ -23,19 +22,6 @@ import ( ) var ( - errAsyncBufferFull = errors.New("the async buffer is full") - skipped = promauto.NewCounter(prometheus.CounterOpts{ - Name: "loki_chunk_fetcher_cache_skipped_buffer_full_total", - Help: "Total number of operations against cache that have been skipped.", - }) - chunkFetcherCacheQueueEnqueue = promauto.NewCounter(prometheus.CounterOpts{ - Name: "loki_chunk_fetcher_cache_enqueued_total", - Help: "Total number of chunks enqueued to a buffer to be asynchronously written back to the chunk cache.", - }) - chunkFetcherCacheQueueDequeue = promauto.NewCounter(prometheus.CounterOpts{ - Name: "loki_chunk_fetcher_cache_dequeued_total", - Help: "Total number of chunks 
asynchronously dequeued from a buffer and written back to the chunk cache.", - }) cacheCorrupt = promauto.NewCounter(prometheus.CounterOpts{ Namespace: constants.Loki, Name: "cache_corrupt_chunks_total", @@ -69,12 +55,7 @@ type Fetcher struct { wait sync.WaitGroup decodeRequests chan decodeRequest - maxAsyncConcurrency int - maxAsyncBufferSize int - - asyncQueue chan []chunk.Chunk - stopOnce sync.Once - stop chan struct{} + stopOnce sync.Once } type decodeRequest struct { @@ -89,18 +70,15 @@ type decodeResponse struct { } // New makes a new ChunkFetcher. -func New(cache cache.Cache, cachel2 cache.Cache, cacheStubs bool, schema config.SchemaConfig, storage client.Client, maxAsyncConcurrency int, maxAsyncBufferSize int, l2CacheHandoff time.Duration) (*Fetcher, error) { +func New(cache cache.Cache, cachel2 cache.Cache, cacheStubs bool, schema config.SchemaConfig, storage client.Client, l2CacheHandoff time.Duration) (*Fetcher, error) { c := &Fetcher{ - schema: schema, - storage: storage, - cache: cache, - cachel2: cachel2, - l2CacheHandoff: l2CacheHandoff, - cacheStubs: cacheStubs, - decodeRequests: make(chan decodeRequest), - maxAsyncConcurrency: maxAsyncConcurrency, - maxAsyncBufferSize: maxAsyncBufferSize, - stop: make(chan struct{}), + schema: schema, + storage: storage, + cache: cache, + cachel2: cachel2, + l2CacheHandoff: l2CacheHandoff, + cacheStubs: cacheStubs, + decodeRequests: make(chan decodeRequest), } c.wait.Add(chunkDecodeParallelism) @@ -108,48 +86,15 @@ func New(cache cache.Cache, cachel2 cache.Cache, cacheStubs bool, schema config. go c.worker() } - // Start a number of goroutines - processing async operations - equal - // to the max concurrency we have. 
- c.asyncQueue = make(chan []chunk.Chunk, c.maxAsyncBufferSize) - for i := 0; i < c.maxAsyncConcurrency; i++ { - go c.asyncWriteBackCacheQueueProcessLoop() - } - return c, nil } -func (c *Fetcher) writeBackCacheAsync(fromStorage []chunk.Chunk) error { - select { - case c.asyncQueue <- fromStorage: - chunkFetcherCacheQueueEnqueue.Add(float64(len(fromStorage))) - return nil - default: - return errAsyncBufferFull - } -} - -func (c *Fetcher) asyncWriteBackCacheQueueProcessLoop() { - for { - select { - case fromStorage := <-c.asyncQueue: - chunkFetcherCacheQueueDequeue.Add(float64(len(fromStorage))) - cacheErr := c.WriteBackCache(context.Background(), fromStorage) - if cacheErr != nil { - level.Warn(util_log.Logger).Log("msg", "could not write fetched chunks from storage into chunk cache", "err", cacheErr) - } - case <-c.stop: - return - } - } -} - // Stop the ChunkFetcher. func (c *Fetcher) Stop() { c.stopOnce.Do(func() { close(c.decodeRequests) c.wait.Wait() c.cache.Stop() - close(c.stop) }) } @@ -267,10 +212,8 @@ func (c *Fetcher) FetchChunks(ctx context.Context, chunks []chunk.Chunk) ([]chun st.AddCacheBytesSent(stats.ChunkCache, bytes) // Always cache any chunks we did get - if cacheErr := c.writeBackCacheAsync(fromStorage); cacheErr != nil { - if cacheErr == errAsyncBufferFull { - skipped.Inc() - } + + if cacheErr := c.WriteBackCache(ctx, fromStorage); cacheErr != nil { level.Warn(log).Log("msg", "could not store chunks in chunk cache", "err", cacheErr) } diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 1a4fa386062f7..9e50d1531d587 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -199,7 +199,7 @@ func (s *LokiStore) init() error { if err != nil { return err } - f, err := fetcher.New(s.chunksCache, s.chunksCacheL2, s.storeCfg.ChunkCacheStubs(), s.schemaCfg, chunkClient, s.storeCfg.ChunkCacheConfig.AsyncCacheWriteBackConcurrency, s.storeCfg.ChunkCacheConfig.AsyncCacheWriteBackBufferSize, s.storeCfg.L2ChunkCacheHandoff) + f, err := 
fetcher.New(s.chunksCache, s.chunksCacheL2, s.storeCfg.ChunkCacheStubs(), s.schemaCfg, chunkClient, s.storeCfg.L2ChunkCacheHandoff) if err != nil { return err }