
Adapt queryTimeout to be a per-tenant configuration #6835

Merged (13 commits) on Aug 24, 2022
4 changes: 4 additions & 0 deletions pkg/logcli/client/file.go
@@ -190,6 +190,10 @@ func (l *limiter) MaxQuerySeries(userID string) int {
return l.n
}

func (l *limiter) QueryTimeout(userID string) time.Duration {
return time.Minute * 5
}

type querier struct {
r io.Reader
labels labels.Labels
1 change: 0 additions & 1 deletion pkg/logql/downstream.go
@@ -65,7 +65,6 @@ func (ng *DownstreamEngine) Opts() EngineOpts { return ng.opts }
func (ng *DownstreamEngine) Query(ctx context.Context, p Params, mapped syntax.Expr) Query {
return &query{
logger: ng.logger,
timeout: ng.opts.Timeout,
params: p,
evaluator: NewDownstreamEvaluator(ng.downstreamable.Downstreamer(ctx)),
parse: func(_ context.Context, _ string) (syntax.Expr, error) {
19 changes: 8 additions & 11 deletions pkg/logql/engine.go
@@ -96,22 +96,16 @@ type Querier interface {

// EngineOpts is the list of options to use with the LogQL query engine.
type EngineOpts struct {
// Timeout for queries execution
Timeout time.Duration `yaml:"timeout"`
// MaxLookBackPeriod is the maximum amount of time to look back for log lines.
// only used for instant log queries.
MaxLookBackPeriod time.Duration `yaml:"max_look_back_period"`
}

func (opts *EngineOpts) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&opts.Timeout, prefix+".engine.timeout", 5*time.Minute, "Timeout for query execution.")
f.DurationVar(&opts.MaxLookBackPeriod, prefix+".engine.max-lookback-period", 30*time.Second, "The maximum amount of time to look back for log lines. Used only for instant log queries.")
}

func (opts *EngineOpts) applyDefault() {
if opts.Timeout == 0 {
opts.Timeout = 5 * time.Minute
}
if opts.MaxLookBackPeriod == 0 {
opts.MaxLookBackPeriod = 30 * time.Second
}
@@ -120,7 +114,6 @@ func (opts *EngineOpts) applyDefault() {
// Engine is the LogQL engine.
type Engine struct {
logger log.Logger
timeout time.Duration
evaluator Evaluator
limits Limits
}
@@ -133,7 +126,6 @@ func NewEngine(opts EngineOpts, q Querier, l Limits, logger log.Logger) *Engine
}
return &Engine{
logger: logger,
timeout: opts.Timeout,
evaluator: NewDefaultEvaluator(q, opts.MaxLookBackPeriod),
limits: l,
}
@@ -143,7 +135,6 @@ func NewEngine(opts EngineOpts, q Querier, l Limits, logger log.Logger) *Engine
func (ng *Engine) Query(params Params) Query {
return &query{
logger: ng.logger,
timeout: ng.timeout,
params: params,
evaluator: ng.evaluator,
parse: func(_ context.Context, query string) (syntax.Expr, error) {
@@ -162,7 +153,6 @@ type Query interface {

type query struct {
logger log.Logger
timeout time.Duration
params Params
parse func(context.Context, string) (syntax.Expr, error)
limits Limits
@@ -226,7 +216,14 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) {
}

func (q *query) Eval(ctx context.Context) (promql_parser.Value, error) {
ctx, cancel := context.WithTimeout(ctx, q.timeout)
queryTimeout := time.Minute * 2

Contributor: I think it would be better to move this to a package-level const (e.g. DefaultQueryTimeout). Also, what's the rationale behind choosing 2 minutes as the default? The query engine previously used 5 minutes as its default timeout; if the intent is to standardize it, we may also need to mention this in the upgrade guide.

Contributor Author: Hmm, where did you find that the engine had a 5-minute default timeout?

userID, err := tenant.TenantID(ctx)
if err != nil {
level.Warn(q.logger).Log("msg", "couldn't fetch tenantID to evaluate query timeout, using default value of 2m", "err", err)
} else {
queryTimeout = q.limits.QueryTimeout(userID)
}
ctx, cancel := context.WithTimeout(ctx, queryTimeout)
defer cancel()

expr, err := q.parse(ctx, q.params.Query())
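As a follow-up to the review thread above, here is a minimal sketch of the suggested package-level default. The constant DefaultQueryTimeout, the helper queryTimeoutFor, and the import paths are assumptions for illustration only; they are not part of this diff.

package logql

import (
	"context"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/tenant" // import path assumed
)

// DefaultQueryTimeout is the assumed fallback used when the tenant ID cannot
// be resolved from the request context.
const DefaultQueryTimeout = 2 * time.Minute

// queryTimeoutFor resolves the per-tenant query timeout and falls back to the
// package-level default when tenant resolution fails.
func queryTimeoutFor(ctx context.Context, limits Limits, logger log.Logger) time.Duration {
	userID, err := tenant.TenantID(ctx)
	if err != nil {
		level.Warn(logger).Log("msg", "couldn't fetch tenantID to evaluate query timeout, using default", "err", err)
		return DefaultQueryTimeout
	}
	return limits.QueryTimeout(userID)
}
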
7 changes: 7 additions & 0 deletions pkg/logql/limits.go
@@ -2,6 +2,7 @@ package logql

import (
"math"
"time"
)

var (
@@ -11,12 +12,18 @@ var (
// Limits allow the engine to fetch limits for a given users.
type Limits interface {
MaxQuerySeries(userID string) int
QueryTimeout(userID string) time.Duration
}

type fakeLimits struct {
maxSeries int
timeout time.Duration
}

func (f fakeLimits) MaxQuerySeries(userID string) int {
return f.maxSeries
}

func (f fakeLimits) QueryTimeout(userID string) time.Duration {
return f.timeout
}
67 changes: 58 additions & 9 deletions pkg/querier/http.go
@@ -64,8 +64,16 @@ func NewQuerierAPI(cfg Config, querier Querier, limits *validation.Overrides, lo

// RangeQueryHandler is a http.HandlerFunc for range queries.
func (q *QuerierAPI) RangeQueryHandler(w http.ResponseWriter, r *http.Request) {
log, ctx := spanlogger.New(r.Context(), "query.RangeQuery")

userID, err := tenant.TenantID(ctx)
if err != nil {
level.Error(log).Log("msg", "couldn't fetch tenantID", "err", err)
}

// Enforce the query timeout while querying backends
ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.QueryTimeout))
queryTimeout := q.limits.QueryTimeout(userID)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(queryTimeout))
defer cancel()

request, err := loghttp.ParseRangeQuery(r)
@@ -103,8 +111,16 @@ func (q *QuerierAPI) RangeQueryHandler(w http.ResponseWriter, r *http.Request) {

// InstantQueryHandler is a http.HandlerFunc for instant queries.
func (q *QuerierAPI) InstantQueryHandler(w http.ResponseWriter, r *http.Request) {
log, ctx := spanlogger.New(r.Context(), "query.InstantQuery")

userID, err := tenant.TenantID(ctx)
if err != nil {
level.Error(log).Log("msg", "couldn't fetch tenantID", "err", err)
}

// Enforce the query timeout while querying backends
ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.QueryTimeout))
queryTimeout := q.limits.QueryTimeout(userID)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(queryTimeout))
defer cancel()

request, err := loghttp.ParseInstantQuery(r)
@@ -143,8 +159,15 @@ func (q *QuerierAPI) InstantQueryHandler(w http.ResponseWriter, r *http.Request)

// LogQueryHandler is a http.HandlerFunc for log only queries.
func (q *QuerierAPI) LogQueryHandler(w http.ResponseWriter, r *http.Request) {
log, ctx := spanlogger.New(r.Context(), "query.LogQuery")
userID, err := tenant.TenantID(ctx)
if err != nil {
level.Error(log).Log("msg", "couldn't fetch tenantID", "err", err)
}

// Enforce the query timeout while querying backends
ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.QueryTimeout))
queryTimeout := q.limits.QueryTimeout(userID)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(queryTimeout))
defer cancel()

request, err := loghttp.ParseRangeQuery(r)
@@ -201,14 +224,23 @@ func (q *QuerierAPI) LogQueryHandler(w http.ResponseWriter, r *http.Request) {

// LabelHandler is a http.HandlerFunc for handling label queries.
func (q *QuerierAPI) LabelHandler(w http.ResponseWriter, r *http.Request) {
log, ctx := spanlogger.New(r.Context(), "query.Label")
userID, err := tenant.TenantID(ctx)
if err != nil {
level.Error(log).Log("msg", "couldn't fetch tenantID", "err", err)
}

// Enforce the query timeout while querying backends
queryTimeout := q.limits.QueryTimeout(userID)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(queryTimeout))
defer cancel()

req, err := loghttp.ParseLabelQuery(r)
if err != nil {
serverutil.WriteError(httpgrpc.Errorf(http.StatusBadRequest, err.Error()), w)
return
}

log, ctx := spanlogger.New(r.Context(), "query.Label")

timer := prometheus.NewTimer(logql.QueryTime.WithLabelValues("labels"))
defer timer.ObserveDuration()

@@ -376,14 +408,23 @@ func (q *QuerierAPI) TailHandler(w http.ResponseWriter, r *http.Request) {
// SeriesHandler returns the list of time series that match a certain label set.
// See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers
func (q *QuerierAPI) SeriesHandler(w http.ResponseWriter, r *http.Request) {
log, ctx := spanlogger.New(r.Context(), "query.Series")

Contributor: Seeing that this is repeated for almost all the handlers, should we make it a middleware?

Contributor Author: Hmm, I don't think we should, as some handlers won't want this (e.g. the tail handler).

Contributor: I definitely agree about the middleware; we can apply the middleware selectively, which is already done in pkg/loki/modules.go:

	queryHandlers := map[string]http.Handler{
		"/loki/api/v1/query_range":         httpMiddleware.Wrap(http.HandlerFunc(t.querierAPI.RangeQueryHandler)),
		"/loki/api/v1/query":               httpMiddleware.Wrap(http.HandlerFunc(t.querierAPI.InstantQueryHandler)),
		"/loki/api/v1/label":               http.HandlerFunc(t.querierAPI.LabelHandler),
		"/loki/api/v1/labels":              http.HandlerFunc(t.querierAPI.LabelHandler),
		"/loki/api/v1/label/{name}/values": http.HandlerFunc(t.querierAPI.LabelHandler),
		"/loki/api/v1/series":              http.HandlerFunc(t.querierAPI.SeriesHandler),
		"/loki/api/v1/index/stats":         http.HandlerFunc(t.querierAPI.IndexStatsHandler),

userID, err := tenant.TenantID(ctx)
if err != nil {
level.Error(log).Log("msg", "couldn't fetch tenantID", "err", err)
}

// Enforce the query timeout while querying backends
queryTimeout := q.limits.QueryTimeout(userID)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(queryTimeout))
defer cancel()

req, err := loghttp.ParseAndValidateSeriesQuery(r)
if err != nil {
serverutil.WriteError(httpgrpc.Errorf(http.StatusBadRequest, err.Error()), w)
return
}

log, ctx := spanlogger.New(r.Context(), "query.Series")

timer := prometheus.NewTimer(logql.QueryTime.WithLabelValues("series"))
defer timer.ObserveDuration()

@@ -422,15 +463,23 @@ func (q *QuerierAPI) SeriesHandler(w http.ResponseWriter, r *http.Request) {

// IndexStatsHandler queries the index for the data statistics related to a query
func (q *QuerierAPI) IndexStatsHandler(w http.ResponseWriter, r *http.Request) {
log, ctx := spanlogger.New(r.Context(), "query.IndexStats")
userID, err := tenant.TenantID(ctx)
if err != nil {
level.Error(log).Log("msg", "couldn't fetch tenantID", "err", err)
}

// Enforce the query timeout while querying backends
queryTimeout := q.limits.QueryTimeout(userID)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(queryTimeout))
defer cancel()

req, err := loghttp.ParseIndexStatsQuery(r)
if err != nil {
serverutil.WriteError(httpgrpc.Errorf(http.StatusBadRequest, err.Error()), w)
return
}

_, ctx := spanlogger.New(r.Context(), "query.IndexStats")

// TODO(owen-d): log metadata, record stats?
resp, err := q.querier.IndexStats(ctx, req)
if resp == nil {
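Building on the middleware discussion above, the repeated tenant-lookup-plus-deadline block could be factored into a wrapper and applied selectively in pkg/loki/modules.go, leaving handlers like TailHandler unwrapped. The sketch below only illustrates that idea; the names newQueryTimeoutMiddleware and queryTimeoutLimits, and the import paths, are assumptions and are not part of this PR.

package querier

import (
	"context"
	"net/http"
	"time"

	"github.com/grafana/dskit/tenant" // import path assumed
)

// queryTimeoutLimits is an assumed narrow interface; *validation.Overrides
// already provides a matching QueryTimeout method.
type queryTimeoutLimits interface {
	QueryTimeout(userID string) time.Duration
}

// newQueryTimeoutMiddleware enforces the per-tenant query timeout as a request
// deadline, falling back to a default when the tenant cannot be resolved.
func newQueryTimeoutMiddleware(limits queryTimeoutLimits, fallback time.Duration) func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			timeout := fallback
			if userID, err := tenant.TenantID(r.Context()); err == nil {
				timeout = limits.QueryTimeout(userID)
			}
			ctx, cancel := context.WithTimeout(r.Context(), timeout)
			defer cancel()
			next.ServeHTTP(w, r.WithContext(ctx))
		})
	}
}
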
18 changes: 12 additions & 6 deletions pkg/querier/querier.go
@@ -44,7 +44,6 @@ type interval struct {

// Config for a querier.
type Config struct {
QueryTimeout time.Duration `yaml:"query_timeout"`
TailMaxDuration time.Duration `yaml:"tail_max_duration"`
ExtraQueryDelay time.Duration `yaml:"extra_query_delay,omitempty"`
QueryIngestersWithin time.Duration `yaml:"query_ingesters_within,omitempty"`
@@ -60,7 +59,6 @@ type Config struct {
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.Engine.RegisterFlagsWithPrefix("querier", f)
f.DurationVar(&cfg.TailMaxDuration, "querier.tail-max-duration", 1*time.Hour, "Limit the duration for which live tailing request would be served")
f.DurationVar(&cfg.QueryTimeout, "querier.query-timeout", 1*time.Minute, "Timeout when querying backends (ingesters or storage) during the execution of a query request")
f.DurationVar(&cfg.ExtraQueryDelay, "querier.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 3*time.Hour, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.")
f.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 10, "The maximum number of concurrent queries.")
@@ -355,7 +353,8 @@ func (q *SingleTenantQuerier) Label(ctx context.Context, req *logproto.LabelRequ
}

// Enforce the query timeout while querying backends
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(q.cfg.QueryTimeout))
queryTimeout := q.limits.QueryTimeout(userID)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(queryTimeout))
defer cancel()

g, ctx := errgroup.WithContext(ctx)
@@ -439,7 +438,12 @@ func (q *SingleTenantQuerier) Tail(ctx context.Context, req *logproto.TailReques
// Enforce the query timeout except when tailing, otherwise the tailing
// will be terminated once the query timeout is reached
tailCtx := ctx
queryCtx, cancelQuery := context.WithDeadline(ctx, time.Now().Add(q.cfg.QueryTimeout))
tenantID, err := tenant.TenantID(tailCtx)
if err != nil {
level.Error(spanlogger.FromContext(tailCtx)).Log("msg", "failed to load tenant", "err", err)
}
queryTimeout := q.limits.QueryTimeout(tenantID)
queryCtx, cancelQuery := context.WithDeadline(ctx, time.Now().Add(queryTimeout))
defer cancelQuery()

tailClients, err := q.ingesterQuerier.Tail(tailCtx, req)
@@ -482,7 +486,8 @@ func (q *SingleTenantQuerier) Series(ctx context.Context, req *logproto.SeriesRe
}

// Enforce the query timeout while querying backends
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(q.cfg.QueryTimeout))
queryTimeout := q.limits.QueryTimeout(userID)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(queryTimeout))
defer cancel()

return q.awaitSeries(ctx, req)
@@ -703,7 +708,8 @@ func (q *SingleTenantQuerier) IndexStats(ctx context.Context, req *loghttp.Range
}

// Enforce the query timeout while querying backends
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(q.cfg.QueryTimeout))
queryTimeout := q.limits.QueryTimeout(userID)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(queryTimeout))
defer cancel()

return q.store.Stats(
10 changes: 6 additions & 4 deletions pkg/querier/querier_test.go
@@ -47,8 +47,9 @@ func TestQuerier_Label_QueryTimeoutConfigFlag(t *testing.T) {

store := newStoreMock()
store.On("LabelValuesForMetricName", mock.Anything, "test", model.TimeFromUnixNano(startTime.UnixNano()), model.TimeFromUnixNano(endTime.UnixNano()), "logs", "test").Return([]string{"foo", "bar"}, nil)

limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
limitsCfg := defaultLimitsTestConfig()
limitsCfg.QueryTimeout = model.Duration(queryTimeout)
limits, err := validation.NewOverrides(limitsCfg, nil)
require.NoError(t, err)

q, err := newQuerier(
@@ -101,7 +102,9 @@ func TestQuerier_Tail_QueryTimeoutConfigFlag(t *testing.T) {
ingesterClient.On("Tail", mock.Anything, &request, mock.Anything).Return(tailClient, nil)
ingesterClient.On("TailersCount", mock.Anything, mock.Anything, mock.Anything).Return(&logproto.TailersCountResponse{}, nil)

limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
limitsCfg := defaultLimitsTestConfig()
limitsCfg.QueryTimeout = model.Duration(queryTimeout)
limits, err := validation.NewOverrides(limitsCfg, nil)
require.NoError(t, err)

q, err := newQuerier(
@@ -140,7 +143,6 @@ func TestQuerier_Tail_QueryTimeoutConfigFlag(t *testing.T) {
func mockQuerierConfig() Config {
return Config{
TailMaxDuration: 1 * time.Minute,
QueryTimeout: queryTimeout,
}
}

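The tests above configure the timeout through the limits config instead of the querier Config. A minimal sketch of that pattern in isolation follows; the helper names come from the diff, while the exact behaviour of the resulting Overrides (returning the configured value for any tenant without a runtime override) is an assumption.

func TestQueryTimeoutFromLimits(t *testing.T) {
	limitsCfg := defaultLimitsTestConfig()
	limitsCfg.QueryTimeout = model.Duration(30 * time.Second)

	limits, err := validation.NewOverrides(limitsCfg, nil)
	require.NoError(t, err)

	// With no per-tenant runtime override in place, every tenant is expected
	// to inherit the configured default.
	require.Equal(t, 30*time.Second, limits.QueryTimeout("tenant-a"))
}
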
4 changes: 3 additions & 1 deletion pkg/querier/queryrange/querysharding_test.go
@@ -116,6 +116,7 @@ func Test_shardSplitter(t *testing.T) {
now: func() time.Time { return end },
limits: fakeLimits{
minShardingLookback: tc.lookback,
queryTimeout: time.Minute,
maxQueryParallelism: 1,
},
}
@@ -156,7 +157,7 @@ func Test_astMapper(t *testing.T) {
handler,
log.NewNopLogger(),
nilShardingMetrics,
fakeLimits{maxSeries: math.MaxInt32, maxQueryParallelism: 1},
fakeLimits{maxSeries: math.MaxInt32, maxQueryParallelism: 1, queryTimeout: time.Second},
)

resp, err := mware.Do(user.InjectOrgID(context.Background(), "1"), defaultReq().WithQuery(`{food="bar"}`))
@@ -257,6 +258,7 @@ func Test_InstantSharding(t *testing.T) {
fakeLimits{
maxSeries: math.MaxInt32,
maxQueryParallelism: 10,
queryTimeout: time.Second,
})
response, err := sharding.Wrap(queryrangebase.HandlerFunc(func(c context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
lock.Lock()
5 changes: 5 additions & 0 deletions pkg/querier/queryrange/roundtrip_test.go
@@ -603,6 +603,7 @@ type fakeLimits struct {
maxSeries int
splits map[string]time.Duration
minShardingLookback time.Duration
queryTimeout time.Duration
}

func (f fakeLimits) QuerySplitDuration(key string) time.Duration {
Expand Down Expand Up @@ -643,6 +644,10 @@ func (f fakeLimits) MinShardingLookback(string) time.Duration {
return f.minShardingLookback
}

func (f fakeLimits) QueryTimeout(string) time.Duration {
return f.queryTimeout
}

func counter() (*int, http.Handler) {
count := 0
var lock sync.Mutex
3 changes: 2 additions & 1 deletion pkg/querier/queryrange/split_by_range_test.go
@@ -18,7 +18,8 @@ import (

func Test_RangeVectorSplit(t *testing.T) {
srm := NewSplitByRangeMiddleware(log.NewNopLogger(), fakeLimits{
maxSeries: 10000,
maxSeries: 10000,
queryTimeout: time.Second,
splits: map[string]time.Duration{
"tenant": time.Minute,
},