diff --git a/pkg/loghttp/query.go b/pkg/loghttp/query.go
index 8b135602b7f07..89ad4e00a79c0 100644
--- a/pkg/loghttp/query.go
+++ b/pkg/loghttp/query.go
@@ -537,6 +537,9 @@ func ParseIndexShardsQuery(r *http.Request) (*RangeQuery, datasize.ByteSize, err
 		return nil, 0, err
 	}
 	targetBytes, err := parseBytes(r, "targetBytesPerShard", true)
+	if targetBytes <= 0 {
+		return nil, 0, errors.New("targetBytesPerShard must be a positive value")
+	}
 	return parsed, targetBytes, err
 }
 
diff --git a/pkg/storage/async_store.go b/pkg/storage/async_store.go
index 8d104d702b8bb..ed3c9dab6b422 100644
--- a/pkg/storage/async_store.go
+++ b/pkg/storage/async_store.go
@@ -5,12 +5,15 @@ import (
 	"fmt"
 	"time"
 
+	"github.com/c2h5oh/datasize"
 	"github.com/opentracing/opentracing-go"
 
 	"github.com/grafana/loki/v3/pkg/logproto"
 	"github.com/grafana/loki/v3/pkg/storage/stores"
 	"github.com/grafana/loki/v3/pkg/storage/stores/index/seriesvolume"
+	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"
 
+	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/grafana/dskit/concurrency"
 	"github.com/prometheus/common/model"
@@ -281,3 +284,105 @@ func filterDuplicateChunks(scfg config.SchemaConfig, storeChunks [][]chunk.Chunk
 
 	return filteredChunkIDs
 }
+
+func (a *AsyncStore) GetShards(
+	ctx context.Context,
+	userID string,
+	from, through model.Time,
+	targetBytesPerShard uint64,
+	predicate chunk.Predicate,
+) (*logproto.ShardsResponse, error) {
+	logger := log.With(
+		util_log.WithContext(ctx, util_log.Logger),
+		"component", "asyncStore",
+	)
+
+	if !a.shouldQueryIngesters(through, model.Now()) {
+		return a.Store.GetShards(ctx, userID, from, through, targetBytesPerShard, predicate)
+	}
+
+	var (
+		shardResp *logproto.ShardsResponse
+		statsResp *stats.Stats
+	)
+
+	jobs := []func() error{
+		func() error {
+			var err error
+			shardResp, err = a.Store.GetShards(ctx, userID, from, through, targetBytesPerShard, predicate)
+			return err
+		},
+		// We can't dedupe shards by their contents, so we complement the
+		// store's response with the ingester's stats.
+		func() error {
+			var err error
+			statsResp, err = a.ingesterQuerier.Stats(ctx, userID, from, through, predicate.Matchers...)
+			return err
+		},
+	}
+
+	if err := concurrency.ForEachJob(
+		ctx,
+		len(jobs),
+		len(jobs),
+		func(ctx context.Context, i int) error {
+			return jobs[i]()
+		},
+	); err != nil {
+		return nil, err
+	}
+
+	return mergeShardsFromIngestersAndStore(logger, shardResp, statsResp, targetBytesPerShard), nil
+}
+
+func mergeShardsFromIngestersAndStore(
+	logger log.Logger,
+	storeResp *logproto.ShardsResponse,
+	statsResp *logproto.IndexStatsResponse,
+	targetBytesPerShard uint64,
+) *logproto.ShardsResponse {
+	var storeBytes uint64
+	for _, shard := range storeResp.Shards {
+		storeBytes += shard.Stats.Bytes
+	}
+	totalBytes := storeBytes + statsResp.Bytes
+
+	defer func() {
+		level.Debug(logger).Log(
+			"msg", "resolved shards",
+			"ingester_bytes", datasize.ByteSize(statsResp.Bytes).HumanReadable(),
+			"store_bytes", datasize.ByteSize(storeBytes).HumanReadable(),
+			"total_bytes", datasize.ByteSize(totalBytes).HumanReadable(),
+			"target_bytes", datasize.ByteSize(targetBytesPerShard).HumanReadable(),
+			"store_shards", len(storeResp.Shards),
+		)
+	}()
+
+	// edge case to avoid divide by zero later
+	if totalBytes == 0 {
+		return &logproto.ShardsResponse{
+			Shards: sharding.LinearShards(0, 0),
+		}
+	}
+
+	// If the ingesters don't have enough data to meaningfully
+	// change the number of shards, use the store response.
+	if pct := float64(statsResp.Bytes) / float64(totalBytes); pct < 0.25 {
+		return storeResp
+	}
+
+	shards := sharding.LinearShards(int(totalBytes/targetBytesPerShard), totalBytes)
+
+	// increment the total chunks by the number seen from ingesters
+	// NB(owen-d): this isn't perfect as it mixes signals a bit by joining
+	// store chunks which _could_ possibly be filtered with ingester chunks which can't,
+	// but it's still directionally helpful
+	updatedStats := storeResp.Statistics
+	updatedStats.Index.TotalChunks += int64(statsResp.Chunks)
+	return &logproto.ShardsResponse{
+		Shards:     shards,
+		Statistics: updatedStats,
+		// explicitly nil chunkgroups when we've changed the shards+included chunkrefs from ingesters
+		ChunkGroups: nil,
+	}
+}
diff --git a/pkg/storage/async_store_test.go b/pkg/storage/async_store_test.go
index a85b33ecccefd..9cf80868c861d 100644
--- a/pkg/storage/async_store_test.go
+++ b/pkg/storage/async_store_test.go
@@ -5,7 +5,10 @@ import (
 	"testing"
 	"time"
 
+	"github.com/go-kit/log"
+
 	"github.com/grafana/loki/v3/pkg/logproto"
+	"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/labels"
 
@@ -15,6 +18,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/storage/chunk"
 	"github.com/grafana/loki/v3/pkg/storage/chunk/fetcher"
 	"github.com/grafana/loki/v3/pkg/storage/config"
+	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"
 	"github.com/grafana/loki/v3/pkg/util"
 )
 
@@ -29,8 +33,8 @@ func newStoreMock() *storeMock {
 	return &storeMock{}
 }
 
-func (s *storeMock) GetChunks(ctx context.Context, userID string, from, through model.Time, predicate chunk.Predicate, storeChunksOverride *logproto.ChunkRefGroup) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
-	args := s.Called(ctx, userID, from, through, predicate, storeChunksOverride)
+func (s *storeMock) GetChunks(ctx context.Context, userID string, from, through model.Time, predicate chunk.Predicate, overrides *logproto.ChunkRefGroup) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
+	args := s.Called(ctx, userID, from, through, predicate, overrides)
 	return args.Get(0).([][]chunk.Chunk), args.Get(1).([]*fetcher.Fetcher), args.Error(2)
 }
 
@@ -360,3 +364,80 @@ func convertChunksToChunkIDs(s config.SchemaConfig, chunks []chunk.Chunk) []stri
 
 	return chunkIDs
 }
+
+func TestMergeShardsFromIngestersAndStore(t *testing.T) {
+	mkStats := func(bytes, chks uint64) logproto.IndexStatsResponse {
+		return logproto.IndexStatsResponse{
+			Bytes:  bytes,
+			Chunks: chks,
+		}
+	}
+
+	// creates n shards with bytesPerShard * n bytes and chks chunks
+	mkShards := func(n int, bytesPerShard uint64, chks int64) logproto.ShardsResponse {
+		return logproto.ShardsResponse{
+			Shards: sharding.LinearShards(n, bytesPerShard*uint64(n)),
+			Statistics: stats.Result{
+				Index: stats.Index{
+					TotalChunks: chks,
+				},
+			},
+		}
+	}
+
+	targetBytesPerShard := 10
+
+	for _, tc := range []struct {
+		desc     string
+		ingester logproto.IndexStatsResponse
+		store    logproto.ShardsResponse
+		exp      logproto.ShardsResponse
+	}{
+		{
+			desc:     "zero bytes returns one full shard",
+			ingester: mkStats(0, 0),
+			store:    mkShards(0, 0, 0),
+			exp:      mkShards(1, 0, 0),
+		},
+		{
+			desc:     "zero ingester bytes honors store",
+			ingester: mkStats(0, 0),
+			store:    mkShards(10, uint64(targetBytesPerShard), 10),
+			exp:      mkShards(10, uint64(targetBytesPerShard), 10),
+		},
+		{
+			desc:     "zero store bytes honors ingester",
+			ingester: mkStats(uint64(targetBytesPerShard*10), 10),
+			store:    mkShards(0, 0, 0),
+			exp:      mkShards(10, uint64(targetBytesPerShard), 10),
+		},
+		{
+			desc:     "ingester bytes below threshold ignored",
+			ingester: mkStats(uint64(targetBytesPerShard*2), 10),   // 2 shards worth from ingesters
+			store:    mkShards(10, uint64(targetBytesPerShard), 10), // 10 shards worth from store
+			exp:      mkShards(10, uint64(targetBytesPerShard), 10), // use the store's resp
+		},
+		{
+			desc:     "ingester bytes above threshold recreate shards",
+			ingester: mkStats(uint64(targetBytesPerShard*4), 10),   // 4 shards worth from ingesters
+			store:    mkShards(10, uint64(targetBytesPerShard), 10), // 10 shards worth from store
+			exp:      mkShards(14, uint64(targetBytesPerShard), 20), // regenerate 14 shards
+		},
+	} {
+
+		t.Run(tc.desc, func(t *testing.T) {
+			got := mergeShardsFromIngestersAndStore(
+				log.NewNopLogger(),
+				&tc.store,
+				&tc.ingester,
+				uint64(targetBytesPerShard),
+			)
+			require.Equal(t, tc.exp.Statistics, got.Statistics)
+			require.Equal(t, tc.exp.ChunkGroups, got.ChunkGroups)
+			require.Equal(t, tc.exp.Statistics.Index.TotalChunks, got.Statistics.Index.TotalChunks)
+			for i, shard := range tc.exp.Shards {
+				require.Equal(t, shard, got.Shards[i], "shard %d", i)
+			}
+		})
+	}
+}
diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/sharding/power.go b/pkg/storage/stores/shipper/indexshipper/tsdb/sharding/power.go
index 257c198ee2d75..219563e0e5358 100644
--- a/pkg/storage/stores/shipper/indexshipper/tsdb/sharding/power.go
+++ b/pkg/storage/stores/shipper/indexshipper/tsdb/sharding/power.go
@@ -22,21 +22,7 @@ type PowerOfTwoSharding struct {
 
 func (p PowerOfTwoSharding) ShardsFor(bytes uint64, maxBytesPerShard uint64) []logproto.Shard {
 	factor := GuessShardFactor(bytes, maxBytesPerShard, p.MaxShards)
-
-	if factor < 2 {
-		return []logproto.Shard{{
-			Bounds: logproto.FPBounds{
-				Min: 0,
-				Max: math.MaxUint64,
-			},
-			Stats: &stats.Stats{
-				Bytes: bytes,
-			},
-		}}
-	}
-
 	return LinearShards(factor, bytes)
-
 }
 
 // LinearShards is a sharding implementation that splits the data into
@@ -71,14 +57,13 @@ func LinearShards(n int, bytes uint64) []logproto.Shard {
 				Bytes: bytesPerShard,
 			},
 		}
-
-		// The last shard should have the remainder of the bytes
-		// and the max bound should be math.MaxUint64
-		// NB(owen-d): this can only happen when maxShards is used
-		// and the maxShards isn't a factor of 2
-		shards[len(shards)-1].Stats.Bytes += bytes % uint64(n)
-		shards[len(shards)-1].Bounds.Max = math.MaxUint64
 	}
+	// The last shard should have the remainder of the bytes
+	// and the max bound should be math.MaxUint64
+	// NB(owen-d): this can only happen when maxShards is used
+	// and the maxShards isn't a factor of 2
+	shards[len(shards)-1].Stats.Bytes += bytes % uint64(n)
+	shards[len(shards)-1].Bounds.Max = math.MaxUint64
 
 	return shards
diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go
index b365334306e66..27e6702dc9889 100644
--- a/pkg/validation/limits.go
+++ b/pkg/validation/limits.go
@@ -480,6 +480,10 @@ func (l *Limits) Validate() error {
 		return err
 	}
 
+	if l.TSDBMaxBytesPerShard <= 0 {
+		return errors.New("querier.tsdb-max-bytes-per-shard must be greater than 0")
+	}
+
 	return nil
 }
 
diff --git a/pkg/validation/limits_test.go b/pkg/validation/limits_test.go
index 598a6f9033cde..2d4457c2a1191 100644
--- a/pkg/validation/limits_test.go
+++ b/pkg/validation/limits_test.go
@@ -345,6 +345,7 @@ func TestLimitsValidation(t *testing.T) {
 		desc := fmt.Sprintf("%s/%s", tc.limits.DeletionMode, tc.limits.BloomBlockEncoding)
 		t.Run(desc, func(t *testing.T) {
 			tc.limits.TSDBShardingStrategy = logql.PowerOfTwoVersion.String() // hacky but needed for test
+			tc.limits.TSDBMaxBytesPerShard = DefaultTSDBMaxBytesPerShard
 			if tc.expected == nil {
 				require.NoError(t, tc.limits.Validate())
 			} else {
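
Note, not part of the patch: the standalone sketch below illustrates the merge heuristic from mergeShardsFromIngestersAndStore and the byte distribution from sharding.LinearShards using plain uint64 values. The planShards helper and the numbers are hypothetical; the real code operates on logproto.Shard values, and in the below-threshold case it returns the store response unchanged rather than redistributing its bytes.

// Sketch of the shard planning rules in this patch, under the assumptions above.
package main

import "fmt"

// planShards returns the bytes assigned to each shard: if the ingesters
// contribute less than 25% of the total bytes, keep the store's shard count;
// otherwise re-plan totalBytes/targetBytesPerShard shards, with the last
// shard absorbing the remainder.
func planShards(storeShards int, storeBytes, ingesterBytes, targetBytesPerShard uint64) []uint64 {
	totalBytes := storeBytes + ingesterBytes
	if totalBytes == 0 {
		// zero-bytes edge case: a single empty shard
		return []uint64{0}
	}

	n := storeShards
	bytes := storeBytes
	if pct := float64(ingesterBytes) / float64(totalBytes); pct >= 0.25 {
		n = int(totalBytes / targetBytesPerShard)
		bytes = totalBytes
	}
	if n < 2 {
		return []uint64{bytes}
	}

	perShard := bytes / uint64(n)
	out := make([]uint64, n)
	for i := range out {
		out[i] = perShard
	}
	out[n-1] += bytes % uint64(n) // last shard takes the remainder
	return out
}

func main() {
	// 10 store shards of 10B plus 40B from ingesters crosses the 25% threshold,
	// so we re-plan: 140B at a 10B target gives 14 shards.
	fmt.Println(len(planShards(10, 100, 40, 10))) // 14
	// 20B from ingesters stays below the threshold, so the store's 10 shards win.
	fmt.Println(len(planShards(10, 100, 20, 10))) // 10
}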