Skip to content

Commit

Permalink
apply @ modifier start and end in QF split interval middleware
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <benye@amazon.com>
  • Loading branch information
yeya24 committed Nov 1, 2022
1 parent 7968915 commit 843da02
Show file tree
Hide file tree
Showing 4 changed files with 254 additions and 8 deletions.
6 changes: 3 additions & 3 deletions internal/cortex/querier/queryrange/split_by_interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func splitQuery(r Request, interval time.Duration) ([]Request, error) {

// Replace @ modifier function to their respective constant values in the query.
// This way subqueries will be evaluated at the same time as the parent query.
query, err := evaluateAtModifierFunction(r.GetQuery(), r.GetStart(), r.GetEnd())
query, err := EvaluateAtModifierFunction(r.GetQuery(), r.GetStart(), r.GetEnd())
if err != nil {
return nil, err
}
Expand All @@ -93,10 +93,10 @@ func splitQuery(r Request, interval time.Duration) ([]Request, error) {
return reqs, nil
}

// evaluateAtModifierFunction parse the query and evaluates the `start()` and `end()` at modifier functions into actual constant timestamps.
// EvaluateAtModifierFunction parse the query and evaluates the `start()` and `end()` at modifier functions into actual constant timestamps.
// For example given the start of the query is 10.00, `http_requests_total[1h] @ start()` query will be replaced with `http_requests_total[1h] @ 10.00`
// If the modifier is already a constant, it will be returned as is.
func evaluateAtModifierFunction(query string, start, end int64) (string, error) {
func EvaluateAtModifierFunction(query string, start, end int64) (string, error) {
expr, err := parser.ParseExpr(query)
if err != nil {
return "", httpgrpc.Errorf(http.StatusBadRequest, "%s", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func Test_evaluateAtModifier(t *testing.T) {
tt := tt
t.Run(tt.in, func(t *testing.T) {
t.Parallel()
out, err := evaluateAtModifierFunction(tt.in, start, end)
out, err := EvaluateAtModifierFunction(tt.in, start, end)
if tt.expectedErrorCode != 0 {
require.Error(t, err)
httpResp, ok := httpgrpc.HTTPResponseFromError(err)
Expand Down
17 changes: 13 additions & 4 deletions pkg/queryfrontend/split_by_interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ type splitByInterval struct {
func (s splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryrange.Response, error) {
// First we're going to build new requests, one for each day, taking care
// to line up the boundaries with step.
reqs := splitQuery(r, s.interval(r))
reqs, err := splitQuery(r, s.interval(r))
if err != nil {
return nil, err
}
s.splitByCounter.Add(float64(len(reqs)))

reqResps, err := queryrange.DoRequests(ctx, s.next, reqs, s.limits)
Expand All @@ -66,9 +69,15 @@ func (s splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryran
return response, nil
}

func splitQuery(r queryrange.Request, interval time.Duration) []queryrange.Request {
func splitQuery(r queryrange.Request, interval time.Duration) ([]queryrange.Request, error) {
var reqs []queryrange.Request
if _, ok := r.(*ThanosQueryRangeRequest); ok {
// Replace @ modifier function to their respective constant values in the query.
// This way subqueries will be evaluated at the same time as the parent query.
query, err := queryrange.EvaluateAtModifierFunction(r.GetQuery(), r.GetStart(), r.GetEnd())
if err != nil {
return nil, err
}
if start := r.GetStart(); start == r.GetEnd() {
reqs = append(reqs, r.WithStartEnd(start, start))
} else {
Expand All @@ -78,7 +87,7 @@ func splitQuery(r queryrange.Request, interval time.Duration) []queryrange.Reque
end = r.GetEnd()
}

reqs = append(reqs, r.WithStartEnd(start, end))
reqs = append(reqs, r.WithQuery(query).WithStartEnd(start, end))
}
}
} else {
Expand All @@ -93,7 +102,7 @@ func splitQuery(r queryrange.Request, interval time.Duration) []queryrange.Reque
}
}

return reqs
return reqs, nil
}

// Round up to the step before the next interval boundary.
Expand Down
237 changes: 237 additions & 0 deletions pkg/queryfrontend/split_by_interval_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
package queryfrontend

import (
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/internal/cortex/querier/queryrange"
)

func TestSplitQuery(t *testing.T) {
for i, tc := range []struct {
input queryrange.Request
expected []queryrange.Request
interval time.Duration
}{
{
input: &ThanosQueryRangeRequest{
Start: 0,
End: 60 * 60 * seconds,
Step: 15 * seconds,
Query: "foo",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 0,
End: 60 * 60 * seconds,
Step: 15 * seconds,
Query: "foo",
},
},
interval: day,
},
{
input: &ThanosQueryRangeRequest{
Start: 60 * 60 * seconds,
End: 60 * 60 * seconds,
Step: 15 * seconds,
Query: "foo",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 60 * 60 * seconds,
End: 60 * 60 * seconds,
Step: 15 * seconds,
Query: "foo",
},
},
interval: day,
},
{
input: &ThanosQueryRangeRequest{
Start: 0,
End: 60 * 60 * seconds,
Step: 15 * seconds,
Query: "foo",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 0,
End: 60 * 60 * seconds,
Step: 15 * seconds,
Query: "foo",
},
},
interval: 3 * time.Hour,
},
{
input: &ThanosQueryRangeRequest{
Start: 0,
End: 24 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 0,
End: 24 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
},
interval: day,
},
{
input: &ThanosQueryRangeRequest{
Start: 0,
End: 3 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 0,
End: 3 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
},
interval: 3 * time.Hour,
},
{
input: &ThanosQueryRangeRequest{
Start: 0,
End: 2 * 24 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo @ start()",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 0,
End: (24 * 3600 * seconds) - (15 * seconds),
Step: 15 * seconds,
Query: "foo @ 0.000",
},
&ThanosQueryRangeRequest{
Start: 24 * 3600 * seconds,
End: 2 * 24 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo @ 0.000",
},
},
interval: day,
},
{
input: &ThanosQueryRangeRequest{
Start: 0,
End: 2 * 24 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo @ end()",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 0,
End: (24 * 3600 * seconds) - (15 * seconds),
Step: 15 * seconds,
Query: "foo @ 172800.000",
},
&ThanosQueryRangeRequest{
Start: 24 * 3600 * seconds,
End: 2 * 24 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo @ 172800.000",
},
},
interval: day,
},
{
input: &ThanosQueryRangeRequest{
Start: 0,
End: 2 * 3 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 0,
End: (3 * 3600 * seconds) - (15 * seconds),
Step: 15 * seconds,
Query: "foo",
},
&ThanosQueryRangeRequest{
Start: 3 * 3600 * seconds,
End: 2 * 3 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
},
interval: 3 * time.Hour,
},
{
input: &ThanosQueryRangeRequest{
Start: 3 * 3600 * seconds,
End: 3 * 24 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 3 * 3600 * seconds,
End: (24 * 3600 * seconds) - (15 * seconds),
Step: 15 * seconds,
Query: "foo",
},
&ThanosQueryRangeRequest{
Start: 24 * 3600 * seconds,
End: (2 * 24 * 3600 * seconds) - (15 * seconds),
Step: 15 * seconds,
Query: "foo",
},
&ThanosQueryRangeRequest{
Start: 2 * 24 * 3600 * seconds,
End: 3 * 24 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
},
interval: day,
},
{
input: &ThanosQueryRangeRequest{
Start: 2 * 3600 * seconds,
End: 3 * 3 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
expected: []queryrange.Request{
&ThanosQueryRangeRequest{
Start: 2 * 3600 * seconds,
End: (3 * 3600 * seconds) - (15 * seconds),
Step: 15 * seconds,
Query: "foo",
},
&ThanosQueryRangeRequest{
Start: 3 * 3600 * seconds,
End: (2 * 3 * 3600 * seconds) - (15 * seconds),
Step: 15 * seconds,
Query: "foo",
},
&ThanosQueryRangeRequest{
Start: 2 * 3 * 3600 * seconds,
End: 3 * 3 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
},
},
interval: 3 * time.Hour,
},
} {
t.Run(strconv.Itoa(i), func(t *testing.T) {
days, err := splitQuery(tc.input, tc.interval)
require.NoError(t, err)
require.Equal(t, tc.expected, days)
})
}
}

0 comments on commit 843da02

Please sign in to comment.