Skip to content

Commit

Permalink
chore(blooms): additional spans for bloom read path (#12866)
Browse files Browse the repository at this point in the history
Adds some timing information to pre-existing spans to help better understand bloom read path latency responsibility
  • Loading branch information
owen-d authored and shantanualsi committed May 6, 2024
1 parent eaadf54 commit 545cec3
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 1 deletion.
14 changes: 14 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/util/encoding"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
)

const (
Expand Down Expand Up @@ -480,12 +481,25 @@ func newCachedListOpObjectClient(oc client.ObjectClient, ttl, interval time.Dura
}

func (c *cachedListOpObjectClient) List(ctx context.Context, prefix string, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
var (
logger = spanlogger.FromContext(ctx)
start = time.Now()
cacheDur time.Duration
)
defer func() {
logger.LogKV(
"cache_duration", cacheDur,
"total_duration", time.Since(start),
)
}()

if delimiter != "" {
return nil, nil, fmt.Errorf("does not support LIST calls with delimiter: %s", delimiter)
}
c.mtx.RLock()
cached, found := c.cache[prefix]
c.mtx.RUnlock()
cacheDur = time.Since(start)
if found {
return cached.objects, cached.prefixes, nil
}
Expand Down
23 changes: 22 additions & 1 deletion pkg/storage/stores/shipper/bloomshipper/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
)

var downloadQueueCapacity = 10000
Expand Down Expand Up @@ -119,6 +120,8 @@ func (f *Fetcher) Close() {

// FetchMetas implements fetcher
func (f *Fetcher) FetchMetas(ctx context.Context, refs []MetaRef) ([]Meta, error) {
logger := spanlogger.FromContextWithFallback(ctx, f.logger)

if ctx.Err() != nil {
return nil, errors.Wrap(ctx.Err(), "fetch Metas")
}
Expand All @@ -127,9 +130,12 @@ func (f *Fetcher) FetchMetas(ctx context.Context, refs []MetaRef) ([]Meta, error
for _, ref := range refs {
keys = append(keys, f.client.Meta(ref).Addr())
}

cacheStart := time.Now()
cacheHits, cacheBufs, _, err := f.metasCache.Fetch(ctx, keys)
cacheDur := time.Since(cacheStart)
if err != nil {
level.Error(f.logger).Log("msg", "failed to fetch metas from cache", "err", err)
level.Error(logger).Log("msg", "failed to fetch metas from cache", "err", err)
return nil, nil
}

Expand All @@ -138,16 +144,31 @@ func (f *Fetcher) FetchMetas(ctx context.Context, refs []MetaRef) ([]Meta, error
return nil, err
}

storageStart := time.Now()
fromStorage, err := f.client.GetMetas(ctx, missing)
storageDur := time.Since(storageStart)
if err != nil {
return nil, err
}

writeBackStart := time.Now()
err = f.writeBackMetas(ctx, fromStorage)
writeBackDur := time.Since(writeBackStart)
if err != nil {
return nil, err
}

logger.LogKV(
"phase", "fetch_metas",
"err", err,
"keys", len(keys),
"hits", len(cacheHits),
"misses", len(missing),
"cache_dur", cacheDur.String(),
"storage_dur", storageDur.String(),
"write_back_dur", writeBackDur.String(),
)

results := append(fromCache, fromStorage...)
f.metrics.metasFetched.Observe(float64(len(results)))
// TODO(chaudum): get metas size from storage
Expand Down
15 changes: 15 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/grafana/loki/v3/pkg/storage/chunk/client/util"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
)

var (
Expand Down Expand Up @@ -114,14 +115,28 @@ func (b *bloomStoreEntry) ResolveMetas(ctx context.Context, params MetaSearchPar

// FetchMetas implements store.
func (b *bloomStoreEntry) FetchMetas(ctx context.Context, params MetaSearchParams) ([]Meta, error) {
logger := spanlogger.FromContext(ctx)

resolverStart := time.Now()
metaRefs, fetchers, err := b.ResolveMetas(ctx, params)
resolverDuration := time.Since(resolverStart)
if err != nil {
return nil, err
}
if len(metaRefs) != len(fetchers) {
return nil, errors.New("metaRefs and fetchers have unequal length")
}

var metaCt int
for i := range metaRefs {
metaCt += len(metaRefs[i])
}
logger.LogKV(
"msg", "resolved metas",
"metas", metaCt,
"duration", resolverDuration,
)

var metas []Meta
for i := range fetchers {
res, err := fetchers[i].FetchMetas(ctx, metaRefs[i])
Expand Down

0 comments on commit 545cec3

Please sign in to comment.