diff --git a/pkg/querier/queryrange/querysharding.go b/pkg/querier/queryrange/querysharding.go index 9e9b2025743f..394fcfc4d326 100644 --- a/pkg/querier/queryrange/querysharding.go +++ b/pkg/querier/queryrange/querysharding.go @@ -6,6 +6,7 @@ import ( "time" "github.com/cortexproject/cortex/pkg/querier/queryrange" + "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -16,8 +17,6 @@ import ( "github.com/grafana/loki/pkg/logql/marshal" ) -var nanosecondsInMillisecond = int64(time.Millisecond / time.Nanosecond) - // NewQueryShardMiddleware creates a middleware which downstreams queries after AST mapping and query encoding. func NewQueryShardMiddleware( logger log.Logger, @@ -147,60 +146,12 @@ type shardSplitter struct { func (splitter *shardSplitter) Do(ctx context.Context, r queryrange.Request) (queryrange.Response, error) { cutoff := splitter.now().Add(-splitter.MinShardingLookback) - sharded, nonsharded := partitionRequest(r, cutoff) - - return splitter.parallel(ctx, sharded, nonsharded) - -} -func (splitter *shardSplitter) parallel(ctx context.Context, sharded, nonsharded queryrange.Request) (queryrange.Response, error) { - if sharded == nil { - return splitter.next.Do(ctx, nonsharded) + // Only attempt to shard queries which are older than the sharding lookback (the period for which ingesters are also queried). + if !cutoff.After(util.TimeFromMillis(r.GetEnd())) { + return splitter.next.Do(ctx, r) } - - if nonsharded == nil { - return splitter.shardingware.Do(ctx, sharded) - } - - nonshardCh := make(chan queryrange.Response, 1) - shardCh := make(chan queryrange.Response, 1) - errCh := make(chan error, 2) - - go func() { - res, err := splitter.next.Do(ctx, nonsharded) - if err != nil { - errCh <- err - return - } - nonshardCh <- res - - }() - - go func() { - res, err := splitter.shardingware.Do(ctx, sharded) - if err != nil { - errCh <- err - return - } - shardCh <- res - }() - - resps := make([]queryrange.Response, 0, 2) - for i := 0; i < 2; i++ { - select { - case r := <-nonshardCh: - resps = append(resps, r) - case r := <-shardCh: - resps = append(resps, r) - case err := <-errCh: - return nil, err - case <-ctx.Done(): - return nil, ctx.Err() - } - - } - - return lokiCodec.MergeResponse(resps...) + return splitter.shardingware.Do(ctx, r) } // TODO(owen-d): export in cortex so we don't duplicate code @@ -212,28 +163,3 @@ func hasShards(confs queryrange.ShardingConfigs) bool { } return false } - -// partitionRequet splits a request into potentially multiple requests, one including the request's time range -// [0,t). The other will include [t,inf) -// TODO(owen-d): export in cortex so we don't duplicate code -func partitionRequest(r queryrange.Request, t time.Time) (before queryrange.Request, after queryrange.Request) { - boundary := TimeToMillis(t) - if r.GetStart() >= boundary { - return nil, r - } - - if r.GetEnd() < boundary { - return r, nil - } - - return r.WithStartEnd(r.GetStart(), boundary), r.WithStartEnd(boundary, r.GetEnd()) -} - -// TimeFromMillis is a helper to turn milliseconds -> time.Time -func TimeFromMillis(ms int64) time.Time { - return time.Unix(0, ms*nanosecondsInMillisecond) -} - -func TimeToMillis(t time.Time) int64 { - return t.UnixNano() / nanosecondsInMillisecond -} diff --git a/pkg/querier/queryrange/querysharding_test.go b/pkg/querier/queryrange/querysharding_test.go index 79aff05b2599..6d14c0bdecb2 100644 --- a/pkg/querier/queryrange/querysharding_test.go +++ b/pkg/querier/queryrange/querysharding_test.go @@ -9,6 +9,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/querier/queryrange" + "github.com/cortexproject/cortex/pkg/util" "github.com/go-kit/kit/log" "github.com/stretchr/testify/require" @@ -68,46 +69,59 @@ var ( } ) -func Test_PartitionRequest(t *testing.T) { - midpt := time.Unix(0, 0).Add(500 * time.Millisecond) - cutoff := TimeToMillis(midpt) - - // test split - req := defaultReq().WithStartEnd(0, cutoff*2) - before, after := partitionRequest(req, midpt) - require.Equal(t, req.WithStartEnd(0, cutoff), before) - require.Equal(t, req.WithStartEnd(cutoff, 2*cutoff), after) - - // test all before cutoff - before, after = partitionRequest(req, midpt.Add(1000*time.Millisecond)) - require.Equal(t, req, before) - require.Nil(t, after) - - // test after cutoff - before, after = partitionRequest(req, time.Unix(0, 0)) - require.Nil(t, before) - require.Equal(t, req, after) - -} - func Test_shardSplitter(t *testing.T) { - splitter := &shardSplitter{ - shardingware: mockHandler(lokiResps[0], nil), - next: mockHandler(lokiResps[1], nil), - now: time.Now, - MinShardingLookback: 0, - } - req := defaultReq().WithStartEnd( - TimeToMillis(time.Now().Add(-time.Hour)), - TimeToMillis(time.Now().Add(time.Hour)), + util.TimeToMillis(start), + util.TimeToMillis(end), ) - resp, err := splitter.Do(context.Background(), req) - require.Nil(t, err) - expected, err := lokiCodec.MergeResponse(lokiResps...) - require.Nil(t, err) - require.Equal(t, expected, resp) + for _, tc := range []struct { + desc string + lookback time.Duration + shouldShard bool + }{ + { + desc: "older than lookback", + lookback: -time.Minute, // a negative lookback will ensure the entire query doesn't cross the sharding boundary & can safely be sharded. + shouldShard: true, + }, + { + desc: "overlaps lookback", + lookback: end.Sub(start) / 2, // intersect the request causing it to avoid sharding + shouldShard: false, + }, + { + desc: "newer than lookback", + lookback: end.Sub(start) + 1, // the entire query is in the ingester range and should avoid sharding. + shouldShard: false, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + var didShard bool + + splitter := &shardSplitter{ + shardingware: queryrange.HandlerFunc(func(ctx context.Context, req queryrange.Request) (queryrange.Response, error) { + didShard = true + return mockHandler(lokiResps[0], nil).Do(ctx, req) + }), + next: mockHandler(lokiResps[1], nil), + now: func() time.Time { return end }, + MinShardingLookback: tc.lookback, + } + + resp, err := splitter.Do(context.Background(), req) + require.Nil(t, err) + + require.Equal(t, tc.shouldShard, didShard) + require.Nil(t, err) + + if tc.shouldShard { + require.Equal(t, lokiResps[0], resp) + } else { + require.Equal(t, lokiResps[1], resp) + } + }) + } } func Test_astMapper(t *testing.T) {