Make query sharding deterministic #707

Merged: 5 commits, Jan 12, 2022
Changes from all commits
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -74,7 +74,7 @@
* [CHANGE] Alertmanager: removed `-alertmanager.storage.*` configuration options, with the exception of the CLI flags `-alertmanager.storage.path` and `-alertmanager.storage.retention`. Use `-alertmanager-storage.*` instead. #632
* [CHANGE] Ingester: active series metrics `cortex_ingester_active_series` and `cortex_ingester_active_series_custom_tracker` are now removed when their value is zero. #672 #690
* [FEATURE] Query Frontend: Add `cortex_query_fetched_chunks_total` per-user counter to expose the number of chunks fetched as part of queries. This metric can be enabled with the `-frontend.query-stats-enabled` flag (or its respective YAML config option `query_stats_enabled`). #31
* [FEATURE] Query Frontend: Add experimental querysharding for the blocks storage (instant and range queries). You can now enable querysharding for blocks storage (`-store.engine=blocks`) by setting `-query-frontend.parallelize-shardable-queries` to `true`. The following additional config and exported metrics have been added. #79 #80 #100 #124 #140 #148 #150 #151 #153 #154 #155 #156 #157 #158 #159 #160 #163 #169 #172 #196 #205 #225 #226 #227 #228 #230 #235 #240 #239 #246 #244 #319 #330 #371 #385 #400 #458 #586 #630 #660
* [FEATURE] Query Frontend: Add experimental querysharding for the blocks storage (instant and range queries). You can now enable querysharding for blocks storage (`-store.engine=blocks`) by setting `-query-frontend.parallelize-shardable-queries` to `true`. The following additional config and exported metrics have been added. #79 #80 #100 #124 #140 #148 #150 #151 #153 #154 #155 #156 #157 #158 #159 #160 #163 #169 #172 #196 #205 #225 #226 #227 #228 #230 #235 #240 #239 #246 #244 #319 #330 #371 #385 #400 #458 #586 #630 #660 #707
* New config options:
* `-frontend.query-sharding-total-shards`: The number of shards to use when parallelising via query sharding.
* `-frontend.query-sharding-max-sharded-queries`: The maximum number of sharded queries that can be run for a given received query. Set to 0 to disable the limit.
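Editorial illustration (not part of this changelog): with the flags above, query sharding for the blocks storage might be enabled along these lines; the values are arbitrary examples, not documented defaults.

    -store.engine=blocks
    -query-frontend.parallelize-shardable-queries=true
    -frontend.query-sharding-total-shards=16
    -frontend.query-sharding-max-sharded-queries=128
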
123 changes: 101 additions & 22 deletions pkg/querier/queryrange/querysharding_test.go
@@ -97,6 +97,7 @@ func approximatelyEquals(t *testing.T, a, b *PrometheusResponse) {
}
}
}

func TestQueryShardingCorrectness(t *testing.T) {
var (
numSeries = 1000
@@ -477,11 +478,7 @@ func TestQueryShardingCorrectness(t *testing.T) {
}

// Create a queryable on the fixtures.
queryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return &querierMock{
series: series,
}, nil
})
queryable := storageSeriesQueryable(series)

for testName, testData := range tests {
// Change scope to ensure it works fine when test cases are executed concurrently.
@@ -600,6 +597,83 @@ func (b byLabels) Less(i, j int) bool {
) < 0
}

func TestQueryshardingDeterminism(t *testing.T) {
const shards = 16

// These are "evil" floats found in production which are the result of a rate of 1 and 3 requests per 1m5s.
// We push them as a gauge here to simplify the test scenario.
const (
evilFloatA = 0.03298
evilFloatB = 0.09894
)
require.NotEqualf(t,
evilFloatA+evilFloatA+evilFloatB,
evilFloatA+evilFloatB+evilFloatA,
"This test is based on the fact that given a=%f and b=%f, then a+a+b != a+b+a. If that is not true, this test is not testing anything.", evilFloatA, evilFloatB,
)

var (
from = time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC)
step = 30 * time.Second
to = from.Add(step)
)

labelsForShard := labelsForShardsGenerator(labels.FromStrings(labels.MetricName, "metric"), shards)
storageSeries := []*promql.StorageSeries{
newSeries(labelsForShard(0), from, to, step, constant(evilFloatA)),
newSeries(labelsForShard(1), from, to, step, constant(evilFloatA)),
newSeries(labelsForShard(2), from, to, step, constant(evilFloatB)),
}

shardingware := newQueryShardingMiddleware(log.NewNopLogger(), newEngine(), mockLimits{totalShards: shards}, prometheus.NewPedanticRegistry())
downstream := &downstreamHandler{engine: newEngine(), queryable: storageSeriesQueryable(storageSeries)}

req := &PrometheusInstantQueryRequest{
Path: "/query",
Time: to.UnixMilli(),
Query: `sum(metric)`,
}

var lastVal float64
for i := 0; i <= 100; i++ {
shardedRes, err := shardingware.Wrap(downstream).Do(user.InjectOrgID(context.Background(), "test"), req)
require.NoError(t, err)

shardedPrometheusRes := shardedRes.(*PrometheusResponse)

sampleStreams, err := responseToSamples(shardedPrometheusRes)
require.NoError(t, err)

require.Lenf(t, sampleStreams, 1, "There should be 1 sample stream (query %d)", i)
require.Lenf(t, sampleStreams[0].Samples, 1, "There should be 1 sample in the first stream (query %d)", i)
val := sampleStreams[0].Samples[0].Value

if i > 0 {
require.Equalf(t, lastVal, val, "Value differs on query %d", i)
}
lastVal = val
}
}
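
Editorial aside, not part of this diff: the non-determinism this test guards against comes from IEEE-754 float64 addition not being associative, so the order in which per-shard partial results are summed can change the final value. A minimal standalone sketch using the same constants as the test above:

package main

import "fmt"

func main() {
	// Same "evil" values as in TestQueryshardingDeterminism.
	a, b := 0.03298, 0.09894

	// float64 addition rounds after each step, so grouping matters:
	// (a+a)+b and (a+b)+a round differently for these particular values,
	// which is exactly what the require.NotEqualf above asserts.
	fmt.Println(a+a+b == a+b+a) // false for these values
}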

// labelsForShardsGenerator returns a function that provides labels.Labels for the requested shard.
// A single generator instance returns different label sets on each call, even for the same shard.
func labelsForShardsGenerator(base labels.Labels, shards uint64) func(shard uint64) labels.Labels {
i := 0
return func(shard uint64) labels.Labels {
for {
i++
ls := make(labels.Labels, len(base)+1)
copy(ls, base)
ls[len(ls)-1] = labels.Label{Name: "__test_shard_adjuster__", Value: fmt.Sprintf("adjusted to be %s by %d", sharding.FormatShardIDLabelValue(shard, shards), i)}
sort.Sort(ls)
// If this label value makes this labels combination fall into the desired shard, return it, otherwise keep trying.
if ls.Hash()%shards == shard {
return ls
}
}
}
}
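
Editorial note: the generator above works by trial and error. It keeps bumping the value of the synthetic __test_shard_adjuster__ label until the resulting label set hashes into the requested shard, i.e. until it satisfies the same ls.Hash()%shards == shard check used in the loop. A hypothetical usage sketch, reusing names from the test above:

// 16 shards, as in TestQueryshardingDeterminism.
labelsForShard := labelsForShardsGenerator(labels.FromStrings(labels.MetricName, "metric"), 16)

// By construction, the returned label set falls into shard 3:
// labelsForShard(3).Hash()%16 == 3.
ls := labelsForShard(3)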

// TestQuerySharding_FunctionCorrectness is the old test that probably at some point inspired TestQueryShardingCorrectness;
// we keep it here since it adds more test cases.
func TestQuerySharding_FunctionCorrectness(t *testing.T) {
@@ -700,17 +774,13 @@ func TestQuerySharding_FunctionCorrectness(t *testing.T) {
const numShards = 4
for _, query := range mkQueries(tc.tpl, tc.fn, tc.rangeQuery, tc.args) {
t.Run(query, func(t *testing.T) {
queryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return &querierMock{
series: []*promql.StorageSeries{
newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blop"}, {Name: "foo", Value: "barr"}}, start.Add(-lookbackDelta), end, step, factor(5)),
newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blop"}, {Name: "foo", Value: "bazz"}}, start.Add(-lookbackDelta), end, step, factor(7)),
newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blap"}, {Name: "foo", Value: "buzz"}}, start.Add(-lookbackDelta), end, step, factor(12)),
newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blap"}, {Name: "foo", Value: "bozz"}}, start.Add(-lookbackDelta), end, step, factor(11)),
newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blop"}, {Name: "foo", Value: "buzz"}}, start.Add(-lookbackDelta), end, step, factor(8)),
newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blap"}, {Name: "foo", Value: "bazz"}}, start.Add(-lookbackDelta), end, step, arithmeticSequence(10)),
},
}, nil
queryable := storageSeriesQueryable([]*promql.StorageSeries{
newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blop"}, {Name: "foo", Value: "barr"}}, start.Add(-lookbackDelta), end, step, factor(5)),
newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blop"}, {Name: "foo", Value: "bazz"}}, start.Add(-lookbackDelta), end, step, factor(7)),
newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blap"}, {Name: "foo", Value: "buzz"}}, start.Add(-lookbackDelta), end, step, factor(12)),
newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blap"}, {Name: "foo", Value: "bozz"}}, start.Add(-lookbackDelta), end, step, factor(11)),
newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blop"}, {Name: "foo", Value: "buzz"}}, start.Add(-lookbackDelta), end, step, factor(8)),
newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blap"}, {Name: "foo", Value: "bazz"}}, start.Add(-lookbackDelta), end, step, arithmeticSequence(10)),
})

req := &PrometheusRangeQueryRequest{
@@ -1050,12 +1120,8 @@ func TestQuerySharding_ShouldReturnErrorInCorrectFormat(t *testing.T) {
queryableErr = storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return nil, errors.New("fatal queryable error")
})
queryable = storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return &querierMock{
series: []*promql.StorageSeries{
newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}}, start.Add(-lookbackDelta), end, step, factor(5)),
},
}, nil
queryable = storageSeriesQueryable([]*promql.StorageSeries{
newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}}, start.Add(-lookbackDelta), end, step, factor(5)),
})
queryableSlow = newMockShardedQueryable(
2,
@@ -1465,6 +1531,12 @@ func (h *downstreamHandler) Do(ctx context.Context, r Request) (Response, error)
}, nil
}

func storageSeriesQueryable(series []*promql.StorageSeries) storage.Queryable {
return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return &querierMock{series: series}, nil
})
}
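
Editorial note, mirroring the usage earlier in this diff: storageSeriesQueryable is a small adapter that turns fixture series into a storage.Queryable, which the tests then hand to the fake downstream handler. A hypothetical sketch (from, to and step assumed to be defined as in the tests above):

series := []*promql.StorageSeries{
	newSeries(labels.FromStrings(labels.MetricName, "metric"), from, to, step, constant(1)),
}
downstream := &downstreamHandler{
	engine:    newEngine(),
	queryable: storageSeriesQueryable(series),
}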

type querierMock struct {
series []*promql.StorageSeries
}
@@ -1625,6 +1697,13 @@ func stale(from, to time.Time, wrap generator) generator {
}
}

// constant returns a generator that generates a constant value
func constant(value float64) generator {
return func(ts int64) float64 {
return value
}
}

type seriesIteratorMock struct {
idx int
series []*promql.StorageSeries
108 changes: 58 additions & 50 deletions pkg/querier/queryrange/sharded_queryable.go
@@ -107,15 +107,12 @@ func (q *shardedQuerier) Select(_ bool, hints *storage.SelectHints, matchers ...
// handleEmbeddedQueries concurrently executes the provided queries through the downstream handler.
// The returned storage.SeriesSet contains sorted series.
func (q *shardedQuerier) handleEmbeddedQueries(queries []string, hints *storage.SelectHints) storage.SeriesSet {
var (
jobs = concurrency.CreateJobsFromStrings(queries)
streamsMx sync.Mutex
streams []SampleStream
)
streams := make([][]SampleStream, len(queries))

// Concurrently run each query. It breaks and cancels each worker context on first error.
err := concurrency.ForEach(q.ctx, jobs, len(jobs), func(ctx context.Context, job interface{}) error {
resp, err := q.handler.Do(ctx, q.req.WithQuery(job.(string)))
err := concurrency.ForEach(q.ctx, createJobIndexes(len(queries)), len(queries), func(ctx context.Context, job interface{}) error {
idx := job.(int)
resp, err := q.handler.Do(ctx, q.req.WithQuery(queries[idx]))
if err != nil {
return err
}
@@ -124,13 +121,9 @@ func (q *shardedQuerier) handleEmbeddedQueries(queries []string, hints *storage.
if err != nil {
return err
}
streams[idx] = resStreams // No mutex is needed since each job writes its own index. This is like writing separate variables.

q.responseHeaders.mergeHeaders(resp.(*PrometheusResponse).Headers)

streamsMx.Lock()
streams = append(streams, resStreams...)
streamsMx.Unlock()

return nil
})
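
Editorial sketch, not part of this diff, of the general pattern that makes the fan-out above deterministic: pre-size the result slice and let each concurrent job write only to its own index, so results come back in the same order as the input queries and no mutex is needed. The fanOut name and the run callback are hypothetical:

func fanOut(ctx context.Context, queries []string, run func(ctx context.Context, query string) ([]SampleStream, error)) ([][]SampleStream, error) {
	results := make([][]SampleStream, len(queries))

	err := concurrency.ForEach(ctx, createJobIndexes(len(queries)), len(queries), func(ctx context.Context, job interface{}) error {
		idx := job.(int)

		streams, err := run(ctx, queries[idx])
		if err != nil {
			return err
		}

		results[idx] = streams // each job owns exactly one slot, so no locking is needed
		return nil
	})

	return results, err
}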

@@ -156,6 +149,14 @@ func (q *shardedQuerier) Close() error {
return nil
}

func createJobIndexes(l int) []interface{} {
jobs := make([]interface{}, l)
for j := 0; j < l; j++ {
jobs[j] = j
}
return jobs
}

Review thread on createJobIndexes:

@replay (Contributor), Jan 7, 2022:
In dskit there's already concurrency.CreateJobsFromStrings, which is almost exactly the same as this but with strings. Would it maybe make sense to also add concurrency.CreateJobsFromInts there as well? (until we finally get generics :)

Author (Contributor):
I considered that, but since generics are around the corner, I preferred to keep this here until I see at least one more usage for it.

Also, IMO it would just be easier to make concurrency.ForEachJobID like:

// ForEachJobID runs the provided jobFunc for each job ID in `[0, jobs)`.
// The execution breaks on the first error encountered.
func ForEachJobID(ctx context.Context, jobs int, concurrency int, jobFunc func(ctx context.Context, job int) error) error

And then just doing input[job] in the function instead of having to type-assert the interface or play with generics:

concurrency.ForEachJobID(ctx, len(input), someConcurrency, func(ctx context.Context, idx int) error {
    return process(input[idx])
})

I just checked and it seems that it would fit 100% of the concurrency.ForEach usages, removing all the type assertions. If we like that, I can open a PR in dskit.

@replay (Contributor):
If this is true, then I think this solution would be clearly better.

Author (Contributor):
Let's see what people think: grafana/dskit#113

Author (Contributor):
That got merged. I'll update mimir once this PR is merged.

type responseHeadersTracker struct {
headersMx sync.Mutex
headers map[string][]string
@@ -199,9 +200,14 @@ func (t *responseHeadersTracker) getHeaders() []*PrometheusResponseHeader {
// results.
//
// The returned storage.SeriesSet series is sorted.
func newSeriesSetFromEmbeddedQueriesResults(results []SampleStream, hints *storage.SelectHints) storage.SeriesSet {
func newSeriesSetFromEmbeddedQueriesResults(results [][]SampleStream, hints *storage.SelectHints) storage.SeriesSet {
totalLen := 0
for _, r := range results {
totalLen += len(r)
}

var (
set = make([]storage.Series, 0, len(results))
set = make([]storage.Series, 0, totalLen)
step int64
)

@@ -210,48 +216,50 @@ func newSeriesSetFromEmbeddedQueriesResults(results []SampleStream, hints *stora
step = hints.Step
}

for _, stream := range results {
// We add an extra 10 items to account for some stale markers that could be injected.
// We're trading a lower chance of reallocation in case stale markers are added for a
// slightly higher memory utilisation.
samples := make([]model.SamplePair, 0, len(stream.Samples)+10)

for idx, sample := range stream.Samples {
// When an embedded query is executed by PromQL engine, any stale marker in the time-series
// data is used the engine to stop applying the lookback delta but the stale marker is removed
// from the query results. The result of embedded queries, which we are processing in this function,
// is then used as input to run an outer query in the PromQL engine. This data will not contain
// the stale marker (because has been removed when running the embedded query) but we still need
// the PromQL engine to not apply the lookback delta when there are gaps in the embedded queries
// results. For this reason, here we do inject a stale marker at the beginning of each gap in the
// embedded queries results.
if step > 0 && idx > 0 && sample.TimestampMs > stream.Samples[idx-1].TimestampMs+step {
for _, result := range results {
for _, stream := range result {
// We add an extra 10 items to account for some stale markers that could be injected.
// We're trading a lower chance of reallocation in case stale markers are added for a
// slightly higher memory utilisation.
samples := make([]model.SamplePair, 0, len(stream.Samples)+10)

for idx, sample := range stream.Samples {
// When an embedded query is executed by the PromQL engine, any stale marker in the time-series
// data is used by the engine to stop applying the lookback delta, but the stale marker is removed
// from the query results. The result of embedded queries, which we are processing in this function,
// is then used as input to run an outer query in the PromQL engine. This data will not contain
// the stale marker (because it has been removed when running the embedded query) but we still need
// the PromQL engine to not apply the lookback delta when there are gaps in the embedded queries'
// results. For this reason, here we inject a stale marker at the beginning of each gap in the
// embedded queries' results.
if step > 0 && idx > 0 && sample.TimestampMs > stream.Samples[idx-1].TimestampMs+step {
samples = append(samples, model.SamplePair{
Timestamp: model.Time(stream.Samples[idx-1].TimestampMs + step),
Value: model.SampleValue(math.Float64frombits(value.StaleNaN)),
})
}

samples = append(samples, model.SamplePair{
Timestamp: model.Time(stream.Samples[idx-1].TimestampMs + step),
Value: model.SampleValue(math.Float64frombits(value.StaleNaN)),
Timestamp: model.Time(sample.TimestampMs),
Value: model.SampleValue(sample.Value),
})
}

samples = append(samples, model.SamplePair{
Timestamp: model.Time(sample.TimestampMs),
Value: model.SampleValue(sample.Value),
})
}
// In case the embedded query processed series which all ended before the end of the query time range,
// we don't want the outer query to apply the lookback at the end of the embedded query results. To keep it
// simple, it's always safe to add an extra stale marker at the end of the query results.
//
// This could result in an extra sample (stale marker) after the end of the query time range, but that's
// not a problem when running the outer query because it will just be discarded.
if len(samples) > 0 && step > 0 {
samples = append(samples, model.SamplePair{
Timestamp: samples[len(samples)-1].Timestamp + model.Time(step),
Value: model.SampleValue(math.Float64frombits(value.StaleNaN)),
})
}

// In case the embedded query processed series which all ended before the end of the query time range,
// we don't want the outer query to apply the lookback at the end of the embedded query results. To keep it
// simple, it's safe always to add an extra stale marker at the end of the query results.
//
// This could result into an extra sample (stale marker) after the end of the query time range, but that's
// not a problem when running the outer query because it will just be discarded.
if len(samples) > 0 && step > 0 {
samples = append(samples, model.SamplePair{
Timestamp: samples[len(samples)-1].Timestamp + model.Time(step),
Value: model.SampleValue(math.Float64frombits(value.StaleNaN)),
})
set = append(set, series.NewConcreteSeries(mimirpb.FromLabelAdaptersToLabels(stream.Labels), samples))
}

set = append(set, series.NewConcreteSeries(mimirpb.FromLabelAdaptersToLabels(stream.Labels), samples))
}
return series.NewConcreteSeriesSet(set)
}
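
Editorial example of the stale-marker handling described in the comments above, with assumed values: given step = 30s and a stream with samples at t=0s and t=90s, the 60s gap causes a stale marker to be injected one step after the previous sample, and a trailing stale marker is appended one step after the last sample.

// Illustrative only, not part of this diff (step = 30s):
// Input stream:          (0s, 1.0), (90s, 2.0)
// After gap injection:   (0s, 1.0), (30s, StaleNaN), (90s, 2.0)
// After trailing marker: (0s, 1.0), (30s, StaleNaN), (90s, 2.0), (120s, StaleNaN)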
2 changes: 1 addition & 1 deletion pkg/querier/queryrange/sharded_queryable_test.go
@@ -360,7 +360,7 @@ func TestNewSeriesSetFromEmbeddedQueriesResults(t *testing.T) {

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
set := newSeriesSetFromEmbeddedQueriesResults(testData.input, testData.hints)
set := newSeriesSetFromEmbeddedQueriesResults([][]SampleStream{testData.input}, testData.hints)
actual, err := seriesSetToSampleStreams(set)
require.NoError(t, err)
assertEqualSampleStream(t, testData.expected, actual)
7 changes: 1 addition & 6 deletions pkg/querier/queryrange/split_and_cache_test.go
@@ -25,7 +25,6 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/middleware"
@@ -558,11 +557,7 @@ func TestSplitAndCacheMiddleware_ResultsCacheFuzzy(t *testing.T) {
}

// Create a queryable on the fixtures.
queryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return &querierMock{
series: series,
}, nil
})
queryable := storageSeriesQueryable(series)

// Create a downstream handler serving range queries based on the provided queryable.
downstream := &downstreamHandler{