Skip to content

Commit

Permalink
avoids further time splitting in querysharding mware (#2252)
Browse files Browse the repository at this point in the history
* avoids further time splitting in querysharding mware

* removes duplicate utility fns

* linting
  • Loading branch information
owen-d authored Jun 23, 2020
1 parent 60bf23d commit 70900be
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 115 deletions.
84 changes: 5 additions & 79 deletions pkg/querier/queryrange/querysharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
Expand All @@ -16,8 +17,6 @@ import (
"github.com/grafana/loki/pkg/logql/marshal"
)

var nanosecondsInMillisecond = int64(time.Millisecond / time.Nanosecond)

// NewQueryShardMiddleware creates a middleware which downstreams queries after AST mapping and query encoding.
func NewQueryShardMiddleware(
logger log.Logger,
Expand Down Expand Up @@ -147,60 +146,12 @@ type shardSplitter struct {

func (splitter *shardSplitter) Do(ctx context.Context, r queryrange.Request) (queryrange.Response, error) {
cutoff := splitter.now().Add(-splitter.MinShardingLookback)
sharded, nonsharded := partitionRequest(r, cutoff)

return splitter.parallel(ctx, sharded, nonsharded)

}

func (splitter *shardSplitter) parallel(ctx context.Context, sharded, nonsharded queryrange.Request) (queryrange.Response, error) {
if sharded == nil {
return splitter.next.Do(ctx, nonsharded)
// Only attempt to shard queries which are older than the sharding lookback (the period for which ingesters are also queried).
if !cutoff.After(util.TimeFromMillis(r.GetEnd())) {
return splitter.next.Do(ctx, r)
}

if nonsharded == nil {
return splitter.shardingware.Do(ctx, sharded)
}

nonshardCh := make(chan queryrange.Response, 1)
shardCh := make(chan queryrange.Response, 1)
errCh := make(chan error, 2)

go func() {
res, err := splitter.next.Do(ctx, nonsharded)
if err != nil {
errCh <- err
return
}
nonshardCh <- res

}()

go func() {
res, err := splitter.shardingware.Do(ctx, sharded)
if err != nil {
errCh <- err
return
}
shardCh <- res
}()

resps := make([]queryrange.Response, 0, 2)
for i := 0; i < 2; i++ {
select {
case r := <-nonshardCh:
resps = append(resps, r)
case r := <-shardCh:
resps = append(resps, r)
case err := <-errCh:
return nil, err
case <-ctx.Done():
return nil, ctx.Err()
}

}

return lokiCodec.MergeResponse(resps...)
return splitter.shardingware.Do(ctx, r)
}

// TODO(owen-d): export in cortex so we don't duplicate code
Expand All @@ -212,28 +163,3 @@ func hasShards(confs queryrange.ShardingConfigs) bool {
}
return false
}

// partitionRequet splits a request into potentially multiple requests, one including the request's time range
// [0,t). The other will include [t,inf)
// TODO(owen-d): export in cortex so we don't duplicate code
func partitionRequest(r queryrange.Request, t time.Time) (before queryrange.Request, after queryrange.Request) {
boundary := TimeToMillis(t)
if r.GetStart() >= boundary {
return nil, r
}

if r.GetEnd() < boundary {
return r, nil
}

return r.WithStartEnd(r.GetStart(), boundary), r.WithStartEnd(boundary, r.GetEnd())
}

// TimeFromMillis is a helper to turn milliseconds -> time.Time
func TimeFromMillis(ms int64) time.Time {
return time.Unix(0, ms*nanosecondsInMillisecond)
}

func TimeToMillis(t time.Time) int64 {
return t.UnixNano() / nanosecondsInMillisecond
}
86 changes: 50 additions & 36 deletions pkg/querier/queryrange/querysharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -68,46 +69,59 @@ var (
}
)

func Test_PartitionRequest(t *testing.T) {
midpt := time.Unix(0, 0).Add(500 * time.Millisecond)
cutoff := TimeToMillis(midpt)

// test split
req := defaultReq().WithStartEnd(0, cutoff*2)
before, after := partitionRequest(req, midpt)
require.Equal(t, req.WithStartEnd(0, cutoff), before)
require.Equal(t, req.WithStartEnd(cutoff, 2*cutoff), after)

// test all before cutoff
before, after = partitionRequest(req, midpt.Add(1000*time.Millisecond))
require.Equal(t, req, before)
require.Nil(t, after)

// test after cutoff
before, after = partitionRequest(req, time.Unix(0, 0))
require.Nil(t, before)
require.Equal(t, req, after)

}

func Test_shardSplitter(t *testing.T) {
splitter := &shardSplitter{
shardingware: mockHandler(lokiResps[0], nil),
next: mockHandler(lokiResps[1], nil),
now: time.Now,
MinShardingLookback: 0,
}

req := defaultReq().WithStartEnd(
TimeToMillis(time.Now().Add(-time.Hour)),
TimeToMillis(time.Now().Add(time.Hour)),
util.TimeToMillis(start),
util.TimeToMillis(end),
)

resp, err := splitter.Do(context.Background(), req)
require.Nil(t, err)
expected, err := lokiCodec.MergeResponse(lokiResps...)
require.Nil(t, err)
require.Equal(t, expected, resp)
for _, tc := range []struct {
desc string
lookback time.Duration
shouldShard bool
}{
{
desc: "older than lookback",
lookback: -time.Minute, // a negative lookback will ensure the entire query doesn't cross the sharding boundary & can safely be sharded.
shouldShard: true,
},
{
desc: "overlaps lookback",
lookback: end.Sub(start) / 2, // intersect the request causing it to avoid sharding
shouldShard: false,
},
{
desc: "newer than lookback",
lookback: end.Sub(start) + 1, // the entire query is in the ingester range and should avoid sharding.
shouldShard: false,
},
} {
t.Run(tc.desc, func(t *testing.T) {
var didShard bool

splitter := &shardSplitter{
shardingware: queryrange.HandlerFunc(func(ctx context.Context, req queryrange.Request) (queryrange.Response, error) {
didShard = true
return mockHandler(lokiResps[0], nil).Do(ctx, req)
}),
next: mockHandler(lokiResps[1], nil),
now: func() time.Time { return end },
MinShardingLookback: tc.lookback,
}

resp, err := splitter.Do(context.Background(), req)
require.Nil(t, err)

require.Equal(t, tc.shouldShard, didShard)
require.Nil(t, err)

if tc.shouldShard {
require.Equal(t, lokiResps[0], resp)
} else {
require.Equal(t, lokiResps[1], resp)
}
})
}
}

func Test_astMapper(t *testing.T) {
Expand Down

0 comments on commit 70900be

Please sign in to comment.