diff --git a/CHANGELOG.md b/CHANGELOG.md
index a002ec14dbc2..7c910b53664e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,7 @@
 
 * [1572](https://github.com/grafana/loki/pull/1572) **owen-d**: Introduces the `querier.query-ingesters-within` flag and associated yaml config. When enabled, queries for a time range that do not overlap this lookback interval will not be sent to the ingesters.
 * [1558](https://github.com/grafana/loki/pull/1558) **owen-d**: Introduces `ingester.max-chunk-age` which specifies the maximum chunk age before it's cut.
+* [1565](https://github.com/grafana/loki/pull/1565) **owen-d**: The query frontend's `split_queries_by_interval` can now be specified as a per-tenant override.
 
 ## 1.3.0 (2020-01-16)
 
diff --git a/pkg/querier/queryrange/limits.go b/pkg/querier/queryrange/limits.go
new file mode 100644
index 000000000000..3ce777f10e11
--- /dev/null
+++ b/pkg/querier/queryrange/limits.go
@@ -0,0 +1,56 @@
+package queryrange
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/cortexproject/cortex/pkg/querier/queryrange"
+)
+
+// Limits extends the cortex limits interface with support for per-tenant split-by parameters.
+type Limits interface {
+	queryrange.Limits
+	QuerySplitDuration(string) time.Duration
+}
+
+type limits struct {
+	Limits
+	splitDuration time.Duration
+}
+
+func (l limits) QuerySplitDuration(user string) time.Duration {
+	dur := l.Limits.QuerySplitDuration(user)
+	if dur == 0 {
+		return l.splitDuration
+	}
+	return dur
+}
+
+// WithDefaultLimits constructs a Limits with a default value for QuerySplitDuration when no override is present.
+func WithDefaultLimits(l Limits, conf queryrange.Config) Limits {
+	res := limits{
+		Limits: l,
+	}
+
+	if conf.SplitQueriesByDay {
+		res.splitDuration = 24 * time.Hour
+	}
+
+	if conf.SplitQueriesByInterval != 0 {
+		res.splitDuration = conf.SplitQueriesByInterval
+	}
+
+	return res
+}
+
+// cacheKeyLimits intersects Limits and CacheSplitter
+type cacheKeyLimits struct {
+	Limits
+}
+
+// GenerateCacheKey will panic if it encounters a 0 split duration. We ensure against this by requiring
+// a nonzero split interval when caching is enabled.
+func (l cacheKeyLimits) GenerateCacheKey(userID string, r queryrange.Request) string {
+	currentInterval := r.GetStart() / int64(l.QuerySplitDuration(userID)/time.Millisecond)
+	return fmt.Sprintf("%s:%s:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval)
+}
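Note on the resolution order above: a tenant-specific `QuerySplitDuration` wins, otherwise the wrapper falls back to the global default derived from `split_queries_by_interval` (or 24h for the deprecated day flag), and `GenerateCacheKey` then buckets each request's start timestamp (milliseconds) by that interval. A self-contained sketch of the same arithmetic, using toy types rather than the real Loki/Cortex interfaces:

```go
package main

import (
	"fmt"
	"time"
)

// splitLimits is a stand-in for the wrapped Limits: per-tenant overrides plus a global default.
type splitLimits struct {
	perTenant    map[string]time.Duration // tenant overrides (zero/absent means "no override")
	defaultSplit time.Duration            // derived from the queryrange config
}

// querySplitDuration mirrors the override-then-default resolution in limits.go.
func (l splitLimits) querySplitDuration(tenant string) time.Duration {
	if d := l.perTenant[tenant]; d != 0 {
		return d
	}
	return l.defaultSplit
}

// cacheBucket mirrors GenerateCacheKey's bucketing: start time in ms divided by the split interval in ms.
func cacheBucket(startMs int64, split time.Duration) int64 {
	return startMs / int64(split/time.Millisecond)
}

func main() {
	l := splitLimits{
		perTenant:    map[string]time.Duration{"tenant-a": 15 * time.Minute},
		defaultSplit: time.Hour,
	}

	startMs := time.Date(2020, 2, 1, 10, 20, 0, 0, time.UTC).UnixNano() / int64(time.Millisecond)

	for _, tenant := range []string{"tenant-a", "tenant-b"} {
		split := l.querySplitDuration(tenant)
		fmt.Printf("%s: split=%v bucket=%d\n", tenant, split, cacheBucket(startMs, split))
	}
	// tenant-a uses its 15m override, tenant-b falls back to the 1h default;
	// requests whose start lands in the same bucket share a results-cache key.
}
```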
diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go
index 8758295ca0e1..5133382db7f8 100644
--- a/pkg/querier/queryrange/roundtrip.go
+++ b/pkg/querier/queryrange/roundtrip.go
@@ -5,9 +5,11 @@ import (
 	"net/http"
 	"strings"
 
+	"github.com/cortexproject/cortex/pkg/chunk/cache"
 	"github.com/cortexproject/cortex/pkg/querier/frontend"
 	"github.com/cortexproject/cortex/pkg/querier/queryrange"
 	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
 	"github.com/grafana/loki/pkg/logql"
 	"github.com/prometheus/prometheus/pkg/labels"
 )
@@ -28,8 +30,13 @@ type Stopper interface {
 }
 
 // NewTripperware returns a Tripperware configured with middlewares to align, split and cache requests.
-func NewTripperware(cfg Config, log log.Logger, limits queryrange.Limits) (frontend.Tripperware, Stopper, error) {
-	metricsTripperware, cache, err := queryrange.NewTripperware(cfg.Config, log, limits, lokiCodec, queryrange.PrometheusResponseExtractor)
+func NewTripperware(cfg Config, log log.Logger, limits Limits) (frontend.Tripperware, Stopper, error) {
+	// Ensure that QuerySplitDuration uses configuration defaults.
+	// This avoids divide-by-zero errors when determining cache keys where user-specific overrides don't exist.
+	limits = WithDefaultLimits(limits, cfg.Config)
+
+	metricsTripperware, cache, err := NewMetricTripperware(cfg, log, limits, lokiCodec, queryrange.PrometheusResponseExtractor)
+
 	if err != nil {
 		return nil, nil, err
 	}
@@ -78,12 +85,12 @@ func NewTripperware(cfg Config, log log.Logger, limits queryrange.Limits) (front
 func NewLogFilterTripperware(
 	cfg Config,
 	log log.Logger,
-	limits queryrange.Limits,
+	limits Limits,
 	codec queryrange.Codec,
 ) (frontend.Tripperware, error) {
 	queryRangeMiddleware := []queryrange.Middleware{queryrange.LimitsMiddleware(limits)}
 	if cfg.SplitQueriesByInterval != 0 {
-		queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("split_by_interval"), SplitByIntervalMiddleware(cfg.SplitQueriesByInterval, limits, codec))
+		queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("split_by_interval"), SplitByIntervalMiddleware(limits, codec))
 	}
 	if cfg.MaxRetries > 0 {
 		queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("retry"), queryrange.NewRetryMiddleware(log, cfg.MaxRetries))
@@ -95,3 +102,76 @@ func NewLogFilterTripperware(
 		return next
 	}), nil
 }
+
+// NewMetricTripperware creates a new frontend tripperware responsible for handling metric queries.
+func NewMetricTripperware(
+	cfg Config,
+	log log.Logger,
+	limits Limits,
+	codec queryrange.Codec,
+	extractor queryrange.Extractor,
+) (frontend.Tripperware, Stopper, error) {
+
+	queryRangeMiddleware := []queryrange.Middleware{queryrange.LimitsMiddleware(limits)}
+	if cfg.AlignQueriesWithStep {
+		queryRangeMiddleware = append(
+			queryRangeMiddleware,
+			queryrange.InstrumentMiddleware("step_align"),
+			queryrange.StepAlignMiddleware,
+		)
+	}
+
+	// SplitQueriesByDay is deprecated; use SplitQueriesByInterval.
+	if cfg.SplitQueriesByDay {
+		level.Warn(log).Log("msg", "flag querier.split-queries-by-day (or config split_queries_by_day) is deprecated, use querier.split-queries-by-interval instead.")
+	}
+
+	queryRangeMiddleware = append(
+		queryRangeMiddleware,
+		queryrange.InstrumentMiddleware("split_by_interval"),
+		SplitByIntervalMiddleware(limits, codec),
+	)
+
+	var c cache.Cache
+	if cfg.CacheResults {
+		queryCacheMiddleware, cache, err := queryrange.NewResultsCacheMiddleware(
+			log,
+			cfg.ResultsCacheConfig,
+			cacheKeyLimits{limits},
+			limits,
+			codec,
+			extractor,
+		)
+		if err != nil {
+			return nil, nil, err
+		}
+		c = cache
+		queryRangeMiddleware = append(
+			queryRangeMiddleware,
+			queryrange.InstrumentMiddleware("results_cache"),
+			queryCacheMiddleware,
+		)
+	}
+
+	if cfg.MaxRetries > 0 {
+		queryRangeMiddleware = append(
+			queryRangeMiddleware,
+			queryrange.InstrumentMiddleware("retry"),
+			queryrange.NewRetryMiddleware(log, cfg.MaxRetries),
+		)
+	}
+
+	return frontend.Tripperware(func(next http.RoundTripper) http.RoundTripper {
+		// Finally, if the user selected any query range middleware, stitch it in.
+		if len(queryRangeMiddleware) > 0 {
+			rt := queryrange.NewRoundTripper(next, codec, queryRangeMiddleware...)
+			return frontend.RoundTripFunc(func(r *http.Request) (*http.Response, error) {
+				if !strings.HasSuffix(r.URL.Path, "/query_range") {
+					return next.RoundTrip(r)
+				}
+				return rt.RoundTrip(r)
+			})
+		}
+		return next
+	}), c, nil
+}
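Note on the deprecation warning above: existing `-querier.split-queries-by-day` setups keep working because `WithDefaultLimits` maps the day flag to a 24h interval, while an explicit `-querier.split-queries-by-interval` takes precedence. A runnable sketch of that default selection, reimplemented with plain values rather than the real `queryrange.Config`:

```go
package main

import (
	"fmt"
	"time"
)

// defaultSplit mirrors WithDefaultLimits' choice of the global default:
// the deprecated day flag maps to 24h, and an explicit interval wins if both are set.
func defaultSplit(splitByDay bool, interval time.Duration) time.Duration {
	var d time.Duration
	if splitByDay {
		d = 24 * time.Hour
	}
	if interval != 0 {
		d = interval
	}
	return d
}

func main() {
	fmt.Println(defaultSplit(true, 0))         // 24h0m0s: legacy split_queries_by_day only
	fmt.Println(defaultSplit(true, time.Hour)) // 1h0m0s: the interval overrides the day flag
	fmt.Println(defaultSplit(false, 0))        // 0s: splitting disabled unless a tenant override exists
}
```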
diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go
index 2198aa9f5b49..ea2dd611b943 100644
--- a/pkg/querier/queryrange/roundtrip_test.go
+++ b/pkg/querier/queryrange/roundtrip_test.go
@@ -287,6 +287,14 @@ func TestRegexpParamsSupport(t *testing.T) {
 
 type fakeLimits struct {
 	maxQueryParallelism int
+	splits              map[string]time.Duration
+}
+
+func (f fakeLimits) QuerySplitDuration(key string) time.Duration {
+	if f.splits == nil {
+		return 0
+	}
+	return f.splits[key]
 }
 
 func (fakeLimits) MaxQueryLength(string) time.Duration {
diff --git a/pkg/querier/queryrange/split_by_interval.go b/pkg/querier/queryrange/split_by_interval.go
index be733772d246..c2380d795af2 100644
--- a/pkg/querier/queryrange/split_by_interval.go
+++ b/pkg/querier/queryrange/split_by_interval.go
@@ -12,13 +12,12 @@ import (
 )
 
 // SplitByIntervalMiddleware creates a new Middleware that splits log requests by a given interval.
-func SplitByIntervalMiddleware(interval time.Duration, limits queryrange.Limits, merger queryrange.Merger) queryrange.Middleware {
+func SplitByIntervalMiddleware(limits Limits, merger queryrange.Merger) queryrange.Middleware {
 	return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler {
 		return &splitByInterval{
-			next:     next,
-			limits:   limits,
-			merger:   merger,
-			interval: interval,
+			next:   next,
+			limits: limits,
+			merger: merger,
 		}
 	})
 }
@@ -34,10 +33,9 @@ type packedResp struct {
 }
 
 type splitByInterval struct {
-	next     queryrange.Handler
-	limits   queryrange.Limits
-	merger   queryrange.Merger
-	interval time.Duration
+	next   queryrange.Handler
+	limits Limits
+	merger queryrange.Merger
 }
 
 func (h *splitByInterval) Feed(ctx context.Context, input []*lokiResult) chan *lokiResult {
@@ -69,6 +67,12 @@ func (h *splitByInterval) Process(
 
 	ch := h.Feed(ctx, input)
 
+	// a limit (threshold) of 0 means unlimited, so never exit early in that case
+	var unlimited bool
+	if threshold == 0 {
+		unlimited = true
+	}
+
 	// don't spawn unnecessary goroutines
 	var p int = parallelism
 	if len(input) < parallelism {
@@ -91,10 +95,15 @@ func (h *splitByInterval) Process(
 			responses = append(responses, data.resp)
 
 			// see if we can exit early if a limit has been reached
-			threshold -= data.resp.(*LokiResponse).Count()
-			if threshold <= 0 {
-				return responses, nil
+			if casted, ok := data.resp.(*LokiResponse); !unlimited && ok {
+				threshold -= casted.Count()
+
+				if threshold <= 0 {
+					return responses, nil
+				}
+
 			}
+
 		}
 	}
 
@@ -129,7 +138,13 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryra
 		return nil, err
 	}
 
-	intervals := splitByTime(lokiRequest, h.interval)
+	interval := h.limits.QuerySplitDuration(userid)
+	// a 0 interval means splitting is disabled for this tenant, so pass the request straight through
+	if interval == 0 {
+		return h.next.Do(ctx, r)
+	}
+
+	intervals := splitByTime(lokiRequest, interval)
 
 	if sp := opentracing.SpanFromContext(ctx); sp != nil {
 		sp.LogFields(otlog.Int("n_intervals", len(intervals)))
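With the interval now resolved per tenant, it may help to see what the split actually produces. The sketch below is a simplified model of the splitting idea only (the real `splitByTime` operates on the full `LokiRequest`, not bare timestamps) and of the new zero-interval passthrough:

```go
package main

import (
	"fmt"
	"time"
)

type span struct{ start, end time.Time }

// splitSpan is a simplified analogue of splitByTime: cut [start, end) into
// interval-sized pieces, where the interval now comes from
// limits.QuerySplitDuration(tenant) rather than a fixed middleware parameter.
func splitSpan(start, end time.Time, interval time.Duration) []span {
	var out []span
	for s := start; s.Before(end); s = s.Add(interval) {
		e := s.Add(interval)
		if e.After(end) {
			e = end
		}
		out = append(out, span{s, e})
	}
	return out
}

func main() {
	start := time.Date(2020, 2, 1, 0, 0, 0, 0, time.UTC)
	end := start.Add(3*time.Hour + 30*time.Minute)

	// Here a 3.5h range with a 1h interval yields 4 subranges. A tenant whose
	// interval resolves to 0 would skip the split middleware entirely (see Do above).
	for _, sp := range splitSpan(start, end, time.Hour) {
		fmt.Println(sp.start.Format(time.RFC3339), "→", sp.end.Format(time.RFC3339))
	}
}
```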
diff --git a/pkg/querier/queryrange/split_by_interval_test.go b/pkg/querier/queryrange/split_by_interval_test.go
index 78758a98fc8c..a30faaa054c3 100644
--- a/pkg/querier/queryrange/split_by_interval_test.go
+++ b/pkg/querier/queryrange/split_by_interval_test.go
@@ -79,31 +79,32 @@ func Test_splitQuery(t *testing.T) {
 
 func Test_splitByInterval_Do(t *testing.T) {
 	ctx := user.InjectOrgID(context.Background(), "1")
-	split := splitByInterval{
-		next: queryrange.HandlerFunc(func(_ context.Context, r queryrange.Request) (queryrange.Response, error) {
-			return &LokiResponse{
-				Status:    loghttp.QueryStatusSuccess,
-				Direction: r.(*LokiRequest).Direction,
-				Limit:     r.(*LokiRequest).Limit,
-				Version:   uint32(loghttp.VersionV1),
-				Data: LokiData{
-					ResultType: loghttp.ResultTypeStream,
-					Result: []logproto.Stream{
-						{
-							Labels: `{foo="bar", level="debug"}`,
-							Entries: []logproto.Entry{
-								{Timestamp: time.Unix(0, r.(*LokiRequest).StartTs.UnixNano()), Line: fmt.Sprintf("%d", r.(*LokiRequest).StartTs.UnixNano())},
-							},
-						},
-					},
-				},
-			}, nil
-		}),
-		limits:   fakeLimits{},
-		merger:   lokiCodec,
-		interval: time.Hour,
-	}
+	next := queryrange.HandlerFunc(func(_ context.Context, r queryrange.Request) (queryrange.Response, error) {
+		return &LokiResponse{
+			Status:    loghttp.QueryStatusSuccess,
+			Direction: r.(*LokiRequest).Direction,
+			Limit:     r.(*LokiRequest).Limit,
+			Version:   uint32(loghttp.VersionV1),
+			Data: LokiData{
+				ResultType: loghttp.ResultTypeStream,
+				Result: []logproto.Stream{
+					{
+						Labels: `{foo="bar", level="debug"}`,
+						Entries: []logproto.Entry{
+							{Timestamp: time.Unix(0, r.(*LokiRequest).StartTs.UnixNano()), Line: fmt.Sprintf("%d", r.(*LokiRequest).StartTs.UnixNano())},
+						},
+					},
+				},
+			},
+		}, nil
+	})
+
+	l := WithDefaultLimits(fakeLimits{}, queryrange.Config{SplitQueriesByInterval: time.Hour})
+	split := SplitByIntervalMiddleware(
+		l,
+		lokiCodec,
+	).Wrap(next)
 
 	tests := []struct {
 		name string
@@ -252,40 +253,41 @@ func Test_ExitEarly(t *testing.T) {
 	var callCt int
 	var mtx sync.Mutex
 
-	split := splitByInterval{
-		next: queryrange.HandlerFunc(func(_ context.Context, r queryrange.Request) (queryrange.Response, error) {
-			time.Sleep(time.Millisecond) // artificial delay to minimize race condition exposure in test
-
-			mtx.Lock()
-			defer mtx.Unlock()
-			callCt++
-
-			return &LokiResponse{
-				Status:    loghttp.QueryStatusSuccess,
-				Direction: r.(*LokiRequest).Direction,
-				Limit:     r.(*LokiRequest).Limit,
-				Version:   uint32(loghttp.VersionV1),
-				Data: LokiData{
-					ResultType: loghttp.ResultTypeStream,
-					Result: []logproto.Stream{
-						{
-							Labels: `{foo="bar", level="debug"}`,
-							Entries: []logproto.Entry{
-								{
-									Timestamp: time.Unix(0, r.(*LokiRequest).StartTs.UnixNano()),
-									Line:      fmt.Sprintf("%d", r.(*LokiRequest).StartTs.UnixNano()),
-								},
-							},
-						},
-					},
-				},
-			}, nil
-		}),
-		limits:   fakeLimits{},
-		merger:   lokiCodec,
-		interval: time.Hour,
-	}
+	next := queryrange.HandlerFunc(func(_ context.Context, r queryrange.Request) (queryrange.Response, error) {
+		time.Sleep(time.Millisecond) // artificial delay to minimize race condition exposure in test
+
+		mtx.Lock()
+		defer mtx.Unlock()
+		callCt++
+
+		return &LokiResponse{
+			Status:    loghttp.QueryStatusSuccess,
+			Direction: r.(*LokiRequest).Direction,
+			Limit:     r.(*LokiRequest).Limit,
+			Version:   uint32(loghttp.VersionV1),
+			Data: LokiData{
+				ResultType: loghttp.ResultTypeStream,
+				Result: []logproto.Stream{
+					{
+						Labels: `{foo="bar", level="debug"}`,
+						Entries: []logproto.Entry{
+							{
+								Timestamp: time.Unix(0, r.(*LokiRequest).StartTs.UnixNano()),
+								Line:      fmt.Sprintf("%d", r.(*LokiRequest).StartTs.UnixNano()),
+							},
+						},
+					},
+				},
+			},
+		}, nil
+	})
+
+	l := WithDefaultLimits(fakeLimits{}, queryrange.Config{SplitQueriesByInterval: time.Hour})
+	split := SplitByIntervalMiddleware(
+		l,
+		lokiCodec,
+	).Wrap(next)
 
 	req := &LokiRequest{
 		StartTs: time.Unix(0, 0),
@@ -332,37 +334,38 @@ func Test_DoesntDeadlock(t *testing.T) {
 	n := 10
 
-	split := splitByInterval{
-		next: queryrange.HandlerFunc(func(_ context.Context, r queryrange.Request) (queryrange.Response, error) {
-
-			return &LokiResponse{
-				Status:    loghttp.QueryStatusSuccess,
-				Direction: r.(*LokiRequest).Direction,
-				Limit:     r.(*LokiRequest).Limit,
-				Version:   uint32(loghttp.VersionV1),
-				Data: LokiData{
-					ResultType: loghttp.ResultTypeStream,
-					Result: []logproto.Stream{
-						{
-							Labels: `{foo="bar", level="debug"}`,
-							Entries: []logproto.Entry{
-								{
-									Timestamp: time.Unix(0, r.(*LokiRequest).StartTs.UnixNano()),
-									Line:      fmt.Sprintf("%d", r.(*LokiRequest).StartTs.UnixNano()),
-								},
-							},
-						},
-					},
-				},
-			}, nil
-		}),
-		limits: fakeLimits{
-			maxQueryParallelism: n,
-		},
-		merger:   lokiCodec,
-		interval: time.Hour,
-	}
+	next := queryrange.HandlerFunc(func(_ context.Context, r queryrange.Request) (queryrange.Response, error) {
+
+		return &LokiResponse{
+			Status:    loghttp.QueryStatusSuccess,
+			Direction: r.(*LokiRequest).Direction,
+			Limit:     r.(*LokiRequest).Limit,
+			Version:   uint32(loghttp.VersionV1),
+			Data: LokiData{
+				ResultType: loghttp.ResultTypeStream,
+				Result: []logproto.Stream{
+					{
+						Labels: `{foo="bar", level="debug"}`,
+						Entries: []logproto.Entry{
+							{
+								Timestamp: time.Unix(0, r.(*LokiRequest).StartTs.UnixNano()),
+								Line:      fmt.Sprintf("%d", r.(*LokiRequest).StartTs.UnixNano()),
+							},
+						},
+					},
+				},
+			},
+		}, nil
+	})
+
+	l := WithDefaultLimits(fakeLimits{
+		maxQueryParallelism: n,
+	}, queryrange.Config{SplitQueriesByInterval: time.Hour})
+	split := SplitByIntervalMiddleware(
+		l,
+		lokiCodec,
+	).Wrap(next)
 
 	// split into n requests w/ n/2 limit, ensuring unused responses are cleaned up properly
 	req := &LokiRequest{
diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go
index 34810d86ae06..a14884572f79 100644
--- a/pkg/util/validation/limits.go
+++ b/pkg/util/validation/limits.go
@@ -42,6 +42,9 @@ type Limits struct {
 	MaxStreamsMatchersPerQuery int `yaml:"max_streams_matchers_per_query"`
 	MaxConcurrentTailRequests  int `yaml:"max_concurrent_tail_requests"`
 
+	// Query frontend enforced limits. The default is parameterized by the queryrange config.
+	QuerySplitDuration time.Duration `yaml:"split_queries_by_interval"`
+
 	// Config for overrides, convenient if it goes here.
 	PerTenantOverrideConfig string        `yaml:"per_tenant_override_config"`
 	PerTenantOverridePeriod time.Duration `yaml:"per_tenant_override_period"`
@@ -214,6 +217,11 @@ func (o *Overrides) MaxStreamsMatchersPerQuery(userID string) int {
 	return o.getOverridesForUser(userID).MaxStreamsMatchersPerQuery
 }
 
+// QuerySplitDuration returns the tenant-specific split-by interval applied in the query frontend.
+func (o *Overrides) QuerySplitDuration(userID string) time.Duration {
+	return o.getOverridesForUser(userID).QuerySplitDuration
+}
+
 // MaxConcurrentTailRequests returns the limit to number of concurrent tail requests.
 func (o *Overrides) MaxConcurrentTailRequests(userID string) int {
 	return o.getOverridesForUser(userID).MaxConcurrentTailRequests
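Putting the pieces together for operators: the new `split_queries_by_interval` limit can be set globally through the query-frontend config and then tightened per tenant through the existing `per_tenant_override_config` mechanism. A sketch of what that could look like — the block names follow the existing Loki/Cortex config layout, the tenant name is made up, and the exact duration syntax follows whatever the limits YAML parser accepts, so treat this as illustrative rather than canonical:

```yaml
# Global default, applied to every tenant without an override
# (equivalent to the -querier.split-queries-by-interval flag).
query_range:
  split_queries_by_interval: 1h

# Per-tenant overrides live in the existing limits override file.
limits_config:
  per_tenant_override_config: /etc/loki/overrides.yaml
  per_tenant_override_period: 10s

# /etc/loki/overrides.yaml — a heavy tenant gets a finer split.
overrides:
  tenant-a:
    split_queries_by_interval: 15m
```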