[optimization] cache prometheus: fix "loki_cache_request_duration_seconds_bucket" 'status_code' label always equals "200" #4891

Merged, 11 commits, Jan 14, 2022
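Background on the bug (summarized from the diff below, not from an original PR description): the old Cache interface returned nothing from Store and Fetch, so the instrumented wrapper could only ever report a nil error to the request-duration histogram, and every sample of loki_cache_request_duration_seconds_bucket carried status_code="200". Below is a minimal, self-contained sketch of that effect; the observe helper is a toy stand-in for the real weaveworks/common/instrument collector, so its name and signature are illustrative only.

package main

import (
	"errors"
	"fmt"
)

// observe is a toy stand-in for the cache request-duration histogram:
// it only prints the status_code label that would be attached to the sample.
func observe(op string, err error) {
	status := "200"
	if err != nil {
		status = "500"
	}
	fmt.Printf("loki_cache_request_duration_seconds_bucket{operation=%q, status_code=%q}\n", op, status)
}

func main() {
	storeErr := errors.New("memcached: connection refused")

	// Before this PR: Store returned nothing, so the wrapper had no error to
	// observe and the sample was labelled status_code="200" even on failure.
	observe("store", nil)

	// After this PR: Store returns the underlying error, so failures show up
	// as non-200 status codes in the same histogram.
	observe("store", storeErr)
}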
4 changes: 2 additions & 2 deletions pkg/querier/queryrange/queryrangebase/results_cache.go
@@ -559,7 +559,7 @@ func (s resultsCache) filterRecentExtents(req Request, maxCacheFreshness time.Du
}

func (s resultsCache) get(ctx context.Context, key string) ([]Extent, bool) {
- found, bufs, _ := s.cache.Fetch(ctx, []string{cache.HashKey(key)})
+ found, bufs, _, _ := s.cache.Fetch(ctx, []string{cache.HashKey(key)})
if len(found) != 1 {
return nil, false
}
@@ -600,7 +600,7 @@ func (s resultsCache) put(ctx context.Context, key string, extents []Extent) {
return
}

- s.cache.Store(ctx, []string{cache.HashKey(key)}, [][]byte{buf})
+ _ = s.cache.Store(ctx, []string{cache.HashKey(key)}, [][]byte{buf})
}

func jaegerTraceID(ctx context.Context) string {
8 changes: 4 additions & 4 deletions pkg/querier/queryrange/roundtrip.go
@@ -6,19 +6,19 @@ import (
"strings"
"time"

- "github.com/cortexproject/cortex/pkg/tenant"
-
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/weaveworks/common/httpgrpc"

+ "github.com/grafana/loki/pkg/tenant"
+
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/storage/chunk"
- "github.com/grafana/loki/pkg/storage/chunk/cache"
+ lokicache "github.com/grafana/loki/pkg/storage/chunk/cache"
)

// Config is the configuration for the queryrange tripperware
@@ -392,7 +392,7 @@ func NewMetricTripperware(
SplitByIntervalMiddleware(limits, codec, splitMetricByTime, splitByMetrics),
)

- var c cache.Cache
+ var c lokicache.Cache
if cfg.CacheResults {
queryCacheMiddleware, cache, err := queryrangebase.NewResultsCacheMiddleware(
log,
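A note on the import rename above (my reading of the diff; the PR does not spell this out): the CacheResults branch assigns a local variable named cache, which would shadow the cache package, so the import is aliased to lokicache to keep the package name reachable. The same pattern in isolation:

package main

import (
	"fmt"

	strs "strings" // aliased so the package stays reachable below
)

func main() {
	strings := []string{"loki", "cache"} // local variable shadows the package name "strings"
	fmt.Println(strs.Join(strings, "-")) // the alias still resolves: prints "loki-cache"
}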
13 changes: 10 additions & 3 deletions pkg/storage/chunk/cache/background.go
@@ -5,6 +5,8 @@ import (
"flag"
"sync"

+ util_log "github.com/cortexproject/cortex/pkg/util/log"
+ "github.com/go-kit/log/level"
opentracing "github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/client_golang/prometheus"
@@ -81,7 +83,7 @@ func (c *backgroundCache) Stop() {
const keysPerBatch = 100

// Store writes keys for the cache in the background.
- func (c *backgroundCache) Store(ctx context.Context, keys []string, bufs [][]byte) {
+ func (c *backgroundCache) Store(ctx context.Context, keys []string, bufs [][]byte) error {
for len(keys) > 0 {
num := keysPerBatch
if num > len(keys) {
@@ -101,11 +103,12 @@ func (c *backgroundCache) Store(ctx context.Context, keys []string, bufs [][]byt
if sp != nil {
sp.LogFields(otlog.Int("dropped", num))
}
- return // queue is full; give up
+ return nil // queue is full; give up
}
keys = keys[num:]
bufs = bufs[num:]
}
+ return nil
}

func (c *backgroundCache) writeBackLoop() {
@@ -118,7 +121,11 @@ func (c *backgroundCache) writeBackLoop() {
return
}
c.queueLength.Sub(float64(len(bgWrite.keys)))
- c.Cache.Store(context.Background(), bgWrite.keys, bgWrite.bufs)
+ err := c.Cache.Store(context.Background(), bgWrite.keys, bgWrite.bufs)
+ if err != nil {
+ level.Warn(util_log.Logger).Log("msg", "backgroundCache writeBackLoop Cache.Store fail", "err", err)
+ continue
+ }

case <-c.quit:
return
4 changes: 2 additions & 2 deletions pkg/storage/chunk/cache/cache.go
@@ -19,8 +19,8 @@ import (
// Whatsmore, we found partially successful Fetchs were often treated as failed
// when they returned an error.
type Cache interface {
- Store(ctx context.Context, key []string, buf [][]byte)
- Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string)
+ Store(ctx context.Context, key []string, buf [][]byte) error
+ Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error)
Stop()
}

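Every cache implementation touched by this PR now has to satisfy the widened signatures above. Here is a compilable sketch of a trivial implementation and caller against the new interface; the mapCache type is invented for illustration, only the method signatures come from the diff.

package main

import (
	"context"
	"errors"
	"fmt"
)

// Cache mirrors the widened interface from the hunk above.
type Cache interface {
	Store(ctx context.Context, keys []string, bufs [][]byte) error
	Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error)
	Stop()
}

// mapCache is a toy in-memory implementation used only to show the shape of the change.
type mapCache struct{ data map[string][]byte }

var _ Cache = (*mapCache)(nil)

func (m *mapCache) Store(_ context.Context, keys []string, bufs [][]byte) error {
	if len(keys) != len(bufs) {
		return errors.New("keys and bufs length mismatch")
	}
	for i, k := range keys {
		m.data[k] = bufs[i]
	}
	return nil
}

func (m *mapCache) Fetch(_ context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error) {
	for _, k := range keys {
		if v, ok := m.data[k]; ok {
			found, bufs = append(found, k), append(bufs, v)
		} else {
			missing = append(missing, k)
		}
	}
	return found, bufs, missing, nil
}

func (m *mapCache) Stop() {}

func main() {
	c := &mapCache{data: map[string][]byte{}}
	ctx := context.Background()
	if err := c.Store(ctx, []string{"k1"}, [][]byte{[]byte("v1")}); err != nil {
		fmt.Println("store failed:", err) // callers can now surface real failures
	}
	found, _, missing, _ := c.Fetch(ctx, []string{"k1", "k2"})
	fmt.Println("found:", found, "missing:", missing)
}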
8 changes: 4 additions & 4 deletions pkg/storage/chunk/cache/cache_gen.go
@@ -21,17 +21,17 @@ func NewCacheGenNumMiddleware(downstreamCache Cache) Cache {
}

// Store adds cache gen number to keys before calling Store method of downstream cache.
- func (c GenNumMiddleware) Store(ctx context.Context, keys []string, buf [][]byte) {
+ func (c GenNumMiddleware) Store(ctx context.Context, keys []string, buf [][]byte) error {
keys = addCacheGenNumToCacheKeys(ctx, keys)
- c.downstreamCache.Store(ctx, keys, buf)
+ return c.downstreamCache.Store(ctx, keys, buf)
}

// Fetch adds cache gen number to keys before calling Fetch method of downstream cache.
// It also removes gen number before responding back with found and missing keys to make sure consumer of response gets to see same keys.
- func (c GenNumMiddleware) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string) {
+ func (c GenNumMiddleware) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error) {
keys = addCacheGenNumToCacheKeys(ctx, keys)

- found, bufs, missing = c.downstreamCache.Fetch(ctx, keys)
+ found, bufs, missing, err = c.downstreamCache.Fetch(ctx, keys)

found = removeCacheGenNumFromKeys(ctx, found)
missing = removeCacheGenNumFromKeys(ctx, missing)
9 changes: 5 additions & 4 deletions pkg/storage/chunk/cache/cache_test.go
@@ -73,7 +73,8 @@ func fillCache(t *testing.T, scfg chunk.SchemaConfig, cache cache.Cache) ([]stri
chunks = append(chunks, cleanChunk)
}

- cache.Store(context.Background(), keys, bufs)
+ err := cache.Store(context.Background(), keys, bufs)
+ require.NoError(t, err)
return keys, chunks
}

@@ -82,7 +83,7 @@ func testCacheSingle(t *testing.T, cache cache.Cache, keys []string, chunks []ch
index := rand.Intn(len(keys))
key := keys[index]

- found, bufs, missingKeys := cache.Fetch(context.Background(), []string{key})
+ found, bufs, missingKeys, _ := cache.Fetch(context.Background(), []string{key})
require.Len(t, found, 1)
require.Len(t, bufs, 1)
require.Len(t, missingKeys, 0)
@@ -97,7 +98,7 @@ func testCacheSingle(t *testing.T, cache cache.Cache, keys []string, chunks []ch

func testCacheMultiple(t *testing.T, cache cache.Cache, keys []string, chunks []chunk.Chunk) {
// test getting them all
- found, bufs, missingKeys := cache.Fetch(context.Background(), keys)
+ found, bufs, missingKeys, _ := cache.Fetch(context.Background(), keys)
require.Len(t, found, len(keys))
require.Len(t, bufs, len(keys))
require.Len(t, missingKeys, 0)
@@ -149,7 +150,7 @@ func (a byExternalKey) Less(i, j int) bool {
func testCacheMiss(t *testing.T, cache cache.Cache) {
for i := 0; i < 100; i++ {
key := strconv.Itoa(rand.Int()) // arbitrary key which should fail: no chunk key is a single integer
- found, bufs, missing := cache.Fetch(context.Background(), []string{key})
+ found, bufs, missing, _ := cache.Fetch(context.Background(), []string{key})
require.Empty(t, found)
require.Empty(t, bufs)
require.Len(t, missing, 1)
5 changes: 3 additions & 2 deletions pkg/storage/chunk/cache/fifo_cache.go
@@ -180,7 +180,7 @@ func NewFifoCache(name string, cfg FifoCacheConfig, reg prometheus.Registerer, l
}

// Fetch implements Cache.
- func (c *FifoCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string) {
+ func (c *FifoCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error) {
found, missing, bufs = make([]string, 0, len(keys)), make([]string, 0, len(keys)), make([][]byte, 0, len(keys))
for _, key := range keys {
val, ok := c.Get(ctx, key)
@@ -196,7 +196,7 @@ func (c *FifoCache) Fetch(ctx context.Context, keys []string) (found []string, b
}

// Store implements Cache.
- func (c *FifoCache) Store(ctx context.Context, keys []string, values [][]byte) {
+ func (c *FifoCache) Store(ctx context.Context, keys []string, values [][]byte) error {
c.entriesAdded.Inc()

c.lock.Lock()
@@ -205,6 +205,7 @@ func (c *FifoCache) Store(ctx context.Context, keys []string, values [][]byte) {
for i := range keys {
c.put(keys[i], values[i])
}
+ return nil
}

// Stop implements Cache.
12 changes: 8 additions & 4 deletions pkg/storage/chunk/cache/fifo_cache_test.go
@@ -51,7 +51,8 @@ func TestFifoCacheEviction(t *testing.T) {
keys = append(keys, key)
values = append(values, value)
}
- c.Store(ctx, keys, values)
+ err := c.Store(ctx, keys, values)
+ require.NoError(t, err)
require.Len(t, c.entries, cnt)

assert.Equal(t, testutil.ToFloat64(c.entriesAdded), float64(1))
@@ -93,7 +94,8 @@ func TestFifoCacheEviction(t *testing.T) {
keys = append(keys, key)
values = append(values, value)
}
- c.Store(ctx, keys, values)
+ err = c.Store(ctx, keys, values)
+ require.NoError(t, err)
require.Len(t, c.entries, cnt)

assert.Equal(t, testutil.ToFloat64(c.entriesAdded), float64(2))
@@ -139,7 +141,8 @@ func TestFifoCacheEviction(t *testing.T) {
copy(value, vstr)
values = append(values, value)
}
- c.Store(ctx, keys, values)
+ err = c.Store(ctx, keys, values)
+ require.NoError(t, err)
require.Len(t, c.entries, cnt)

for i := cnt; i < cnt+evicted; i++ {
@@ -189,9 +192,10 @@ func TestFifoCacheExpiry(t *testing.T) {
c := NewFifoCache(test.name, test.cfg, nil, log.NewNopLogger())
ctx := context.Background()

- c.Store(ctx,
+ err := c.Store(ctx,
[]string{key1, key2, key4, key3, key2, key1},
[][]byte{genBytes(16), []byte("dummy"), genBytes(20), data3, data2, data1})
+ require.NoError(t, err)

value, ok := c.Get(ctx, key1)
require.True(t, ok)
27 changes: 13 additions & 14 deletions pkg/storage/chunk/cache/instrumented.go
@@ -64,35 +64,34 @@ type instrumentedCache struct {
requestDuration *instr.HistogramCollector
}

- func (i *instrumentedCache) Store(ctx context.Context, keys []string, bufs [][]byte) {
+ func (i *instrumentedCache) Store(ctx context.Context, keys []string, bufs [][]byte) error {
for j := range bufs {
i.storedValueSize.Observe(float64(len(bufs[j])))
}

method := i.name + ".store"
- _ = instr.CollectedRequest(ctx, method, i.requestDuration, instr.ErrorCode, func(ctx context.Context) error {
+ return instr.CollectedRequest(ctx, method, i.requestDuration, instr.ErrorCode, func(ctx context.Context) error {
sp := ot.SpanFromContext(ctx)
sp.LogFields(otlog.Int("keys", len(keys)))
- i.Cache.Store(ctx, keys, bufs)
- return nil
+ return i.Cache.Store(ctx, keys, bufs)
})
}

- func (i *instrumentedCache) Fetch(ctx context.Context, keys []string) ([]string, [][]byte, []string) {
+ func (i *instrumentedCache) Fetch(ctx context.Context, keys []string) ([]string, [][]byte, []string, error) {
var (
- found []string
- bufs [][]byte
- missing []string
- method = i.name + ".fetch"
+ found []string
+ bufs [][]byte
+ missing []string
+ fetchErr error
+ method = i.name + ".fetch"
)

- _ = instr.CollectedRequest(ctx, method, i.requestDuration, instr.ErrorCode, func(ctx context.Context) error {
+ err := instr.CollectedRequest(ctx, method, i.requestDuration, instr.ErrorCode, func(ctx context.Context) error {
sp := ot.SpanFromContext(ctx)
sp.LogFields(otlog.Int("keys requested", len(keys)))

- found, bufs, missing = i.Cache.Fetch(ctx, keys)
+ found, bufs, missing, fetchErr = i.Cache.Fetch(ctx, keys)
sp.LogFields(otlog.Int("keys found", len(found)), otlog.Int("keys missing", len(keys)-len(found)))
- return nil
+ return fetchErr
})

i.fetchedKeys.Add(float64(len(keys)))
@@ -101,7 +100,7 @@ func (i *instrumentedCache) Fetch(ctx context.Context, keys []string) ([]string,
i.fetchedValueSize.Observe(float64(len(bufs[j])))
}

- return found, bufs, missing
+ return found, bufs, missing, err
}

func (i *instrumentedCache) Stop() {
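This file is where the status_code label is actually decided: instr.CollectedRequest times the callback, derives the label from the error the callback returns (via the instr.ErrorCode mapper), and hands the same error back to the caller. Returning i.Cache.Store(...) and fetchErr instead of a hard-coded nil is therefore the core of the fix. Below is a stand-alone sketch of that shape; fakeCollect and fetchWrapped are simplified stand-ins, not the real weaveworks/common/instrument API.

package main

import (
	"context"
	"errors"
	"fmt"
)

// fakeCollect mimics the dual role of instr.CollectedRequest: it derives
// status_code from the callback's error and returns that same error.
func fakeCollect(method string, f func(context.Context) error) error {
	err := f(context.Background())
	status := "200"
	if err != nil {
		status = "500"
	}
	fmt.Printf("%s status_code=%s\n", method, status)
	return err
}

// fetchWrapped mirrors the shape of the instrumented Fetch above: results are
// captured by the closure, and the cache error both labels the metric and is
// returned to the caller alongside whatever was found.
func fetchWrapped(fetch func() ([]string, error)) (found []string, err error) {
	err = fakeCollect("cache.fetch", func(_ context.Context) error {
		var fetchErr error
		found, fetchErr = fetch()
		return fetchErr
	})
	return found, err
}

func main() {
	found, err := fetchWrapped(func() ([]string, error) {
		// A partial result plus an error: both are surfaced.
		return []string{"chunk-1"}, errors.New("memcached: connection refused")
	})
	fmt.Println("found:", found, "err:", err)
}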
30 changes: 18 additions & 12 deletions pkg/storage/chunk/cache/memcached.go
@@ -81,7 +81,7 @@ func NewMemcached(cfg MemcachedConfig, client MemcachedClient, name string, reg
res := &result{
batchID: input.batchID,
}
- res.found, res.bufs, res.missed = c.fetch(input.ctx, input.keys)
+ res.found, res.bufs, res.missed, res.err = c.fetch(input.ctx, input.keys)
input.resultCh <- res
}

@@ -103,6 +103,7 @@ type result struct {
found []string
bufs [][]byte
missed []string
+ err error
batchID int // For ordering results.
}

@@ -121,21 +122,20 @@ func memcacheStatusCode(err error) string {
}

// Fetch gets keys from the cache. The keys that are found must be in the order of the keys requested.
- func (c *Memcached) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string) {
+ func (c *Memcached) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error) {
if c.cfg.BatchSize == 0 {
- found, bufs, missed = c.fetch(ctx, keys)
+ found, bufs, missed, err = c.fetch(ctx, keys)
return
}

start := time.Now()
- found, bufs, missed = c.fetchKeysBatched(ctx, keys)
- c.requestDuration.After(ctx, "Memcache.GetBatched", "200", start)
+ found, bufs, missed, err = c.fetchKeysBatched(ctx, keys)
+ c.requestDuration.After(ctx, "Memcache.GetBatched", memcacheStatusCode(err), start)
return
}

- func (c *Memcached) fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string) {
+ func (c *Memcached) fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error) {
var (
- err error
start = time.Now()
items map[string]*memcache.Item
)
@@ -147,7 +147,7 @@ func (c *Memcached) fetch(ctx context.Context, keys []string) (found []string, b
"keys requested", len(keys),
"err", err,
)
- return found, bufs, keys
+ return found, bufs, keys, err
}

for _, key := range keys {
Expand All @@ -162,7 +162,7 @@ func (c *Memcached) fetch(ctx context.Context, keys []string) (found []string, b
return
}

- func (c *Memcached) fetchKeysBatched(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string) {
+ func (c *Memcached) fetchKeysBatched(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error) {
resultsCh := make(chan *result)
batchSize := c.cfg.BatchSize

@@ -197,26 +197,32 @@ func (c *Memcached) fetchKeysBatched(ctx context.Context, keys []string) (found
found = append(found, result.found...)
bufs = append(bufs, result.bufs...)
missed = append(missed, result.missed...)
+ if result.err != nil {
+ err = result.err
+ }
}

return
}

// Store stores the key in the cache.
- func (c *Memcached) Store(ctx context.Context, keys []string, bufs [][]byte) {
+ func (c *Memcached) Store(ctx context.Context, keys []string, bufs [][]byte) error {
+ var err error
for i := range keys {
- err := instr.CollectedRequest(ctx, "Memcache.Put", c.requestDuration, memcacheStatusCode, func(_ context.Context) error {
+ cacheErr := instr.CollectedRequest(ctx, "Memcache.Put", c.requestDuration, memcacheStatusCode, func(_ context.Context) error {
item := memcache.Item{
Key: keys[i],
Value: bufs[i],
Expiration: int32(c.cfg.Expiration.Seconds()),
}
return c.memcache.Set(&item)
})
- if err != nil {
+ if cacheErr != nil {
level.Error(c.logger).Log("msg", "failed to put to memcached", "name", c.name, "err", err)
+ err = cacheErr
}
}
+ return err
}

// Stop does nothing.
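In the batched path above, per-batch errors are folded into a single err (the last non-nil one wins) and converted into the metric label through memcacheStatusCode, replacing the previous hard-coded "200". A small sketch of that aggregation follows; statusCode and errCacheMiss only approximate what memcacheStatusCode is assumed to do (miss versus hard failure) and are not the actual implementation.

package main

import (
	"errors"
	"fmt"
)

// errCacheMiss stands in for memcache.ErrCacheMiss in this sketch.
var errCacheMiss = errors.New("memcache: cache miss")

// statusCode approximates the assumed memcacheStatusCode mapping:
// nil -> "200", a miss -> "404", anything else -> "500".
func statusCode(err error) string {
	switch {
	case err == nil:
		return "200"
	case errors.Is(err, errCacheMiss):
		return "404"
	default:
		return "500"
	}
}

func main() {
	// fetchKeysBatched-style aggregation: keep the last non-nil batch error.
	var err error
	for _, batchErr := range []error{nil, errCacheMiss, errors.New("connection reset")} {
		if batchErr != nil {
			err = batchErr
		}
	}
	// The whole batched request is labelled by the final error: "500" here.
	fmt.Println(statusCode(err))
}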