diff --git a/CHANGELOG.md b/CHANGELOG.md index a05116f7ca8..a485646f699 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -74,7 +74,7 @@ * [CHANGE] Alertmanager: removed `-alertmanager.storage.*` configuration options, with the exception of the CLI flags `-alertmanager.storage.path` and `-alertmanager.storage.retention`. Use `-alertmanager-storage.*` instead. #632 * [CHANGE] Ingester: active series metrics `cortex_ingester_active_series` and `cortex_ingester_active_series_custom_tracker` are now removed when their value is zero. #672 #690 * [FEATURE] Query Frontend: Add `cortex_query_fetched_chunks_total` per-user counter to expose the number of chunks fetched as part of queries. This metric can be enabled with the `-frontend.query-stats-enabled` flag (or its respective YAML config option `query_stats_enabled`). #31 -* [FEATURE] Query Frontend: Add experimental querysharding for the blocks storage (instant and range queries). You can now enable querysharding for blocks storage (`-store.engine=blocks`) by setting `-query-frontend.parallelize-shardable-queries` to `true`. The following additional config and exported metrics have been added. #79 #80 #100 #124 #140 #148 #150 #151 #153 #154 #155 #156 #157 #158 #159 #160 #163 #169 #172 #196 #205 #225 #226 #227 #228 #230 #235 #240 #239 #246 #244 #319 #330 #371 #385 #400 #458 #586 #630 #660 +* [FEATURE] Query Frontend: Add experimental querysharding for the blocks storage (instant and range queries). You can now enable querysharding for blocks storage (`-store.engine=blocks`) by setting `-query-frontend.parallelize-shardable-queries` to `true`. The following additional config and exported metrics have been added. #79 #80 #100 #124 #140 #148 #150 #151 #153 #154 #155 #156 #157 #158 #159 #160 #163 #169 #172 #196 #205 #225 #226 #227 #228 #230 #235 #240 #239 #246 #244 #319 #330 #371 #385 #400 #458 #586 #630 #660 #707 * New config options: * `-frontend.query-sharding-total-shards`: The amount of shards to use when doing parallelisation via query sharding. * `-frontend.query-sharding-max-sharded-queries`: The max number of sharded queries that can be run for a given received query. 0 to disable limit. diff --git a/pkg/querier/queryrange/querysharding_test.go b/pkg/querier/queryrange/querysharding_test.go index b5de0a26df8..9e6ac9d506a 100644 --- a/pkg/querier/queryrange/querysharding_test.go +++ b/pkg/querier/queryrange/querysharding_test.go @@ -97,6 +97,7 @@ func approximatelyEquals(t *testing.T, a, b *PrometheusResponse) { } } } + func TestQueryShardingCorrectness(t *testing.T) { var ( numSeries = 1000 @@ -477,11 +478,7 @@ func TestQueryShardingCorrectness(t *testing.T) { } // Create a queryable on the fixtures. - queryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return &querierMock{ - series: series, - }, nil - }) + queryable := storageSeriesQueryable(series) for testName, testData := range tests { // Change scope to ensure it work fine when test cases are executed concurrently. @@ -600,6 +597,83 @@ func (b byLabels) Less(i, j int) bool { ) < 0 } +func TestQueryshardingDeterminism(t *testing.T) { + const shards = 16 + + // These are "evil" floats found in production which are the result of a rate of 1 and 3 requests per 1m5s. + // We push them as a gauge here to simplify the test scenario. 
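// Illustrative sketch (not part of the test): IEEE-754 float64 addition is not associative, so the
// order in which per-shard partial sums are merged can change the result in its last bits. That is
// the property this test relies on:
//
//	orderA := evilFloatA + evilFloatA + evilFloatB // the three shards merged in one order
//	orderB := evilFloatA + evilFloatB + evilFloatA // same values, different merge order
//	// orderA != orderB for these constants, as the require.NotEqualf below asserts.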
+ const ( + evilFloatA = 0.03298 + evilFloatB = 0.09894 + ) + require.NotEqualf(t, + evilFloatA+evilFloatA+evilFloatA, + evilFloatA+evilFloatB+evilFloatA, + "This test is based on the fact that given a=%f and b=%f, then a+a+b != a+b+a. If that is not true, this test is not testing anything.", evilFloatA, evilFloatB, + ) + + var ( + from = time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC) + step = 30 * time.Second + to = from.Add(step) + ) + + labelsForShard := labelsForShardsGenerator(labels.FromStrings(labels.MetricName, "metric"), shards) + storageSeries := []*promql.StorageSeries{ + newSeries(labelsForShard(0), from, to, step, constant(evilFloatA)), + newSeries(labelsForShard(1), from, to, step, constant(evilFloatA)), + newSeries(labelsForShard(2), from, to, step, constant(evilFloatB)), + } + + shardingware := newQueryShardingMiddleware(log.NewNopLogger(), newEngine(), mockLimits{totalShards: shards}, prometheus.NewPedanticRegistry()) + downstream := &downstreamHandler{engine: newEngine(), queryable: storageSeriesQueryable(storageSeries)} + + req := &PrometheusInstantQueryRequest{ + Path: "/query", + Time: to.UnixMilli(), + Query: `sum(metric)`, + } + + var lastVal float64 + for i := 0; i <= 100; i++ { + shardedRes, err := shardingware.Wrap(downstream).Do(user.InjectOrgID(context.Background(), "test"), req) + require.NoError(t, err) + + shardedPrometheusRes := shardedRes.(*PrometheusResponse) + + sampleStreams, err := responseToSamples(shardedPrometheusRes) + require.NoError(t, err) + + require.Lenf(t, sampleStreams, 1, "There should be 1 samples stream (query %d)", i) + require.Lenf(t, sampleStreams[0].Samples, 1, "There should be 1 sample in the first stream (query %d)", i) + val := sampleStreams[0].Samples[0].Value + + if i > 0 { + require.Equalf(t, lastVal, val, "Value differs on query %d", i) + } + lastVal = val + } +} + +// labelsForShardsGenerator returns a function that provides labels.Labels for the shard requested +// A single generator instance generates different label sets. +func labelsForShardsGenerator(base labels.Labels, shards uint64) func(shard uint64) labels.Labels { + i := 0 + return func(shard uint64) labels.Labels { + for { + i++ + ls := make(labels.Labels, len(base)+1) + copy(ls, base) + ls[len(ls)-1] = labels.Label{Name: "__test_shard_adjuster__", Value: fmt.Sprintf("adjusted to be %s by %d", sharding.FormatShardIDLabelValue(shard, shards), i)} + sort.Sort(ls) + // If this label value makes this labels combination fall into the desired shard, return it, otherwise keep trying. + if ls.Hash()%shards == shard { + return ls + } + } + } +} + // TestQuerySharding_FunctionCorrectness is the old test that probably at some point inspired the TestQuerySharding_Correctness, // we keep it here since it adds more test cases. 
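// Note on labelsForShardsGenerator above (illustrative, inferred from the helper itself rather than
// from other Mimir internals): a series belongs to shard s when the hash of its labels modulo the
// shard count equals s, which is why the helper keeps regenerating the "__test_shard_adjuster__"
// label value until that condition holds. A minimal sketch of the same check:
//
//	func shardOf(ls labels.Labels, shards uint64) uint64 {
//		return ls.Hash() % shards // the helper returns ls once this equals the requested shard
//	}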
func TestQuerySharding_FunctionCorrectness(t *testing.T) { @@ -700,17 +774,13 @@ func TestQuerySharding_FunctionCorrectness(t *testing.T) { const numShards = 4 for _, query := range mkQueries(tc.tpl, tc.fn, tc.rangeQuery, tc.args) { t.Run(query, func(t *testing.T) { - queryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return &querierMock{ - series: []*promql.StorageSeries{ - newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blop"}, {Name: "foo", Value: "barr"}}, start.Add(-lookbackDelta), end, step, factor(5)), - newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blop"}, {Name: "foo", Value: "bazz"}}, start.Add(-lookbackDelta), end, step, factor(7)), - newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blap"}, {Name: "foo", Value: "buzz"}}, start.Add(-lookbackDelta), end, step, factor(12)), - newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blap"}, {Name: "foo", Value: "bozz"}}, start.Add(-lookbackDelta), end, step, factor(11)), - newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blop"}, {Name: "foo", Value: "buzz"}}, start.Add(-lookbackDelta), end, step, factor(8)), - newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blap"}, {Name: "foo", Value: "bazz"}}, start.Add(-lookbackDelta), end, step, arithmeticSequence(10)), - }, - }, nil + queryable := storageSeriesQueryable([]*promql.StorageSeries{ + newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blop"}, {Name: "foo", Value: "barr"}}, start.Add(-lookbackDelta), end, step, factor(5)), + newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blop"}, {Name: "foo", Value: "bazz"}}, start.Add(-lookbackDelta), end, step, factor(7)), + newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blap"}, {Name: "foo", Value: "buzz"}}, start.Add(-lookbackDelta), end, step, factor(12)), + newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blap"}, {Name: "foo", Value: "bozz"}}, start.Add(-lookbackDelta), end, step, factor(11)), + newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blop"}, {Name: "foo", Value: "buzz"}}, start.Add(-lookbackDelta), end, step, factor(8)), + newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blap"}, {Name: "foo", Value: "bazz"}}, start.Add(-lookbackDelta), end, step, arithmeticSequence(10)), }) req := &PrometheusRangeQueryRequest{ @@ -1050,12 +1120,8 @@ func TestQuerySharding_ShouldReturnErrorInCorrectFormat(t *testing.T) { queryableErr = storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { return nil, errors.New("fatal queryable error") }) - queryable = storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return &querierMock{ - series: []*promql.StorageSeries{ - newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}}, start.Add(-lookbackDelta), end, step, factor(5)), - }, - }, nil + 
queryable = storageSeriesQueryable([]*promql.StorageSeries{ + newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}}, start.Add(-lookbackDelta), end, step, factor(5)), }) queryableSlow = newMockShardedQueryable( 2, @@ -1465,6 +1531,12 @@ func (h *downstreamHandler) Do(ctx context.Context, r Request) (Response, error) }, nil } +func storageSeriesQueryable(series []*promql.StorageSeries) storage.Queryable { + return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + return &querierMock{series: series}, nil + }) +} + type querierMock struct { series []*promql.StorageSeries } @@ -1625,6 +1697,13 @@ func stale(from, to time.Time, wrap generator) generator { } } +// constant returns a generator that generates a constant value +func constant(value float64) generator { + return func(ts int64) float64 { + return value + } +} + type seriesIteratorMock struct { idx int series []*promql.StorageSeries diff --git a/pkg/querier/queryrange/sharded_queryable.go b/pkg/querier/queryrange/sharded_queryable.go index 4cfa08a0dd5..6a4de44e548 100644 --- a/pkg/querier/queryrange/sharded_queryable.go +++ b/pkg/querier/queryrange/sharded_queryable.go @@ -107,15 +107,12 @@ func (q *shardedQuerier) Select(_ bool, hints *storage.SelectHints, matchers ... // handleEmbeddedQueries concurrently executes the provided queries through the downstream handler. // The returned storage.SeriesSet contains sorted series. func (q *shardedQuerier) handleEmbeddedQueries(queries []string, hints *storage.SelectHints) storage.SeriesSet { - var ( - jobs = concurrency.CreateJobsFromStrings(queries) - streamsMx sync.Mutex - streams []SampleStream - ) + streams := make([][]SampleStream, len(queries)) // Concurrently run each query. It breaks and cancels each worker context on first error. - err := concurrency.ForEach(q.ctx, jobs, len(jobs), func(ctx context.Context, job interface{}) error { - resp, err := q.handler.Do(ctx, q.req.WithQuery(job.(string))) + err := concurrency.ForEach(q.ctx, createJobIndexes(len(queries)), len(queries), func(ctx context.Context, job interface{}) error { + idx := job.(int) + resp, err := q.handler.Do(ctx, q.req.WithQuery(queries[idx])) if err != nil { return err } @@ -124,13 +121,9 @@ func (q *shardedQuerier) handleEmbeddedQueries(queries []string, hints *storage. if err != nil { return err } + streams[idx] = resStreams // No mutex is needed since each job writes its own index. This is like writing separate variables. q.responseHeaders.mergeHeaders(resp.(*PrometheusResponse).Headers) - - streamsMx.Lock() - streams = append(streams, resStreams...) - streamsMx.Unlock() - return nil }) @@ -156,6 +149,14 @@ func (q *shardedQuerier) Close() error { return nil } +func createJobIndexes(l int) []interface{} { + jobs := make([]interface{}, l) + for j := 0; j < l; j++ { + jobs[j] = j + } + return jobs +} + type responseHeadersTracker struct { headersMx sync.Mutex headers map[string][]string @@ -199,9 +200,14 @@ func (t *responseHeadersTracker) getHeaders() []*PrometheusResponseHeader { // results. // // The returned storage.SeriesSet series is sorted. 
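// Illustrative note on the signature change just below (inferred from handleEmbeddedQueries above,
// which now builds the nested slice via streams[idx] = resStreams): the outer slice is indexed by
// embedded query, so results[i] holds the sample streams returned for queries[i]. A minimal sketch
// of the shape, with placeholder values:
//
//	results := [][]SampleStream{
//		{shard0Streams},                // response to queries[0]
//		{shard1Streams, shard2Streams}, // response to queries[1]
//	}
//
// Flattening the nested slices in index order, instead of appending responses in completion order,
// keeps the merge order of the partial results stable across runs, which is what
// TestQueryshardingDeterminism exercises.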
-func newSeriesSetFromEmbeddedQueriesResults(results []SampleStream, hints *storage.SelectHints) storage.SeriesSet { +func newSeriesSetFromEmbeddedQueriesResults(results [][]SampleStream, hints *storage.SelectHints) storage.SeriesSet { + totalLen := 0 + for _, r := range results { + totalLen += len(r) + } + var ( - set = make([]storage.Series, 0, len(results)) + set = make([]storage.Series, 0, totalLen) step int64 ) @@ -210,48 +216,50 @@ func newSeriesSetFromEmbeddedQueriesResults(results []SampleStream, hints *stora step = hints.Step } - for _, stream := range results { - // We add an extra 10 items to account for some stale markers that could be injected. - // We're trading a lower chance of reallocation in case stale markers are added for a - // slightly higher memory utilisation. - samples := make([]model.SamplePair, 0, len(stream.Samples)+10) - - for idx, sample := range stream.Samples { - // When an embedded query is executed by PromQL engine, any stale marker in the time-series - // data is used the engine to stop applying the lookback delta but the stale marker is removed - // from the query results. The result of embedded queries, which we are processing in this function, - // is then used as input to run an outer query in the PromQL engine. This data will not contain - // the stale marker (because has been removed when running the embedded query) but we still need - // the PromQL engine to not apply the lookback delta when there are gaps in the embedded queries - // results. For this reason, here we do inject a stale marker at the beginning of each gap in the - // embedded queries results. - if step > 0 && idx > 0 && sample.TimestampMs > stream.Samples[idx-1].TimestampMs+step { + for _, result := range results { + for _, stream := range result { + // We add an extra 10 items to account for some stale markers that could be injected. + // We're trading a lower chance of reallocation in case stale markers are added for a + // slightly higher memory utilisation. + samples := make([]model.SamplePair, 0, len(stream.Samples)+10) + + for idx, sample := range stream.Samples { + // When an embedded query is executed by the PromQL engine, any stale marker in the time-series + // data is used by the engine to stop applying the lookback delta, but the stale marker is removed + // from the query results. The result of embedded queries, which we are processing in this function, + // is then used as input to run an outer query in the PromQL engine. This data will not contain + // the stale marker (because it has been removed when running the embedded query) but we still need + // the PromQL engine to not apply the lookback delta when there are gaps in the embedded queries + // results. For this reason, here we inject a stale marker at the beginning of each gap in the + // embedded queries results.
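// Worked example of the gap check below (illustrative): with step = 30s and consecutive samples at
// t=0s and t=90s, 90s > 0s+30s, so a StaleNaN sample is injected at t=30s (previous timestamp plus
// step) before the t=90s sample is appended; the outer PromQL engine then stops applying the
// lookback delta across the 30s-90s gap instead of carrying the t=0s value forward.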
+ if step > 0 && idx > 0 && sample.TimestampMs > stream.Samples[idx-1].TimestampMs+step { + samples = append(samples, model.SamplePair{ + Timestamp: model.Time(stream.Samples[idx-1].TimestampMs + step), + Value: model.SampleValue(math.Float64frombits(value.StaleNaN)), + }) + } + samples = append(samples, model.SamplePair{ - Timestamp: model.Time(stream.Samples[idx-1].TimestampMs + step), - Value: model.SampleValue(math.Float64frombits(value.StaleNaN)), + Timestamp: model.Time(sample.TimestampMs), + Value: model.SampleValue(sample.Value), }) } - samples = append(samples, model.SamplePair{ - Timestamp: model.Time(sample.TimestampMs), - Value: model.SampleValue(sample.Value), - }) - } + // In case the embedded query processed series which all ended before the end of the query time range, + // we don't want the outer query to apply the lookback at the end of the embedded query results. To keep it + // simple, it's safe always to add an extra stale marker at the end of the query results. + // + // This could result in an extra sample (stale marker) after the end of the query time range, but that's + // not a problem when running the outer query because it will just be discarded. + if len(samples) > 0 && step > 0 { + samples = append(samples, model.SamplePair{ + Timestamp: samples[len(samples)-1].Timestamp + model.Time(step), + Value: model.SampleValue(math.Float64frombits(value.StaleNaN)), + }) + } - // In case the embedded query processed series which all ended before the end of the query time range, - // we don't want the outer query to apply the lookback at the end of the embedded query results. To keep it - // simple, it's safe always to add an extra stale marker at the end of the query results. - // - // This could result into an extra sample (stale marker) after the end of the query time range, but that's - // not a problem when running the outer query because it will just be discarded. 
- if len(samples) > 0 && step > 0 { - samples = append(samples, model.SamplePair{ - Timestamp: samples[len(samples)-1].Timestamp + model.Time(step), - Value: model.SampleValue(math.Float64frombits(value.StaleNaN)), - }) + set = append(set, series.NewConcreteSeries(mimirpb.FromLabelAdaptersToLabels(stream.Labels), samples)) } - - set = append(set, series.NewConcreteSeries(mimirpb.FromLabelAdaptersToLabels(stream.Labels), samples)) } return series.NewConcreteSeriesSet(set) } diff --git a/pkg/querier/queryrange/sharded_queryable_test.go b/pkg/querier/queryrange/sharded_queryable_test.go index a8d7db05546..90a22cc14a3 100644 --- a/pkg/querier/queryrange/sharded_queryable_test.go +++ b/pkg/querier/queryrange/sharded_queryable_test.go @@ -360,7 +360,7 @@ func TestNewSeriesSetFromEmbeddedQueriesResults(t *testing.T) { for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - set := newSeriesSetFromEmbeddedQueriesResults(testData.input, testData.hints) + set := newSeriesSetFromEmbeddedQueriesResults([][]SampleStream{testData.input}, testData.hints) actual, err := seriesSetToSampleStreams(set) require.NoError(t, err) assertEqualSampleStream(t, testData.expected, actual) diff --git a/pkg/querier/queryrange/split_and_cache_test.go b/pkg/querier/queryrange/split_and_cache_test.go index 96b743ce374..5488ffb606f 100644 --- a/pkg/querier/queryrange/split_and_cache_test.go +++ b/pkg/querier/queryrange/split_and_cache_test.go @@ -25,7 +25,6 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" - "github.com/prometheus/prometheus/storage" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/weaveworks/common/middleware" @@ -558,11 +557,7 @@ func TestSplitAndCacheMiddleware_ResultsCacheFuzzy(t *testing.T) { } // Create a queryable on the fixtures. - queryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return &querierMock{ - series: series, - }, nil - }) + queryable := storageSeriesQueryable(series) // Create a downstream handler serving range queries based on the provided queryable. downstream := &downstreamHandler{
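For reference, a minimal standalone sketch (not Mimir code; all identifiers are illustrative) of the ordering behaviour the handleEmbeddedQueries change relies on: a mutex-guarded append collects results in goroutine-completion order, which varies between runs, while letting each job write to its own index preserves the job order without any lock.

package main

import (
	"fmt"
	"sync"
)

func main() {
	jobs := []string{"shard-0", "shard-1", "shard-2"}

	var (
		mu       sync.Mutex
		appended []string // old pattern: completion order, may change from run to run
	)
	indexed := make([]string, len(jobs)) // new pattern: each goroutine writes only its own slot

	var wg sync.WaitGroup
	for i, job := range jobs {
		wg.Add(1)
		go func(i int, job string) {
			defer wg.Done()

			mu.Lock()
			appended = append(appended, job)
			mu.Unlock()

			indexed[i] = job // no mutex needed: no two goroutines share an index
		}(i, job)
	}
	wg.Wait()

	fmt.Println(appended) // e.g. [shard-1 shard-2 shard-0], scheduling dependent
	fmt.Println(indexed)  // always [shard-0 shard-1 shard-2]
}

With the embedded query results always merged in query order, the outer sum() in TestQueryshardingDeterminism accumulates the per-shard partial values in the same order on every execution, so the non-associativity of the "evil" float constants can no longer surface as a query result that changes between identical requests.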