Skip to content

Commit

Permalink
step align start and end time of the original query while splitting it (
Browse files Browse the repository at this point in the history
#5217)

* step align start and end time of the original query while splitting it

* also step align queries with step > split interval
  • Loading branch information
sandeepsukhani authored Jan 24, 2022
1 parent d87f2a1 commit cd1537d
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 17 deletions.
26 changes: 16 additions & 10 deletions pkg/querier/queryrange/split_by_interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/http"
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -327,6 +328,20 @@ func splitMetricByTime(r queryrangebase.Request, interval time.Duration) ([]quer
}

lokiReq := r.(*LokiRequest)

// step align start and end time of the query. Start time is rounded down and end time is rounded up.
stepNs := r.GetStep() * 1e6
startNs := lokiReq.StartTs.UnixNano()
start := time.Unix(0, startNs-startNs%stepNs)

endNs := lokiReq.EndTs.UnixNano()
if mod := endNs % stepNs; mod != 0 {
endNs += stepNs - mod
}
end := time.Unix(0, endNs)

lokiReq = lokiReq.WithStartEnd(util.TimeToMillis(start), util.TimeToMillis(end)).(*LokiRequest)

// step is >= configured split interval, let us just split the query interval by step
if lokiReq.Step >= interval.Milliseconds() {
forInterval(time.Duration(lokiReq.Step*1e6), lokiReq.StartTs, lokiReq.EndTs, false, func(start, end time.Time) {
Expand All @@ -344,12 +359,7 @@ func splitMetricByTime(r queryrangebase.Request, interval time.Duration) ([]quer
return reqs, nil
}

// nextIntervalBoundary always moves ahead in a multiple of steps but the time it returns would not be step aligned.
// To have step aligned intervals for better cache-ability of results, let us step align the start time which make all the split intervals step aligned.
startNs := lokiReq.StartTs.UnixNano()
start := time.Unix(0, startNs-startNs%(r.GetStep()*1e6))

for start := start; start.Before(lokiReq.EndTs); start = nextIntervalBoundary(start, r.GetStep(), interval).Add(time.Duration(r.GetStep()) * time.Millisecond) {
for start := lokiReq.StartTs; start.Before(lokiReq.EndTs); start = nextIntervalBoundary(start, r.GetStep(), interval).Add(time.Duration(r.GetStep()) * time.Millisecond) {
end := nextIntervalBoundary(start, r.GetStep(), interval)
if end.Add(time.Duration(r.GetStep())*time.Millisecond).After(lokiReq.EndTs) || end.Add(time.Duration(r.GetStep())*time.Millisecond) == lokiReq.EndTs {
end = lokiReq.EndTs
Expand All @@ -365,10 +375,6 @@ func splitMetricByTime(r queryrangebase.Request, interval time.Duration) ([]quer
})
}

if len(reqs) != 0 {
// change the start time to original time
reqs[0] = reqs[0].WithStartEnd(lokiReq.GetStart(), reqs[0].GetEnd())
}
return reqs, nil
}

Expand Down
44 changes: 37 additions & 7 deletions pkg/querier/queryrange/split_by_interval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,14 +376,44 @@ func Test_splitMetricQuery(t *testing.T) {
},
&LokiRequest{
StartTs: time.Unix(2*3*3600+7, 0),
EndTs: time.Unix(3*3*3600, 0),
EndTs: time.Unix(3*3*3600+2, 0), // 9h mod 17s = 2s
Step: 17 * seconds,
Query: `rate({app="foo"}[1m])`,
},
},
interval: 3 * time.Hour,
},
// end time already step aligned
{
input: &LokiRequest{
StartTs: time.Unix(2*3600, 0),
EndTs: time.Unix(3*3*3600+2, 0), // 9h mod 17s = 2s
Step: 17 * seconds,
Query: `rate({app="foo"}[1m])`,
},
expected: []queryrangebase.Request{
&LokiRequest{
StartTs: time.Unix(2*3600-9, 0), // 2h mod 17s = 9s
EndTs: time.Unix((3*3600)-5, 0), // 3h mod 17s = 5s
Step: 17 * seconds,
Query: `rate({app="foo"}[1m])`,
},
&LokiRequest{
StartTs: time.Unix((3*3600)+12, 0),
EndTs: time.Unix((2*3*3600)-10, 0), // 6h mod 17s = 10s
Step: 17 * seconds,
Query: `rate({app="foo"}[1m])`,
},
&LokiRequest{
StartTs: time.Unix(2*3*3600+7, 0),
EndTs: time.Unix(3*3*3600+2, 0),
Step: 17 * seconds,
Query: `rate({app="foo"}[1m])`,
},
},
interval: 3 * time.Hour,
},
// start time not aligned with step
// start & end time not aligned with step
{
input: &LokiRequest{
StartTs: time.Unix(2*3600, 0),
Expand All @@ -393,7 +423,7 @@ func Test_splitMetricQuery(t *testing.T) {
},
expected: []queryrangebase.Request{
&LokiRequest{
StartTs: time.Unix(2*3600, 0),
StartTs: time.Unix(2*3600-9, 0), // 2h mod 17s = 9s
EndTs: time.Unix((3*3600)-5, 0), // 3h mod 17s = 5s
Step: 17 * seconds,
Query: `rate({app="foo"}[1m])`,
Expand All @@ -406,7 +436,7 @@ func Test_splitMetricQuery(t *testing.T) {
},
&LokiRequest{
StartTs: time.Unix(2*3*3600+7, 0),
EndTs: time.Unix(3*3*3600, 0),
EndTs: time.Unix(3*3*3600+2, 0), // 9h mod 17s = 2s
Step: 17 * seconds,
Query: `rate({app="foo"}[1m])`,
},
Expand Down Expand Up @@ -449,7 +479,7 @@ func Test_splitMetricQuery(t *testing.T) {
},
&LokiRequest{
StartTs: time.Unix(24*3600, 0),
EndTs: time.Unix(25*3600, 0),
EndTs: time.Unix(30*3600, 0),
Step: 6 * 3600 * seconds,
Query: `rate({app="foo"}[1m])`,
},
Expand All @@ -458,15 +488,15 @@ func Test_splitMetricQuery(t *testing.T) {
},
{
input: &LokiRequest{
StartTs: time.Unix(0, 0),
StartTs: time.Unix(1*3600, 0),
EndTs: time.Unix(3*3600, 0),
Step: 6 * 3600 * seconds,
Query: `rate({app="foo"}[1m])`,
},
expected: []queryrangebase.Request{
&LokiRequest{
StartTs: time.Unix(0, 0),
EndTs: time.Unix(3*3600, 0),
EndTs: time.Unix(6*3600, 0),
Step: 6 * 3600 * seconds,
Query: `rate({app="foo"}[1m])`,
},
Expand Down

0 comments on commit cd1537d

Please sign in to comment.