diff --git a/pkg/logql/ast.go b/pkg/logql/ast.go index a02844a7e674..9279fc392c18 100644 --- a/pkg/logql/ast.go +++ b/pkg/logql/ast.go @@ -339,10 +339,8 @@ func (e *LineFilterExpr) String() string { } func (e *LineFilterExpr) Filter() (log.Filterer, error) { - acc := make([]log.Filterer, 0) for curr := e; curr != nil; curr = curr.Left { - switch curr.Op { case OpFilterIP: var err error @@ -964,6 +962,18 @@ func (e *VectorAggregationExpr) String() string { // impl SampleExpr func (e *VectorAggregationExpr) Shardable() bool { + if e.Operation == OpTypeCount || e.Operation == OpTypeAvg { + // count is shardable is labels are not mutated + // otherwise distinct values can be counted twice per shard + shardable := true + e.Walk(func(e interface{}) { + switch e.(type) { + case *LabelParserExpr, LabelFmtExpr: + shardable = false + } + }) + return shardable + } return shardableOps[e.Operation] && e.Left.Shardable() } diff --git a/pkg/logql/ast_test.go b/pkg/logql/ast_test.go index 1f38522e4249..1e9620e4f94f 100644 --- a/pkg/logql/ast_test.go +++ b/pkg/logql/ast_test.go @@ -394,7 +394,6 @@ func BenchmarkContainsFilter(b *testing.B) { } }) } - } func Test_parserExpr_Parser(t *testing.T) { diff --git a/pkg/logql/shardmapper_test.go b/pkg/logql/shardmapper_test.go index c9ed351b144b..5ca275147570 100644 --- a/pkg/logql/shardmapper_test.go +++ b/pkg/logql/shardmapper_test.go @@ -159,6 +159,14 @@ func TestMappingStrings(t *testing.T) { in: `rate({foo="bar"} | json | label_format foo=bar [5m])`, out: `rate({foo="bar"} | json | label_format foo=bar [5m])`, }, + { + in: `count(rate({foo="bar"} | json [5m]))`, + out: `count(downstream ++ downstream)`, + }, + { + in: `avg(rate({foo="bar"} | json [5m]))`, + out: `avg(downstream ++ downstream)`, + }, { in: `{foo="bar"} |= "id=123"`, out: `downstream<{foo="bar"}|="id=123", shard=0_of_2> ++ downstream<{foo="bar"}|="id=123", shard=1_of_2>`,