Skip to content

Commit

Permalink
Disable sharding of count/avg when labels are mutated (#5474)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena authored Feb 25, 2022
1 parent 6dad54f commit a411bb9
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 3 deletions.
14 changes: 12 additions & 2 deletions pkg/logql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,10 +339,8 @@ func (e *LineFilterExpr) String() string {
}

func (e *LineFilterExpr) Filter() (log.Filterer, error) {

acc := make([]log.Filterer, 0)
for curr := e; curr != nil; curr = curr.Left {

switch curr.Op {
case OpFilterIP:
var err error
Expand Down Expand Up @@ -964,6 +962,18 @@ func (e *VectorAggregationExpr) String() string {

// impl SampleExpr
func (e *VectorAggregationExpr) Shardable() bool {
if e.Operation == OpTypeCount || e.Operation == OpTypeAvg {
// count is shardable is labels are not mutated
// otherwise distinct values can be counted twice per shard
shardable := true
e.Walk(func(e interface{}) {
switch e.(type) {
case *LabelParserExpr, LabelFmtExpr:
shardable = false
}
})
return shardable
}
return shardableOps[e.Operation] && e.Left.Shardable()
}

Expand Down
1 change: 0 additions & 1 deletion pkg/logql/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,6 @@ func BenchmarkContainsFilter(b *testing.B) {
}
})
}

}

func Test_parserExpr_Parser(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions pkg/logql/shardmapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,14 @@ func TestMappingStrings(t *testing.T) {
in: `rate({foo="bar"} | json | label_format foo=bar [5m])`,
out: `rate({foo="bar"} | json | label_format foo=bar [5m])`,
},
{
in: `count(rate({foo="bar"} | json [5m]))`,
out: `count(downstream<rate({foo="bar"} | json [5m]), shard=0_of_2> ++ downstream<rate({foo="bar"} | json [5m]), shard=1_of_2>)`,
},
{
in: `avg(rate({foo="bar"} | json [5m]))`,
out: `avg(downstream<rate({foo="bar"} | json [5m]), shard=0_of_2> ++ downstream<rate({foo="bar"} | json [5m]), shard=1_of_2>)`,
},
{
in: `{foo="bar"} |= "id=123"`,
out: `downstream<{foo="bar"}|="id=123", shard=0_of_2> ++ downstream<{foo="bar"}|="id=123", shard=1_of_2>`,
Expand Down

0 comments on commit a411bb9

Please sign in to comment.