Skip to content

Commit

Permalink
Feature/per tenant splitby (#1565)
Browse files Browse the repository at this point in the history
* split by overrides by tenant

* tenant aware metric query splitting

* consistent split_queries_by_interval naming

* updates readme

* fixes rebase conflict

* refactor to support limit based splitby intervals in all locations

* simplifies limits cachesplitter

* fixes comment
  • Loading branch information
owen-d authored Jan 31, 2020
1 parent 891eae9 commit 3e07549
Show file tree
Hide file tree
Showing 8 changed files with 295 additions and 90 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

* [1572](https://github.com/grafana/loki/pull/1572) **owen-d**: Introduces the `querier.query-ingesters-within` flag and associated yaml config. When enabled, queries for a time range that do not overlap this lookback interval will not be sent to the ingesters.
* [1558](https://github.com/grafana/loki/pull/1558) **owen-d**: Introduces `ingester.max-chunk-age` which specifies the maximum chunk age before it's cut.
* [1565](https://github.com/grafana/loki/pull/1565) **owen-d**: The query frontend's `split_queries_by_interval` can now be specified as an override

## 1.3.0 (2020-01-16)

Expand Down
56 changes: 56 additions & 0 deletions pkg/querier/queryrange/limits.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package queryrange

import (
"fmt"
"time"

"github.com/cortexproject/cortex/pkg/querier/queryrange"
)

// Limits extends the cortex limits interface with support for per tenant splitby parameters
type Limits interface {
queryrange.Limits
QuerySplitDuration(string) time.Duration
}

type limits struct {
Limits
splitDuration time.Duration
}

func (l limits) QuerySplitDuration(user string) time.Duration {
dur := l.Limits.QuerySplitDuration(user)
if dur == 0 {
return l.splitDuration
}
return dur
}

// WithDefaults will construct a Limits with a default value for QuerySplitDuration when no overrides are present.
func WithDefaultLimits(l Limits, conf queryrange.Config) Limits {
res := limits{
Limits: l,
}

if conf.SplitQueriesByDay {
res.splitDuration = 24 * time.Hour
}

if conf.SplitQueriesByInterval != 0 {
res.splitDuration = conf.SplitQueriesByInterval
}

return res
}

// cacheKeyLimits intersects Limits and CacheSplitter
type cacheKeyLimits struct {
Limits
}

// GenerateCacheKey will panic if it encounters a 0 split duration. We ensure against this by requiring
// a nonzero split interval when caching is enabled
func (l cacheKeyLimits) GenerateCacheKey(userID string, r queryrange.Request) string {
currentInterval := r.GetStart() / int64(l.QuerySplitDuration(userID)/time.Millisecond)
return fmt.Sprintf("%s:%s:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval)
}
34 changes: 34 additions & 0 deletions pkg/querier/queryrange/limits_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package queryrange

import (
"testing"
"time"

"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/stretchr/testify/require"
)

func TestWithDefaultLimits(t *testing.T) {
l := fakeLimits{
splits: map[string]time.Duration{"a": time.Minute},
}

require.Equal(t, l.QuerySplitDuration("a"), time.Minute)
require.Equal(t, l.QuerySplitDuration("b"), time.Duration(0))

wrapped := WithDefaultLimits(l, queryrange.Config{
SplitQueriesByDay: true,
})

require.Equal(t, wrapped.QuerySplitDuration("a"), time.Minute)
require.Equal(t, wrapped.QuerySplitDuration("b"), 24*time.Hour)

wrapped = WithDefaultLimits(l, queryrange.Config{
SplitQueriesByDay: true, // should be overridden by SplitQueriesByInterval
SplitQueriesByInterval: time.Hour,
})

require.Equal(t, wrapped.QuerySplitDuration("a"), time.Minute)
require.Equal(t, wrapped.QuerySplitDuration("b"), time.Hour)

}
88 changes: 84 additions & 4 deletions pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import (
"net/http"
"strings"

"github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/cortexproject/cortex/pkg/querier/frontend"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/grafana/loki/pkg/logql"
"github.com/prometheus/prometheus/pkg/labels"
)
Expand All @@ -28,8 +30,13 @@ type Stopper interface {
}

// NewTripperware returns a Tripperware configured with middlewares to align, split and cache requests.
func NewTripperware(cfg Config, log log.Logger, limits queryrange.Limits) (frontend.Tripperware, Stopper, error) {
metricsTripperware, cache, err := queryrange.NewTripperware(cfg.Config, log, limits, lokiCodec, queryrange.PrometheusResponseExtractor)
func NewTripperware(cfg Config, log log.Logger, limits Limits) (frontend.Tripperware, Stopper, error) {
// Ensure that QuerySplitDuration uses configuration defaults.
// This avoids divide by zero errors when determining cache keys where user specific overrides don't exist.
limits = WithDefaultLimits(limits, cfg.Config)

metricsTripperware, cache, err := NewMetricTripperware(cfg, log, limits, lokiCodec, queryrange.PrometheusResponseExtractor)

if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -78,12 +85,12 @@ func NewTripperware(cfg Config, log log.Logger, limits queryrange.Limits) (front
func NewLogFilterTripperware(
cfg Config,
log log.Logger,
limits queryrange.Limits,
limits Limits,
codec queryrange.Codec,
) (frontend.Tripperware, error) {
queryRangeMiddleware := []queryrange.Middleware{queryrange.LimitsMiddleware(limits)}
if cfg.SplitQueriesByInterval != 0 {
queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("split_by_interval"), SplitByIntervalMiddleware(cfg.SplitQueriesByInterval, limits, codec))
queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("split_by_interval"), SplitByIntervalMiddleware(limits, codec))
}
if cfg.MaxRetries > 0 {
queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("retry"), queryrange.NewRetryMiddleware(log, cfg.MaxRetries))
Expand All @@ -95,3 +102,76 @@ func NewLogFilterTripperware(
return next
}), nil
}

// NewMetricTripperware creates a new frontend tripperware responsible for handling metric queries
func NewMetricTripperware(
cfg Config,
log log.Logger,
limits Limits,
codec queryrange.Codec,
extractor queryrange.Extractor,
) (frontend.Tripperware, Stopper, error) {

queryRangeMiddleware := []queryrange.Middleware{queryrange.LimitsMiddleware(limits)}
if cfg.AlignQueriesWithStep {
queryRangeMiddleware = append(
queryRangeMiddleware,
queryrange.InstrumentMiddleware("step_align"),
queryrange.StepAlignMiddleware,
)
}

// SplitQueriesByDay is deprecated use SplitQueriesByInterval.
if cfg.SplitQueriesByDay {
level.Warn(log).Log("msg", "flag querier.split-queries-by-day (or config split_queries_by_day) is deprecated, use querier.split-queries-by-interval instead.")
}

queryRangeMiddleware = append(
queryRangeMiddleware,
queryrange.InstrumentMiddleware("split_by_interval"),
SplitByIntervalMiddleware(limits, codec),
)

var c cache.Cache
if cfg.CacheResults {
queryCacheMiddleware, cache, err := queryrange.NewResultsCacheMiddleware(
log,
cfg.ResultsCacheConfig,
cacheKeyLimits{limits},
limits,
codec,
extractor,
)
if err != nil {
return nil, nil, err
}
c = cache
queryRangeMiddleware = append(
queryRangeMiddleware,
queryrange.InstrumentMiddleware("results_cache"),
queryCacheMiddleware,
)
}

if cfg.MaxRetries > 0 {
queryRangeMiddleware = append(
queryRangeMiddleware,
queryrange.InstrumentMiddleware("retry"),
queryrange.NewRetryMiddleware(log, cfg.MaxRetries),
)
}

return frontend.Tripperware(func(next http.RoundTripper) http.RoundTripper {
// Finally, if the user selected any query range middleware, stitch it in.
if len(queryRangeMiddleware) > 0 {
rt := queryrange.NewRoundTripper(next, codec, queryRangeMiddleware...)
return frontend.RoundTripFunc(func(r *http.Request) (*http.Response, error) {
if !strings.HasSuffix(r.URL.Path, "/query_range") {
return next.RoundTrip(r)
}
return rt.RoundTrip(r)
})
}
return next
}), c, nil
}
8 changes: 8 additions & 0 deletions pkg/querier/queryrange/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,14 @@ func TestRegexpParamsSupport(t *testing.T) {

type fakeLimits struct {
maxQueryParallelism int
splits map[string]time.Duration
}

func (f fakeLimits) QuerySplitDuration(key string) time.Duration {
if f.splits == nil {
return 0
}
return f.splits[key]
}

func (fakeLimits) MaxQueryLength(string) time.Duration {
Expand Down
41 changes: 28 additions & 13 deletions pkg/querier/queryrange/split_by_interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@ import (
)

// SplitByIntervalMiddleware creates a new Middleware that splits log requests by a given interval.
func SplitByIntervalMiddleware(interval time.Duration, limits queryrange.Limits, merger queryrange.Merger) queryrange.Middleware {
func SplitByIntervalMiddleware(limits Limits, merger queryrange.Merger) queryrange.Middleware {
return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler {
return &splitByInterval{
next: next,
limits: limits,
merger: merger,
interval: interval,
next: next,
limits: limits,
merger: merger,
}
})
}
Expand All @@ -34,10 +33,9 @@ type packedResp struct {
}

type splitByInterval struct {
next queryrange.Handler
limits queryrange.Limits
merger queryrange.Merger
interval time.Duration
next queryrange.Handler
limits Limits
merger queryrange.Merger
}

func (h *splitByInterval) Feed(ctx context.Context, input []*lokiResult) chan *lokiResult {
Expand Down Expand Up @@ -69,6 +67,12 @@ func (h *splitByInterval) Process(

ch := h.Feed(ctx, input)

// queries with 0 limits should not be exited early
var unlimited bool
if threshold == 0 {
unlimited = true
}

// don't spawn unnecessary goroutines
var p int = parallelism
if len(input) < parallelism {
Expand All @@ -91,10 +95,15 @@ func (h *splitByInterval) Process(
responses = append(responses, data.resp)

// see if we can exit early if a limit has been reached
threshold -= data.resp.(*LokiResponse).Count()
if threshold <= 0 {
return responses, nil
if casted, ok := data.resp.(*LokiResponse); !unlimited && ok {
threshold -= casted.Count()

if threshold <= 0 {
return responses, nil
}

}

}

}
Expand Down Expand Up @@ -129,7 +138,13 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryra
return nil, err
}

intervals := splitByTime(lokiRequest, h.interval)
interval := h.limits.QuerySplitDuration(userid)
// skip split by if unset
if interval == 0 {
return h.next.Do(ctx, r)
}

intervals := splitByTime(lokiRequest, interval)

if sp := opentracing.SpanFromContext(ctx); sp != nil {
sp.LogFields(otlog.Int("n_intervals", len(intervals)))
Expand Down
Loading

0 comments on commit 3e07549

Please sign in to comment.