
Adapt queryTimeout to be a per-tenant configuration #6835

Merged · 13 commits · Aug 24, 2022
5 changes: 3 additions & 2 deletions CHANGELOG.md
@@ -18,9 +18,10 @@
* [6358](https://github.com/grafana/loki/pull/6358) **taharah**: Fixes sigv4 authentication for the Ruler's remote write configuration by allowing both a global and per tenant configuration.
* [6375](https://github.com/grafana/loki/pull/6375) **dannykopping**: Fix bug that prevented users from using the `json` parser after a `line_format` pipeline stage.
##### Changes
* [6726](https://github.com/grafana/loki/pull/6726) **kavirajk** upgrades go from 1.17.9 -> 1.18.4
* [6415](https://github.com/grafana/loki/pull/6415) **salvacorts** Evenly spread queriers across kubernetes nodes.
* [6726](https://github.com/grafana/loki/pull/6726) **kavirajk**: upgrades go from 1.17.9 -> 1.18.4
* [6415](https://github.com/grafana/loki/pull/6415) **salvacorts**: Evenly spread queriers across kubernetes nodes.
* [6349](https://github.com/grafana/loki/pull/6349) **simonswine**: Update the default HTTP listen port from 80 to 3100. Make sure to configure the port explicitly if you are using port 80.
* [6835](https://github.com/grafana/loki/pull/6835) **DylanGuedes**: Add new per-tenant query timeout configuration and remove engine query timeout.

#### Promtail

3 changes: 2 additions & 1 deletion docs/sources/configuration/_index.md
@@ -321,7 +321,8 @@ The `querier` block configures the Loki Querier.

# Configuration options for the LogQL engine.
engine:
# Timeout for query execution
# Timeout for query execution.
# Deprecated: use querier.query-timeout instead.
# CLI flag: -querier.engine.timeout
[timeout: <duration> | default = 3m]

6 changes: 6 additions & 0 deletions docs/sources/upgrading/_index.md
@@ -33,6 +33,12 @@ The output is incredibly verbose as it shows the entire internal config struct u

### Loki

#### Engine query timeout is deprecated

Previously, we had two configurations to define a query timeout: `engine.timeout` and `querier.query-timeout`.
Because the two settings conflict with each other and `engine.timeout` is not as expressive as `querier.query-timeout`,
we are deprecating `engine.timeout` in favor of relying on `querier.query-timeout` only.
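
A minimal sketch of the replacement configuration, assuming the new per-tenant limit is exposed as `query_timeout` under `limits_config` and can be overridden per tenant through the runtime overrides file; check the configuration reference for the authoritative key names and defaults:

```yaml
# Global default applied to every tenant (replaces querier.engine.timeout).
limits_config:
  query_timeout: 3m

# Per-tenant override in the file referenced by runtime_config.
overrides:
  tenant-a:
    query_timeout: 10m
```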

#### Fifocache is deprecated

We introduced a new cache called `embedded-cache`, an in-process cache that makes it possible to run Loki without an external cache (such as Memcached or Redis). It can be run in two modes: `distributed: false` (the default, equivalent to the old `fifocache`) and `distributed: true`, which runs the cache in a distributed fashion, sharding keys across peers when Loki runs in microservices or SSD mode.
4 changes: 4 additions & 0 deletions pkg/logcli/client/file.go
@@ -190,6 +190,10 @@ func (l *limiter) MaxQuerySeries(userID string) int {
return l.n
}

func (l *limiter) QueryTimeout(userID string) time.Duration {
return time.Minute * 5
}

type querier struct {
r io.Reader
labels labels.Labels
1 change: 0 additions & 1 deletion pkg/logql/downstream.go
@@ -65,7 +65,6 @@ func (ng *DownstreamEngine) Opts() EngineOpts { return ng.opts }
func (ng *DownstreamEngine) Query(ctx context.Context, p Params, mapped syntax.Expr) Query {
return &query{
logger: ng.logger,
timeout: ng.opts.Timeout,
params: p,
evaluator: NewDownstreamEvaluator(ng.downstreamable.Downstreamer(ctx)),
parse: func(_ context.Context, _ string) (syntax.Expr, error) {
21 changes: 12 additions & 9 deletions pkg/logql/engine.go
@@ -96,22 +96,22 @@ type Querier interface {

// EngineOpts is the list of options to use with the LogQL query engine.
type EngineOpts struct {
// TODO: remove this after next release.
// Timeout for query execution.
Timeout time.Duration `yaml:"timeout"`

// MaxLookBackPeriod is the maximum amount of time to look back for log lines.
// only used for instant log queries.
MaxLookBackPeriod time.Duration `yaml:"max_look_back_period"`
}

func (opts *EngineOpts) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&opts.Timeout, prefix+".engine.timeout", 5*time.Minute, "Timeout for query execution.")
// TODO: remove this configuration after next release.
f.DurationVar(&opts.Timeout, prefix+".engine.timeout", 5*time.Minute, "Timeout for query execution. Deprecated: use querier.query-timeout instead.")
f.DurationVar(&opts.MaxLookBackPeriod, prefix+".engine.max-lookback-period", 30*time.Second, "The maximum amount of time to look back for log lines. Used only for instant log queries.")
}

func (opts *EngineOpts) applyDefault() {
if opts.Timeout == 0 {
opts.Timeout = 5 * time.Minute
}
if opts.MaxLookBackPeriod == 0 {
opts.MaxLookBackPeriod = 30 * time.Second
}
@@ -120,7 +120,6 @@ func (opts *EngineOpts) applyDefault() {
// Engine is the LogQL engine.
type Engine struct {
logger log.Logger
timeout time.Duration
evaluator Evaluator
limits Limits
}
@@ -133,7 +132,6 @@ func NewEngine(opts EngineOpts, q Querier, l Limits, logger log.Logger) *Engine
}
return &Engine{
logger: logger,
timeout: opts.Timeout,
evaluator: NewDefaultEvaluator(q, opts.MaxLookBackPeriod),
limits: l,
}
@@ -143,7 +141,6 @@ func NewEngine(opts EngineOpts, q Querier, l Limits, logger log.Logger) *Engine
func (ng *Engine) Query(params Params) Query {
return &query{
logger: ng.logger,
timeout: ng.timeout,
params: params,
evaluator: ng.evaluator,
parse: func(_ context.Context, query string) (syntax.Expr, error) {
@@ -162,7 +159,6 @@ type Query interface {

type query struct {
logger log.Logger
timeout time.Duration
params Params
parse func(context.Context, string) (syntax.Expr, error)
limits Limits
@@ -226,7 +222,14 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) {
}

func (q *query) Eval(ctx context.Context) (promql_parser.Value, error) {
ctx, cancel := context.WithTimeout(ctx, q.timeout)
queryTimeout := time.Minute * 5
userID, err := tenant.TenantID(ctx)
if err != nil {
level.Warn(q.logger).Log("msg", fmt.Sprintf("couldn't fetch tenantID to evaluate query timeout, using default value of %s", queryTimeout), "err", err)
} else {
queryTimeout = q.limits.QueryTimeout(userID)
}
ctx, cancel := context.WithTimeout(ctx, queryTimeout)
defer cancel()

expr, err := q.parse(ctx, q.params.Query())
7 changes: 7 additions & 0 deletions pkg/logql/limits.go
@@ -2,6 +2,7 @@ package logql

import (
"math"
"time"
)

var (
@@ -11,12 +12,18 @@ var (
// Limits allows the engine to fetch limits for a given user.
type Limits interface {
MaxQuerySeries(userID string) int
QueryTimeout(userID string) time.Duration
}

type fakeLimits struct {
maxSeries int
timeout time.Duration
}

func (f fakeLimits) MaxQuerySeries(userID string) int {
return f.maxSeries
}

func (f fakeLimits) QueryTimeout(userID string) time.Duration {
return f.timeout
}
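
The `Limits` interface stays deliberately small; in Loki proper it is satisfied by the per-tenant overrides type, and `fakeLimits` above is the test stub. A self-contained sketch of what an implementation with a global default plus per-tenant overrides might look like (the `overrides` type and its field names here are illustrative, not Loki's `validation.Overrides`):

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// Limits mirrors the engine's per-tenant limits interface.
type Limits interface {
	MaxQuerySeries(userID string) int
	QueryTimeout(userID string) time.Duration
}

// overrides returns per-tenant values when present, and global defaults otherwise.
type overrides struct {
	defaultTimeout   time.Duration
	defaultMaxSeries int
	tenantTimeouts   map[string]time.Duration
}

func (o overrides) MaxQuerySeries(string) int { return o.defaultMaxSeries }

func (o overrides) QueryTimeout(userID string) time.Duration {
	if t, ok := o.tenantTimeouts[userID]; ok {
		return t
	}
	return o.defaultTimeout
}

func main() {
	var limits Limits = overrides{
		defaultTimeout:   time.Minute,
		defaultMaxSeries: math.MaxInt32,
		tenantTimeouts:   map[string]time.Duration{"team-a": 10 * time.Minute},
	}
	fmt.Println(limits.QueryTimeout("team-a"), limits.QueryTimeout("team-b")) // 10m0s 1m0s
}
```
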
37 changes: 25 additions & 12 deletions pkg/loki/modules.go
@@ -324,18 +324,31 @@ func (t *Loki) initQuerier() (services.Service, error) {
logger := log.With(util_log.Logger, "component", "querier")
t.querierAPI = querier.NewQuerierAPI(t.Cfg.Querier, t.Querier, t.overrides, logger)
queryHandlers := map[string]http.Handler{
"/loki/api/v1/query_range": httpMiddleware.Wrap(http.HandlerFunc(t.querierAPI.RangeQueryHandler)),
"/loki/api/v1/query": httpMiddleware.Wrap(http.HandlerFunc(t.querierAPI.InstantQueryHandler)),
"/loki/api/v1/label": http.HandlerFunc(t.querierAPI.LabelHandler),
"/loki/api/v1/labels": http.HandlerFunc(t.querierAPI.LabelHandler),
"/loki/api/v1/label/{name}/values": http.HandlerFunc(t.querierAPI.LabelHandler),
"/loki/api/v1/series": http.HandlerFunc(t.querierAPI.SeriesHandler),
"/loki/api/v1/index/stats": http.HandlerFunc(t.querierAPI.IndexStatsHandler),

"/api/prom/query": httpMiddleware.Wrap(http.HandlerFunc(t.querierAPI.LogQueryHandler)),
"/api/prom/label": http.HandlerFunc(t.querierAPI.LabelHandler),
"/api/prom/label/{name}/values": http.HandlerFunc(t.querierAPI.LabelHandler),
"/api/prom/series": http.HandlerFunc(t.querierAPI.SeriesHandler),
"/loki/api/v1/query_range": middleware.Merge(
httpMiddleware,
querier.WrapQuerySpanAndTimeout("query.RangeQuery", t.querierAPI),
).Wrap(http.HandlerFunc(t.querierAPI.RangeQueryHandler)),

"/loki/api/v1/query": middleware.Merge(
httpMiddleware,
querier.WrapQuerySpanAndTimeout("query.InstantQuery", t.querierAPI),
).Wrap(http.HandlerFunc(t.querierAPI.InstantQueryHandler)),

"/loki/api/v1/label": querier.WrapQuerySpanAndTimeout("query.Label", t.querierAPI).Wrap(http.HandlerFunc(t.querierAPI.LabelHandler)),
"/loki/api/v1/labels": querier.WrapQuerySpanAndTimeout("query.Label", t.querierAPI).Wrap(http.HandlerFunc(t.querierAPI.LabelHandler)),
"/loki/api/v1/label/{name}/values": querier.WrapQuerySpanAndTimeout("query.Label", t.querierAPI).Wrap(http.HandlerFunc(t.querierAPI.LabelHandler)),

"/loki/api/v1/series": querier.WrapQuerySpanAndTimeout("query.Series", t.querierAPI).Wrap(http.HandlerFunc(t.querierAPI.SeriesHandler)),
"/loki/api/v1/index/stats": querier.WrapQuerySpanAndTimeout("query.IndexStats", t.querierAPI).Wrap(http.HandlerFunc(t.querierAPI.IndexStatsHandler)),

"/api/prom/query": middleware.Merge(
httpMiddleware,
querier.WrapQuerySpanAndTimeout("query.LogQuery", t.querierAPI),
).Wrap(http.HandlerFunc(t.querierAPI.LogQueryHandler)),

"/api/prom/label": querier.WrapQuerySpanAndTimeout("query.Label", t.querierAPI).Wrap(http.HandlerFunc(t.querierAPI.LabelHandler)),
"/api/prom/label/{name}/values": querier.WrapQuerySpanAndTimeout("query.Label", t.querierAPI).Wrap(http.HandlerFunc(t.querierAPI.LabelHandler)),
"/api/prom/series": querier.WrapQuerySpanAndTimeout("query.Series", t.querierAPI).Wrap(http.HandlerFunc(t.querierAPI.SeriesHandler)),
}

// We always want to register tail routes externally, tail requests are different from normal queries, they
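
The handler map above composes the existing HTTP instrumentation with the new per-tenant span-and-timeout wrapper via `middleware.Merge` from `weaveworks/common/middleware`. A small runnable sketch of that composition pattern, with placeholder middlewares standing in for the real ones (the `tag` helper is illustrative; in the usual implementation the first argument to `Merge` is the outermost wrapper and therefore runs first):

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"

	"github.com/weaveworks/common/middleware"
)

// tag returns a middleware that records the order in which the wrappers run.
func tag(name string, order *[]string) middleware.Interface {
	return middleware.Func(func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			*order = append(*order, name)
			next.ServeHTTP(w, r)
		})
	})
}

func main() {
	var order []string

	handler := middleware.Merge(
		tag("httpMiddleware", &order),          // outermost: runs first
		tag("WrapQuerySpanAndTimeout", &order), // runs second
	).Wrap(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		order = append(order, "RangeQueryHandler")
	}))

	req := httptest.NewRequest(http.MethodGet, "/loki/api/v1/query_range", nil)
	handler.ServeHTTP(httptest.NewRecorder(), req)

	fmt.Println(order) // [httpMiddleware WrapQuerySpanAndTimeout RangeQueryHandler]
}
```
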
55 changes: 33 additions & 22 deletions pkg/querier/http.go
@@ -13,6 +13,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"

"github.com/grafana/dskit/tenant"

@@ -64,16 +65,13 @@

// RangeQueryHandler is a http.HandlerFunc for range queries.
func (q *QuerierAPI) RangeQueryHandler(w http.ResponseWriter, r *http.Request) {
// Enforce the query timeout while querying backends
ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.QueryTimeout))
defer cancel()

request, err := loghttp.ParseRangeQuery(r)
if err != nil {
serverutil.WriteError(httpgrpc.Errorf(http.StatusBadRequest, err.Error()), w)
return
}

ctx := r.Context()
if err := q.validateEntriesLimits(ctx, request.Query, request.Limit); err != nil {
serverutil.WriteError(err, w)
return
@@ -103,16 +101,13 @@ func (q *QuerierAPI) RangeQueryHandler(w http.ResponseWriter, r *http.Request) {

// InstantQueryHandler is a http.HandlerFunc for instant queries.
func (q *QuerierAPI) InstantQueryHandler(w http.ResponseWriter, r *http.Request) {
// Enforce the query timeout while querying backends
ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.QueryTimeout))
defer cancel()

request, err := loghttp.ParseInstantQuery(r)
if err != nil {
serverutil.WriteError(httpgrpc.Errorf(http.StatusBadRequest, err.Error()), w)
return
}

ctx := r.Context()
if err := q.validateEntriesLimits(ctx, request.Query, request.Limit); err != nil {
serverutil.WriteError(err, w)
return
@@ -143,10 +138,6 @@ func (q *QuerierAPI) InstantQueryHandler(w http.ResponseWriter, r *http.Request)

// LogQueryHandler is a http.HandlerFunc for log only queries.
func (q *QuerierAPI) LogQueryHandler(w http.ResponseWriter, r *http.Request) {
// Enforce the query timeout while querying backends
ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.QueryTimeout))
defer cancel()

request, err := loghttp.ParseRangeQuery(r)
if err != nil {
serverutil.WriteError(httpgrpc.Errorf(http.StatusBadRequest, err.Error()), w)
@@ -170,6 +161,7 @@ func (q *QuerierAPI) LogQueryHandler(w http.ResponseWriter, r *http.Request) {
return
}

ctx := r.Context()
if err := q.validateEntriesLimits(ctx, request.Query, request.Limit); err != nil {
serverutil.WriteError(err, w)
return
@@ -207,13 +199,11 @@ func (q *QuerierAPI) LabelHandler(w http.ResponseWriter, r *http.Request) {
return
}

log, ctx := spanlogger.New(r.Context(), "query.Label")

timer := prometheus.NewTimer(logql.QueryTime.WithLabelValues("labels"))
defer timer.ObserveDuration()

start := time.Now()
statsCtx, ctx := stats.NewContext(ctx)
statsCtx, ctx := stats.NewContext(r.Context())

resp, err := q.querier.Label(r.Context(), req)
queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration)
@@ -224,6 +214,7 @@ }
}
// record stats about the label query
statResult := statsCtx.Result(time.Since(start), queueTime, resLength)
log := spanlogger.FromContext(ctx)
statResult.Log(level.Debug(log))

status := 200
Expand Down Expand Up @@ -382,13 +373,11 @@ func (q *QuerierAPI) SeriesHandler(w http.ResponseWriter, r *http.Request) {
return
}

log, ctx := spanlogger.New(r.Context(), "query.Series")

timer := prometheus.NewTimer(logql.QueryTime.WithLabelValues("series"))
defer timer.ObserveDuration()

start := time.Now()
statsCtx, ctx := stats.NewContext(ctx)
statsCtx, ctx := stats.NewContext(r.Context())

resp, err := q.querier.Series(r.Context(), req)
queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration)
@@ -400,6 +389,7 @@

// record stats about the label query
statResult := statsCtx.Result(time.Since(start), queueTime, resLength)
log := spanlogger.FromContext(ctx)
statResult.Log(level.Debug(log))

status := 200
@@ -422,17 +412,14 @@

// IndexStatsHandler queries the index for the data statistics related to a query
func (q *QuerierAPI) IndexStatsHandler(w http.ResponseWriter, r *http.Request) {

req, err := loghttp.ParseIndexStatsQuery(r)
if err != nil {
serverutil.WriteError(httpgrpc.Errorf(http.StatusBadRequest, err.Error()), w)
return
}

_, ctx := spanlogger.New(r.Context(), "query.IndexStats")

// TODO(owen-d): log metadata, record stats?
resp, err := q.querier.IndexStats(ctx, req)
resp, err := q.querier.IndexStats(r.Context(), req)
if resp == nil {
// Some stores don't implement this
resp = &index_stats.Stats{}
@@ -492,3 +479,27 @@ func (q *QuerierAPI) validateEntriesLimits(ctx context.Context, query string, li
}
return nil
}

// WrapQuerySpanAndTimeout applies a context deadline and a span logger to a query call.
//
// The timeout is based on the per-tenant query timeout configuration.
func WrapQuerySpanAndTimeout(call string, q *QuerierAPI) middleware.Interface {
return middleware.Func(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
log, ctx := spanlogger.New(req.Context(), call)
userID, err := tenant.TenantID(ctx)
if err != nil {
level.Error(log).Log("msg", "couldn't fetch tenantID", "err", err)
serverutil.WriteError(httpgrpc.Errorf(http.StatusBadRequest, err.Error()), w)
return
}

// Enforce the per-tenant query timeout while querying backends
queryTimeout := q.limits.QueryTimeout(userID)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(queryTimeout))
defer cancel()

// Propagate the span logger and the deadline to the wrapped handler.
next.ServeHTTP(w, req.WithContext(ctx))
})
})
}
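
For illustration, a self-contained sketch of the same wrapping idea that shows the per-tenant deadline actually reaching the inner handler; the simplified `perTenantTimeout` middleware and the hard-coded limits map are assumptions for the example, not Loki's implementation:

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"

	"github.com/grafana/dskit/tenant"
	"github.com/weaveworks/common/middleware"
	"github.com/weaveworks/common/user"
)

// perTenantTimeout attaches a per-tenant deadline to the request context,
// falling back to def when the tenant has no explicit override.
func perTenantTimeout(timeouts map[string]time.Duration, def time.Duration) middleware.Interface {
	return middleware.Func(func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
			timeout := def
			if userID, err := tenant.TenantID(req.Context()); err == nil {
				if t, ok := timeouts[userID]; ok {
					timeout = t
				}
			}

			ctx, cancel := context.WithDeadline(req.Context(), time.Now().Add(timeout))
			defer cancel()

			// The deadline is only visible downstream if the derived context
			// is propagated on the request.
			next.ServeHTTP(w, req.WithContext(ctx))
		})
	})
}

func main() {
	wrapped := perTenantTimeout(map[string]time.Duration{"team-a": 30 * time.Second}, 5*time.Minute).
		Wrap(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
			deadline, ok := req.Context().Deadline()
			fmt.Println(ok, time.Until(deadline).Round(time.Second)) // true 30s
		}))

	req := httptest.NewRequest(http.MethodGet, "/loki/api/v1/query", nil)
	req = req.WithContext(user.InjectOrgID(req.Context(), "team-a"))
	wrapped.ServeHTTP(httptest.NewRecorder(), req)
}
```

The key detail is passing `req.WithContext(ctx)` to the next handler; otherwise a deadline created in the middleware never reaches the downstream query path.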