Skip to content

Commit

Permalink
remove async writeback introduced in #5083
Browse files Browse the repository at this point in the history
Signed-off-by: Edward Welch <edward.welch@grafana.com>
  • Loading branch information
slim-bean committed Apr 4, 2024
1 parent 42ab9c2 commit fef533a
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 76 deletions.
7 changes: 0 additions & 7 deletions pkg/storage/chunk/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@ type Config struct {

// For tests to inject specific implementations.
Cache Cache `yaml:"-"`

// AsyncCacheWriteBackConcurrency specifies the number of goroutines to use when asynchronously writing chunks fetched from the store to the chunk cache.
AsyncCacheWriteBackConcurrency int `yaml:"async_cache_write_back_concurrency"`
// AsyncCacheWriteBackBufferSize specifies the maximum number of fetched chunks to buffer for writing back to the chunk cache.
AsyncCacheWriteBackBufferSize int `yaml:"async_cache_write_back_buffer_size"`
}

// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
Expand All @@ -52,8 +47,6 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, description string, f
cfg.MemcacheClient.RegisterFlagsWithPrefix(prefix, description, f)
cfg.Redis.RegisterFlagsWithPrefix(prefix, description, f)
cfg.EmbeddedCache.RegisterFlagsWithPrefix(prefix+"embedded-cache.", description, f)
f.IntVar(&cfg.AsyncCacheWriteBackConcurrency, prefix+"max-async-cache-write-back-concurrency", 16, "The maximum number of concurrent asynchronous writeback cache can occur.")
f.IntVar(&cfg.AsyncCacheWriteBackBufferSize, prefix+"max-async-cache-write-back-buffer-size", 500, "The maximum number of enqueued asynchronous writeback cache allowed.")
f.DurationVar(&cfg.DefaultValidity, prefix+"default-validity", time.Hour, description+"The default validity of entries for caches unless overridden.")

cfg.Prefix = prefix
Expand Down
79 changes: 11 additions & 68 deletions pkg/storage/chunk/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package fetcher

import (
"context"
"errors"
"sync"
"time"

Expand All @@ -23,19 +22,6 @@ import (
)

var (
errAsyncBufferFull = errors.New("the async buffer is full")
skipped = promauto.NewCounter(prometheus.CounterOpts{
Name: "loki_chunk_fetcher_cache_skipped_buffer_full_total",
Help: "Total number of operations against cache that have been skipped.",
})
chunkFetcherCacheQueueEnqueue = promauto.NewCounter(prometheus.CounterOpts{
Name: "loki_chunk_fetcher_cache_enqueued_total",
Help: "Total number of chunks enqueued to a buffer to be asynchronously written back to the chunk cache.",
})
chunkFetcherCacheQueueDequeue = promauto.NewCounter(prometheus.CounterOpts{
Name: "loki_chunk_fetcher_cache_dequeued_total",
Help: "Total number of chunks asynchronously dequeued from a buffer and written back to the chunk cache.",
})
cacheCorrupt = promauto.NewCounter(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "cache_corrupt_chunks_total",
Expand Down Expand Up @@ -69,12 +55,7 @@ type Fetcher struct {
wait sync.WaitGroup
decodeRequests chan decodeRequest

maxAsyncConcurrency int
maxAsyncBufferSize int

asyncQueue chan []chunk.Chunk
stopOnce sync.Once
stop chan struct{}
stopOnce sync.Once
}

type decodeRequest struct {
Expand All @@ -89,67 +70,31 @@ type decodeResponse struct {
}

// New makes a new ChunkFetcher.
func New(cache cache.Cache, cachel2 cache.Cache, cacheStubs bool, schema config.SchemaConfig, storage client.Client, maxAsyncConcurrency int, maxAsyncBufferSize int, l2CacheHandoff time.Duration) (*Fetcher, error) {
func New(cache cache.Cache, cachel2 cache.Cache, cacheStubs bool, schema config.SchemaConfig, storage client.Client, l2CacheHandoff time.Duration) (*Fetcher, error) {
c := &Fetcher{
schema: schema,
storage: storage,
cache: cache,
cachel2: cachel2,
l2CacheHandoff: l2CacheHandoff,
cacheStubs: cacheStubs,
decodeRequests: make(chan decodeRequest),
maxAsyncConcurrency: maxAsyncConcurrency,
maxAsyncBufferSize: maxAsyncBufferSize,
stop: make(chan struct{}),
schema: schema,
storage: storage,
cache: cache,
cachel2: cachel2,
l2CacheHandoff: l2CacheHandoff,
cacheStubs: cacheStubs,
decodeRequests: make(chan decodeRequest),
}

c.wait.Add(chunkDecodeParallelism)
for i := 0; i < chunkDecodeParallelism; i++ {
go c.worker()
}

// Start a number of goroutines - processing async operations - equal
// to the max concurrency we have.
c.asyncQueue = make(chan []chunk.Chunk, c.maxAsyncBufferSize)
for i := 0; i < c.maxAsyncConcurrency; i++ {
go c.asyncWriteBackCacheQueueProcessLoop()
}

return c, nil
}

func (c *Fetcher) writeBackCacheAsync(fromStorage []chunk.Chunk) error {
select {
case c.asyncQueue <- fromStorage:
chunkFetcherCacheQueueEnqueue.Add(float64(len(fromStorage)))
return nil
default:
return errAsyncBufferFull
}
}

func (c *Fetcher) asyncWriteBackCacheQueueProcessLoop() {
for {
select {
case fromStorage := <-c.asyncQueue:
chunkFetcherCacheQueueDequeue.Add(float64(len(fromStorage)))
cacheErr := c.WriteBackCache(context.Background(), fromStorage)
if cacheErr != nil {
level.Warn(util_log.Logger).Log("msg", "could not write fetched chunks from storage into chunk cache", "err", cacheErr)
}
case <-c.stop:
return
}
}
}

// Stop the ChunkFetcher.
func (c *Fetcher) Stop() {
c.stopOnce.Do(func() {
close(c.decodeRequests)
c.wait.Wait()
c.cache.Stop()
close(c.stop)
})
}

Expand Down Expand Up @@ -267,10 +212,8 @@ func (c *Fetcher) FetchChunks(ctx context.Context, chunks []chunk.Chunk) ([]chun
st.AddCacheBytesSent(stats.ChunkCache, bytes)

// Always cache any chunks we did get
if cacheErr := c.writeBackCacheAsync(fromStorage); cacheErr != nil {
if cacheErr == errAsyncBufferFull {
skipped.Inc()
}

if cacheErr := c.WriteBackCache(ctx, fromStorage); cacheErr != nil {
level.Warn(log).Log("msg", "could not store chunks in chunk cache", "err", cacheErr)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (s *LokiStore) init() error {
if err != nil {
return err
}
f, err := fetcher.New(s.chunksCache, s.chunksCacheL2, s.storeCfg.ChunkCacheStubs(), s.schemaCfg, chunkClient, s.storeCfg.ChunkCacheConfig.AsyncCacheWriteBackConcurrency, s.storeCfg.ChunkCacheConfig.AsyncCacheWriteBackBufferSize, s.storeCfg.L2ChunkCacheHandoff)
f, err := fetcher.New(s.chunksCache, s.chunksCacheL2, s.storeCfg.ChunkCacheStubs(), s.schemaCfg, chunkClient, s.storeCfg.L2ChunkCacheHandoff)
if err != nil {
return err
}
Expand Down

0 comments on commit fef533a

Please sign in to comment.