diff --git a/pkg/querier/queryrange/split_by_interval.go b/pkg/querier/queryrange/split_by_interval.go index 1542781f077d..a22ae8189600 100644 --- a/pkg/querier/queryrange/split_by_interval.go +++ b/pkg/querier/queryrange/split_by_interval.go @@ -5,6 +5,7 @@ import ( "net/http" "time" + "github.com/cortexproject/cortex/pkg/util" "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" "github.com/prometheus/client_golang/prometheus" @@ -327,6 +328,20 @@ func splitMetricByTime(r queryrangebase.Request, interval time.Duration) ([]quer } lokiReq := r.(*LokiRequest) + + // step align start and end time of the query. Start time is rounded down and end time is rounded up. + stepNs := r.GetStep() * 1e6 + startNs := lokiReq.StartTs.UnixNano() + start := time.Unix(0, startNs-startNs%stepNs) + + endNs := lokiReq.EndTs.UnixNano() + if mod := endNs % stepNs; mod != 0 { + endNs += stepNs - mod + } + end := time.Unix(0, endNs) + + lokiReq = lokiReq.WithStartEnd(util.TimeToMillis(start), util.TimeToMillis(end)).(*LokiRequest) + // step is >= configured split interval, let us just split the query interval by step if lokiReq.Step >= interval.Milliseconds() { forInterval(time.Duration(lokiReq.Step*1e6), lokiReq.StartTs, lokiReq.EndTs, false, func(start, end time.Time) { @@ -344,12 +359,7 @@ func splitMetricByTime(r queryrangebase.Request, interval time.Duration) ([]quer return reqs, nil } - // nextIntervalBoundary always moves ahead in a multiple of steps but the time it returns would not be step aligned. - // To have step aligned intervals for better cache-ability of results, let us step align the start time which make all the split intervals step aligned. - startNs := lokiReq.StartTs.UnixNano() - start := time.Unix(0, startNs-startNs%(r.GetStep()*1e6)) - - for start := start; start.Before(lokiReq.EndTs); start = nextIntervalBoundary(start, r.GetStep(), interval).Add(time.Duration(r.GetStep()) * time.Millisecond) { + for start := lokiReq.StartTs; start.Before(lokiReq.EndTs); start = nextIntervalBoundary(start, r.GetStep(), interval).Add(time.Duration(r.GetStep()) * time.Millisecond) { end := nextIntervalBoundary(start, r.GetStep(), interval) if end.Add(time.Duration(r.GetStep())*time.Millisecond).After(lokiReq.EndTs) || end.Add(time.Duration(r.GetStep())*time.Millisecond) == lokiReq.EndTs { end = lokiReq.EndTs @@ -365,10 +375,6 @@ func splitMetricByTime(r queryrangebase.Request, interval time.Duration) ([]quer }) } - if len(reqs) != 0 { - // change the start time to original time - reqs[0] = reqs[0].WithStartEnd(lokiReq.GetStart(), reqs[0].GetEnd()) - } return reqs, nil } diff --git a/pkg/querier/queryrange/split_by_interval_test.go b/pkg/querier/queryrange/split_by_interval_test.go index ef65a01d12e3..f1c37dfa4383 100644 --- a/pkg/querier/queryrange/split_by_interval_test.go +++ b/pkg/querier/queryrange/split_by_interval_test.go @@ -376,14 +376,44 @@ func Test_splitMetricQuery(t *testing.T) { }, &LokiRequest{ StartTs: time.Unix(2*3*3600+7, 0), - EndTs: time.Unix(3*3*3600, 0), + EndTs: time.Unix(3*3*3600+2, 0), // 9h mod 17s = 2s + Step: 17 * seconds, + Query: `rate({app="foo"}[1m])`, + }, + }, + interval: 3 * time.Hour, + }, + // end time already step aligned + { + input: &LokiRequest{ + StartTs: time.Unix(2*3600, 0), + EndTs: time.Unix(3*3*3600+2, 0), // 9h mod 17s = 2s + Step: 17 * seconds, + Query: `rate({app="foo"}[1m])`, + }, + expected: []queryrangebase.Request{ + &LokiRequest{ + StartTs: time.Unix(2*3600-9, 0), // 2h mod 17s = 9s + EndTs: time.Unix((3*3600)-5, 0), // 3h mod 17s = 5s + Step: 17 * seconds, + Query: `rate({app="foo"}[1m])`, + }, + &LokiRequest{ + StartTs: time.Unix((3*3600)+12, 0), + EndTs: time.Unix((2*3*3600)-10, 0), // 6h mod 17s = 10s + Step: 17 * seconds, + Query: `rate({app="foo"}[1m])`, + }, + &LokiRequest{ + StartTs: time.Unix(2*3*3600+7, 0), + EndTs: time.Unix(3*3*3600+2, 0), Step: 17 * seconds, Query: `rate({app="foo"}[1m])`, }, }, interval: 3 * time.Hour, }, - // start time not aligned with step + // start & end time not aligned with step { input: &LokiRequest{ StartTs: time.Unix(2*3600, 0), @@ -393,7 +423,7 @@ func Test_splitMetricQuery(t *testing.T) { }, expected: []queryrangebase.Request{ &LokiRequest{ - StartTs: time.Unix(2*3600, 0), + StartTs: time.Unix(2*3600-9, 0), // 2h mod 17s = 9s EndTs: time.Unix((3*3600)-5, 0), // 3h mod 17s = 5s Step: 17 * seconds, Query: `rate({app="foo"}[1m])`, @@ -406,7 +436,7 @@ func Test_splitMetricQuery(t *testing.T) { }, &LokiRequest{ StartTs: time.Unix(2*3*3600+7, 0), - EndTs: time.Unix(3*3*3600, 0), + EndTs: time.Unix(3*3*3600+2, 0), // 9h mod 17s = 2s Step: 17 * seconds, Query: `rate({app="foo"}[1m])`, }, @@ -449,7 +479,7 @@ func Test_splitMetricQuery(t *testing.T) { }, &LokiRequest{ StartTs: time.Unix(24*3600, 0), - EndTs: time.Unix(25*3600, 0), + EndTs: time.Unix(30*3600, 0), Step: 6 * 3600 * seconds, Query: `rate({app="foo"}[1m])`, }, @@ -458,7 +488,7 @@ func Test_splitMetricQuery(t *testing.T) { }, { input: &LokiRequest{ - StartTs: time.Unix(0, 0), + StartTs: time.Unix(1*3600, 0), EndTs: time.Unix(3*3600, 0), Step: 6 * 3600 * seconds, Query: `rate({app="foo"}[1m])`, @@ -466,7 +496,7 @@ func Test_splitMetricQuery(t *testing.T) { expected: []queryrangebase.Request{ &LokiRequest{ StartTs: time.Unix(0, 0), - EndTs: time.Unix(3*3600, 0), + EndTs: time.Unix(6*3600, 0), Step: 6 * 3600 * seconds, Query: `rate({app="foo"}[1m])`, },