Skip to content

Commit

Permalink
Logql comparison ops (#2182)
Browse files Browse the repository at this point in the history
* comparison binops

* starts comparison operator docs

* implements bool modifier for comparison binops

* vector-vector binop merging cases, including bool modifier

* logql docs formatting

* resets protos

* linter fixes

* goimports local

* gofmt
  • Loading branch information
owen-d authored Jun 5, 2020
1 parent 365235b commit 1530120
Show file tree
Hide file tree
Showing 21 changed files with 1,067 additions and 268 deletions.
1 change: 1 addition & 0 deletions cmd/fluent-bit/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/go-kit/kit/log"

"github.com/grafana/loki/pkg/promtail/client"
)

Expand Down
1 change: 1 addition & 0 deletions cmd/fluent-bit/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"github.com/go-kit/kit/log"

"github.com/grafana/loki/pkg/promtail/client"
)

Expand Down
17 changes: 11 additions & 6 deletions cmd/fluent-bit/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ const (
kvPairFormat
)

const (
falseStr = "false"
trueStr = "true"
)

type config struct {
clientConfig client.Config
bufferConfig bufferConfig
Expand Down Expand Up @@ -112,9 +117,9 @@ func parseConfig(cfg ConfigGetter) (*config, error) {

autoKubernetesLabels := cfg.Get("AutoKubernetesLabels")
switch autoKubernetesLabels {
case "false", "":
case falseStr, "":
res.autoKubernetesLabels = false
case "true":
case trueStr:
res.autoKubernetesLabels = true
default:
return nil, fmt.Errorf("invalid boolean AutoKubernetesLabels: %v", autoKubernetesLabels)
Expand All @@ -132,9 +137,9 @@ func parseConfig(cfg ConfigGetter) (*config, error) {

dropSingleKey := cfg.Get("DropSingleKey")
switch dropSingleKey {
case "false":
case falseStr:
res.dropSingleKey = false
case "true", "":
case trueStr, "":
res.dropSingleKey = true
default:
return nil, fmt.Errorf("invalid boolean DropSingleKey: %v", dropSingleKey)
Expand Down Expand Up @@ -165,9 +170,9 @@ func parseConfig(cfg ConfigGetter) (*config, error) {
// enable loki plugin buffering
buffer := cfg.Get("Buffer")
switch buffer {
case "false", "":
case falseStr, "":
res.bufferConfig.buffer = false
case "true":
case trueStr:
res.bufferConfig.buffer = true
default:
return nil, fmt.Errorf("invalid boolean Buffer: %v", buffer)
Expand Down
6 changes: 3 additions & 3 deletions cmd/fluent-bit/dque.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"

"github.com/grafana/loki/pkg/promtail/client"
"github.com/joncrlsn/dque"
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/promtail/client"
)

type dqueConfig struct {
Expand Down Expand Up @@ -67,7 +67,7 @@ func newDque(cfg *config, logger log.Logger) (client.Client, error) {
}

if !cfg.bufferConfig.dqueConfig.queueSync {
q.queue.TurboOn()
_ = q.queue.TurboOn()
}

q.loki, err = client.New(cfg.clientConfig, logger)
Expand Down
27 changes: 27 additions & 0 deletions docs/logql.md
Original file line number Diff line number Diff line change
Expand Up @@ -239,3 +239,30 @@ These logical/set binary operators are only defined between two vectors:
This contrived query will return the intersection of these queries, effectively `rate({app="bar"})`

> `rate({app=~"foo|bar"}[1m]) and rate({app="bar"}[1m])`
#### Comparison operators

- `==` (equality)
- `!=` (inequality)
- `>` (greater than)
- `>=` (greater than or equal to)
- `<` (less than)
- `<=` (less than or equal to)

Comparison operators are defined between scalar/scalar, vector/scalar, and vector/vector value pairs. By default they filter. Their behavior can be modified by providing bool after the operator, which will return 0 or 1 for the value rather than filtering.

Between two scalars, the bool modifier must be provided and these operators result in another scalar that is either 0 (false) or 1 (true), depending on the comparison result.

> `1 >= 1` is equivalent to `1`
Between a vector and a scalar, these operators are applied to the value of every data sample in the vector, and vector elements between which the comparison result is false get dropped from the result vector. If the bool modifier is provided, vector elements that would be dropped instead have the value 0 and vector elements that would be kept have the value 1.

> `count_over_time({foo="bar"}[1m]) > 10` Filters the streams which log at elast 10 lines in the last minute.
> `count_over_time({foo="bar"}[1m]) > bool 10` The same as above, but instead of filtering, attached the value 0 to streams that log less than 10 lines.
Between two vectors, these operators behave as a filter by default, applied to matching entries. Vector elements for which the expression is not true or which do not find a match on the other side of the expression get dropped from the result, while the others are propagated into a result vector. If the bool modifier is provided, vector elements that would have been dropped instead have the value 0 and vector elements that would be kept have the value 1, with the grouping labels again becoming the output label set.

> `sum without(app) (count_over_time({app="foo"}[1m])) > sum without(app) (count_over_time({app="bar"}[1m]))` Returns the streams matching `app=foo` without app labels that have higher counts within the last minute than their counterparts matching `app=bar`without app labels.
> `sum without(app) (count_over_time({app="foo"}[1m])) > bool sum without(app) (count_over_time({app="bar"}[1m]))` The same as above, but vectors have their values set to 1 if they pass the comparison or 0 if they fail/would otherwise have been filtered out.
41 changes: 37 additions & 4 deletions pkg/logql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,33 @@ const (
OpTypeDiv = "/"
OpTypeMod = "%"
OpTypePow = "^"

// binops - comparison
OpTypeCmpEQ = "=="
OpTypeNEQ = "!="
OpTypeGT = ">"
OpTypeGTE = ">="
OpTypeLT = "<"
OpTypeLTE = "<="
)

func IsComparisonOperator(op string) bool {
switch op {
case OpTypeCmpEQ, OpTypeNEQ, OpTypeGT, OpTypeGTE, OpTypeLT, OpTypeLTE:
return true
default:
return false
}
}

// IsLogicalBinOp tests whether an operation is a logical/set binary operation
func IsLogicalBinOp(op string) bool {
return op == OpTypeOr || op == OpTypeAnd || op == OpTypeUnless
switch op {
case OpTypeOr, OpTypeAnd, OpTypeUnless:
return true
default:
return false
}
}

// SampleExpr is a LogQL expression filtering logs and returning metric samples.
Expand Down Expand Up @@ -341,13 +363,21 @@ func (e *vectorAggregationExpr) Operations() []string {
return append(e.left.Operations(), e.operation)
}

type BinOpOptions struct {
ReturnBool bool
}

type binOpExpr struct {
SampleExpr
RHS SampleExpr
op string
RHS SampleExpr
op string
opts BinOpOptions
}

func (e *binOpExpr) String() string {
if e.opts.ReturnBool {
return fmt.Sprintf("%s %s bool %s", e.SampleExpr.String(), e.op, e.RHS.String())
}
return fmt.Sprintf("%s %s %s", e.SampleExpr.String(), e.op, e.RHS.String())
}

Expand All @@ -357,7 +387,7 @@ func (e *binOpExpr) Operations() []string {
return append(ops, e.op)
}

func mustNewBinOpExpr(op string, lhs, rhs Expr) SampleExpr {
func mustNewBinOpExpr(op string, opts BinOpOptions, lhs, rhs Expr) SampleExpr {
left, ok := lhs.(SampleExpr)
if !ok {
panic(newParseError(fmt.Sprintf(
Expand Down Expand Up @@ -406,6 +436,7 @@ func mustNewBinOpExpr(op string, lhs, rhs Expr) SampleExpr {
SampleExpr: left,
RHS: right,
op: op,
opts: opts,
}
}

Expand All @@ -417,6 +448,8 @@ func reduceBinOp(op string, left, right *literalExpr) *literalExpr {
op,
&promql.Sample{Point: promql.Point{V: left.value}},
&promql.Sample{Point: promql.Point{V: right.value}},
false,
false,
)
return &literalExpr{value: merged.V}
}
Expand Down
35 changes: 35 additions & 0 deletions pkg/logql/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,41 @@ func Test_FilterMatcher(t *testing.T) {
}
}

func TestStringer(t *testing.T) {
for _, tc := range []struct {
in string
out string
}{
{
in: `1 > 1 > 1`,
out: `0.000000`,
},
{
in: `1 > 1 > bool 1`,
out: `0.000000`,
},
{
in: `1 > bool 1 > count_over_time({foo="bar"}[1m])`,
out: `0.000000 > count_over_time(({foo="bar"})[1m])`,
},
{
in: `1 > bool 1 > bool count_over_time({foo="bar"}[1m])`,
out: `0.000000 > bool count_over_time(({foo="bar"})[1m])`,
},
{

in: `0.000000 > count_over_time(({foo="bar"})[1m])`,
out: `0.000000 > count_over_time(({foo="bar"})[1m])`,
},
} {
t.Run(tc.in, func(t *testing.T) {
expr, err := ParseExpr(tc.in)
require.Nil(t, err)
require.Equal(t, tc.out, expr.String())
})
}
}

func BenchmarkContainsFilter(b *testing.B) {
expr, err := ParseLogSelector(`{app="foo"} |= "foo"`)
if err != nil {
Expand Down
Loading

0 comments on commit 1530120

Please sign in to comment.