Logql/parallel binop #5317

Merged 7 commits on Feb 3, 2022
4 changes: 4 additions & 0 deletions pkg/logql/ast.go
@@ -26,6 +26,10 @@ type Expr interface {
 	fmt.Stringer
 }
 
+func Clone(e Expr) (Expr, error) {
+	return ParseExpr(e.String())
+}
+
 type QueryParams interface {
 	LogSelector() (LogSelectorExpr, error)
 	GetStart() time.Time
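The new Clone helper deep-copies an expression by printing it and re-parsing the output. Below is a minimal sketch of how a caller might exercise it, assuming ParseExpr is used from outside the package; the test name and query string are illustrative and not part of the PR.

```go
package logql_test

import (
	"testing"

	"github.com/grafana/loki/pkg/logql"
)

func TestCloneRoundTrip(t *testing.T) {
	orig, err := logql.ParseExpr(`sum(rate({app="foo"} |= "err" [1m]))`)
	if err != nil {
		t.Fatal(err)
	}

	// Clone re-parses orig.String(), so the copy is an independent AST
	// that still prints the same query text.
	copied, err := logql.Clone(orig)
	if err != nil {
		t.Fatal(err)
	}
	if copied.String() != orig.String() {
		t.Fatalf("clone %q != original %q", copied.String(), orig.String())
	}
}
```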
36 changes: 26 additions & 10 deletions pkg/logql/evaluator.go
@@ -11,6 +11,7 @@ import (
 	"github.com/pkg/errors"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/promql"
+	"golang.org/x/sync/errgroup"
 
 	"github.com/grafana/loki/pkg/iter"
 	"github.com/grafana/loki/pkg/logproto"
@@ -547,16 +548,31 @@ func binOpStepEvaluator(
 		)
 	}
 
-	// we have two non literal legs
-	lse, err := ev.StepEvaluator(ctx, ev, expr.SampleExpr, q)
-	if err != nil {
-		return nil, err
-	}
-	rse, err := ev.StepEvaluator(ctx, ev, expr.RHS, q)
-	if err != nil {
+	var lse, rse StepEvaluator
+	g, ctx := errgroup.WithContext(ctx)
+
+	// We have two non literal legs,
+	// load them in parallel
+	g.Go(func() error {
+		var err error
+		lse, err = ev.StepEvaluator(ctx, ev, expr.SampleExpr, q)
+		return err
+	})
+	g.Go(func() error {
+		var err error
+		rse, err = ev.StepEvaluator(ctx, ev, expr.RHS, q)
+		return err
+	})
+
+	// ensure both sides are loaded before returning the combined evaluator
+	if err := g.Wait(); err != nil {
 		return nil, err
 	}
 
+	// keep a scoped reference to err as it's referenced in the Error()
+	// implementation of this StepEvaluator
+	var scopedErr error
+
 	return newStepEvaluator(func() (bool, int64, promql.Vector) {
 		var (
 			ts int64
@@ -593,7 +609,7 @@
 		case OpTypeUnless:
 			results = vectorUnless(lhs, rhs, lsigs, rsigs)
 		default:
-			results, err = vectorBinop(expr.Op, expr.Opts, lhs, rhs, lsigs, rsigs)
+			results, scopedErr = vectorBinop(expr.Op, expr.Opts, lhs, rhs, lsigs, rsigs)
 		}
 		return true, ts, results
 	}, func() (lastError error) {
@@ -605,8 +621,8 @@
 		return lastError
 	}, func() error {
 		var errs []error
-		if err != nil {
-			errs = append(errs, err)
+		if scopedErr != nil {
+			errs = append(errs, scopedErr)
 		}
 		for _, ev := range []StepEvaluator{lse, rse} {
 			if err := ev.Error(); err != nil {
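For readers skimming the diff, the gist of the change is the standard errgroup fan-out pattern: build both legs of the binary operation concurrently, let a failure in either leg cancel the shared context, and only proceed once Wait returns. The following is a self-contained sketch of that pattern; loadLeg and its arguments are illustrative stand-ins for ev.StepEvaluator, not code from the PR.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// loadLeg stands in for ev.StepEvaluator: it builds one side of the binary op.
func loadLeg(ctx context.Context, name string) (string, error) {
	select {
	case <-time.After(50 * time.Millisecond):
		return name + "-evaluator", nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

func main() {
	var lhs, rhs string
	g, ctx := errgroup.WithContext(context.Background())

	// Launch both legs; each closure writes to its own variable,
	// so there is no data race between the goroutines.
	g.Go(func() error {
		var err error
		lhs, err = loadLeg(ctx, "lhs")
		return err
	})
	g.Go(func() error {
		var err error
		rhs, err = loadLeg(ctx, "rhs")
		return err
	})

	// Wait blocks until both legs are loaded (or one fails, which
	// cancels ctx for the other).
	if err := g.Wait(); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(lhs, rhs)
}
```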
8 changes: 7 additions & 1 deletion pkg/logql/shardmapper.go
@@ -111,8 +111,8 @@ func (m ShardMapper) Parse(query string) (noop bool, expr Expr, err error) {
 		return false, nil, err
 	}
 
-	mappedStr := mapped.String()
 	originalStr := parsed.String()
+	mappedStr := mapped.String()
 	noop = originalStr == mappedStr
 	if noop {
 		m.metrics.parsed.WithLabelValues(NoopKey).Inc()
@@ -126,6 +126,12 @@ func (m ShardMapper) Parse(query string) (noop bool, expr Expr, err error) {
 }
 
 func (m ShardMapper) Map(expr Expr, r *shardRecorder) (Expr, error) {
+	// immediately clone the passed expr to avoid mutating the original
+	expr, err := Clone(expr)
+	if err != nil {
+		return nil, err
+	}
+
 	switch e := expr.(type) {
 	case *LiteralExpr:
 		return e, nil
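The clone at the top of Map matters because the mapper rewrites the expression tree in place; without it, the caller's parsed expression could be mutated as a side effect, and the noop comparison in Parse (parsed.String() vs. mapped.String()) would effectively compare a tree against itself. The sketch below illustrates that hazard generically; the node type and rewrite are invented for illustration and are not the PR's actual mapper.

```go
package main

import "fmt"

// node is a stand-in for a tiny mutable AST.
type node struct {
	op   string
	legs []*node
}

func (n *node) String() string {
	if len(n.legs) == 0 {
		return n.op
	}
	return fmt.Sprintf("%s(%s, %s)", n.op, n.legs[0], n.legs[1])
}

// mapInPlace rewrites the tree, e.g. wrapping leaves in a "shard" op.
func mapInPlace(n *node) {
	if len(n.legs) == 0 {
		n.op = "shard<" + n.op + ">"
		return
	}
	for _, l := range n.legs {
		mapInPlace(l)
	}
}

// clone mimics logql.Clone's print-and-reparse idea by rebuilding the tree.
func clone(n *node) *node {
	c := &node{op: n.op}
	for _, l := range n.legs {
		c.legs = append(c.legs, clone(l))
	}
	return c
}

func main() {
	orig := &node{op: "div", legs: []*node{{op: "a"}, {op: "b"}}}

	// Mutate a clone, not the original: the caller can still compare
	// the mapped string against the original to detect a no-op mapping.
	mapped := clone(orig)
	mapInPlace(mapped)

	fmt.Println("original:", orig) // div(a, b)
	fmt.Println("mapped:  ", mapped) // div(shard<a>, shard<b>)
}
```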
8 changes: 7 additions & 1 deletion pkg/querier/queryrange/downstreamer.go
@@ -18,7 +18,7 @@ import (
 )
 
 const (
-	DefaultDownstreamConcurrency = 32
+	DefaultDownstreamConcurrency = 128
Contributor:
Q: Failing to understand why it got increased (even after reading the comment below on Downstreamer()).

Contributor:
Any way we can line it up with max_query_parallelism? Because 128 is half the biggest tenant we have at 256, so they will get limited by this.

Contributor:
Or maybe cap it higher?

Member Author:
This was historically used to limit how many downstream queries (query-frontend -> querier) could be dispatched in parallel by a single time-split LogQL query. Now that this part is controlled by the LimitedRoundTripper instead, we only want to use the Downstreamer concurrency to prevent us from creating unbounded goroutines. Increasing the limit to 128 still seems reasonable to that effect but is also high enough to not pre-limit anything the LimitedRoundTripper would limit anyway. Basically, this is a crude attempt to prevent us from blowing up goroutines due to malicious queries without introducing a bottleneck in our query path.

Contributor:
Thanks for clarifying, Owen. 👍

Two things:

1. Can we add this ("Increasing the limit to 128 still seems reasonable to that effect but is also high enough to not pre-limit anything the LimitedRoundTripper would limit anyway.") as a comment to the const itself?

2. Also, +1 to making this value the same as max_query_parallelism, because a max_query_parallelism of 256 will be allowed happily by the LimitedRoundTripper but can still get limited by this Downstreamer.

Member Author (@owen-d, Feb 3, 2022):
> Any way we can line it up with max_query_parallelism? Because 128 is half the biggest tenant we have at 256, so they will get limited by this.

Yes, we could, but this is also post-split code, meaning that each split would need to schedule more than 128 queries for this to limit it. We may ultimately want to thread it into the MaxQueryParallelism code, but I felt that was overengineering for the moment and could be done in another PR if needed.

Contributor:
Sounds good!

 )
 
 type DownstreamHandler struct {
@@ -48,6 +48,12 @@ func ParamsToLokiRequest(params logql.Params, shards logql.Shards) queryrangebas
 	}
 }
 
+// Note: After the introduction of the LimitedRoundTripper,
+// bounding concurrency in the downstreamer is mostly redundant
+// The reason we don't remove it is to prevent malicious queries
+// from creating an unreasonably large number of goroutines, such as
+// the case of a query like `a / a / a / a / a ..etc`, which could try
+// to shard each leg, quickly dispatching an unreasonable number of goroutines.
 func (h DownstreamHandler) Downstreamer() logql.Downstreamer {
 	p := DefaultDownstreamConcurrency
 	locks := make(chan struct{}, p)
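The truncated hunk above only shows the buffered channel being allocated; the idea behind it is a counting semaphore: each in-flight downstream query holds one slot in locks, so at most DefaultDownstreamConcurrency goroutines run at once no matter how many legs a query fans out into. Below is a standalone sketch of that pattern with illustrative names, not the PR's actual Downstreamer code.

```go
package main

import (
	"fmt"
	"sync"
)

const defaultConcurrency = 4 // stands in for DefaultDownstreamConcurrency

func main() {
	// A buffered channel used as a counting semaphore: a send acquires a
	// slot, a receive releases it, so at most defaultConcurrency queries
	// are in flight at any moment regardless of how many are requested.
	locks := make(chan struct{}, defaultConcurrency)

	var wg sync.WaitGroup
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()

			locks <- struct{}{}        // acquire a slot (blocks when full)
			defer func() { <-locks }() // release it when done

			fmt.Println("running downstream query", id)
		}(i)
	}
	wg.Wait()
}
```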