From b13cc79ce8655e136e4f4a29efd9a39f6d601e90 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 3 Oct 2019 10:14:06 -0400 Subject: [PATCH 1/3] Add logql filter to match stages and drop capability --- docs/clients/promtail/pipelines.md | 15 ++-- docs/clients/promtail/stages/match.md | 32 ++++--- pkg/logentry/stages/match.go | 47 ++++++++--- pkg/logentry/stages/match_test.go | 115 ++++++++++++++++++++------ pkg/logentry/stages/pipeline.go | 6 ++ pkg/logentry/stages/pipeline_test.go | 62 +++++++++++++- 6 files changed, 222 insertions(+), 55 deletions(-) diff --git a/docs/clients/promtail/pipelines.md b/docs/clients/promtail/pipelines.md index 72bd64883bfe..e4794e28e9ba 100644 --- a/docs/clients/promtail/pipelines.md +++ b/docs/clients/promtail/pipelines.md @@ -18,7 +18,7 @@ stages: 2. Change the timestamp of the log line 3. Change the content of the log line 4. Create a metric based on the extracted data -4. **Filtering stages** optionally apply a subset of stages based on some +4. **Filtering stages** optionally apply a subset of stages or drop entries based on some condition. Typical pipelines will start with a parsing stage (such as a @@ -28,7 +28,7 @@ something with that extract data. The most common action stage will be a [labels](./stages/labels.md) stage to turn extracted data into a label. A common stage will also be the [match](./stages/match.md) stage to selectively -apply stages based on the current labels. +apply stages or drop entries based on a [LogQL stream selector and filter expressions](../../logql.md). Note that pipelines can not currently be used to deduplicate logs; Loki will receive the same log line multiple times if, for example: @@ -76,9 +76,9 @@ scrape_configs: source: timestamp # This stage is only going to run if the scraped target has a label of - # "name" with a value of "nginx". + # "name" with a value of "nginx" and if the log line contains the word "GET" - match: - selector: '{name="nginx"}' + selector: '{name="nginx"} |= "GET"' stages: # This regex stage extracts a new output by matching against some # values and capturing the rest. @@ -126,10 +126,10 @@ scrape_configs: level: component: - # This stage will only run if the scraped target has a label of "app" - # and a value of "some-app". + # This stage will only run if the scraped target has a label "app" + # with a value of "some-app" and the log line doesn't contains the word "info" - match: - selector: '{app="some-app"}' + selector: '{app="some-app"} != "info"' stages: # The regex stage tries to extract a Go panic by looking for panic: # in the log message. @@ -207,4 +207,3 @@ Action stages: Filtering stages: * [match](./stages/match.md): Conditionally run stages based on the label set. - diff --git a/docs/clients/promtail/stages/match.md b/docs/clients/promtail/stages/match.md index 5a931b01c294..091a35b0af20 100644 --- a/docs/clients/promtail/stages/match.md +++ b/docs/clients/promtail/stages/match.md @@ -1,20 +1,24 @@ # `match` stage The match stage is a filtering stage that conditionally applies a set of stages -when a log entry matches a configurable [LogQL](../../../logql.md) stream -selector. +or drop entries when a log entry matches a configurable [LogQL](../../../logql.md) +stream selector and filter expressions. ## Schema ```yaml match: - # LogQL stream selector. + # LogQL stream selector and filter expressions. selector: # Names the pipeline. When defined, creates an additional label in # the pipeline_duration_seconds histogram, where the value is # concatenated with job_name using an underscore. - [pipieline_name: ] + [pipeline_name: ] + + # When set to true (default to false), all entries matching the selector will + # be dropped. Stages must not be defined when dropping entries. + [drop_entries: ] # Nested set of pipeline stages only if the selector # matches the labels of the log entries: @@ -46,33 +50,38 @@ pipeline_stages: - labels: app: - match: - selector: "{app=\"loki\"}" + selector: '{app="loki"}' stages: - json: expressions: msg: message - match: pipeline_name: "app2" - selector: "{app=\"pokey\"}" + selector: '{app="pokey"}' stages: - json: expressions: msg: msg +- match: + selector: '{app="promtail"} |~ ".*noisy error.*"' + drop_entries: true - output: source: msg ``` -And the given log line: +And given log lines: -``` +```json { "time":"2012-11-01T22:08:41+00:00", "app":"loki", "component": ["parser","type"], "level" : "WARN", "message" : "app1 log line" } +{ "time":"2012-11-01T22:08:41+00:00", "app":"promtail", "component": ["parser","type"], "level" : "ERROR", "message" : "foo noisy error" } ``` -The first stage will add `app` with a value of `loki` into the extracted map, +The first stage will add `app` with a value of `loki` into the extracted map for the first log line, while the second stage will add `app` as a label (again with the value of `loki`). +The second line will follow the same flow and will be added the label `app` with a value of `promtail`. The third stage uses LogQL to only execute the nested stages when there is a -label of `app` whose value is `loki`. This matches in our case; the nested +label of `app` whose value is `loki`. This matches the first line in our case; the nested `json` stage then adds `msg` into the extracted map with a value of `app1 log line`. @@ -80,6 +89,9 @@ The fourth stage uses LogQL to only executed the nested stages when there is a label of `app` whose value is `pokey`. This does **not** match in our case, so the nested `json` stage is not ran. +The fifth stage will drop any entries from the application `promtail` that matches +the regex `.*noisy error`. + The final `output` stage changes the contents of the log line to be the value of `msg` from the extracted map. In this case, the log line is changed to `app1 log line`. diff --git a/pkg/logentry/stages/match.go b/pkg/logentry/stages/match.go index d59c6d704b83..837550d181a3 100644 --- a/pkg/logentry/stages/match.go +++ b/pkg/logentry/stages/match.go @@ -3,12 +3,13 @@ package stages import ( "time" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/go-kit/kit/log" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/pkg/labels" "github.com/grafana/loki/pkg/logql" ) @@ -19,6 +20,7 @@ const ( ErrSelectorRequired = "selector statement required for match stage" ErrMatchRequiresStages = "match stage requires at least one additional stage to be defined in '- stages'" ErrSelectorSyntax = "invalid selector syntax for match stage" + ErrStagesWithDropLine = "match stage configured to drop entries cannot contains stages" ) // MatcherConfig contains the configuration for a matcherStage @@ -26,10 +28,11 @@ type MatcherConfig struct { PipelineName *string `mapstructure:"pipeline_name"` Selector string `mapstructure:"selector"` Stages PipelineStages `mapstructure:"stages"` + DropEntries bool `mapstructure:"drop_entries"` } // validateMatcherConfig validates the MatcherConfig for the matcherStage -func validateMatcherConfig(cfg *MatcherConfig) ([]*labels.Matcher, error) { +func validateMatcherConfig(cfg *MatcherConfig) (logql.LogSelectorExpr, error) { if cfg == nil { return nil, errors.New(ErrEmptyMatchStageConfig) } @@ -39,14 +42,18 @@ func validateMatcherConfig(cfg *MatcherConfig) ([]*labels.Matcher, error) { if cfg.Selector == "" { return nil, errors.New(ErrSelectorRequired) } - if cfg.Stages == nil || len(cfg.Stages) == 0 { + if !cfg.DropEntries && (cfg.Stages == nil || len(cfg.Stages) == 0) { return nil, errors.New(ErrMatchRequiresStages) } - matchers, err := logql.ParseMatchers(cfg.Selector) + if cfg.DropEntries && (cfg.Stages != nil && len(cfg.Stages) != 0) { + return nil, errors.New(ErrStagesWithDropLine) + } + + selector, err := logql.ParseLogSelector(cfg.Selector) if err != nil { return nil, errors.Wrap(err, ErrSelectorSyntax) } - return matchers, nil + return selector, nil } // newMatcherStage creates a new matcherStage from config @@ -56,7 +63,7 @@ func newMatcherStage(logger log.Logger, jobName *string, config interface{}, reg if err != nil { return nil, err } - matchers, err := validateMatcherConfig(cfg) + selector, err := validateMatcherConfig(cfg) if err != nil { return nil, err } @@ -67,21 +74,34 @@ func newMatcherStage(logger log.Logger, jobName *string, config interface{}, reg nPtr = &name } - pl, err := NewPipeline(logger, cfg.Stages, nPtr, registerer) + var pl *Pipeline + if !cfg.DropEntries { + var err error + pl, err = NewPipeline(logger, cfg.Stages, nPtr, registerer) + if err != nil { + return nil, errors.Wrapf(err, "match stage failed to create pipeline from config: %v", config) + } + } + + filter, err := selector.Filter() if err != nil { - return nil, errors.Wrapf(err, "match stage failed to create pipeline from config: %v", config) + return nil, errors.Wrap(err, "error parsing filter") } return &matcherStage{ - matchers: matchers, + matchers: selector.Matchers(), pipeline: pl, + drop: cfg.DropEntries, + filter: filter, }, nil } // matcherStage applies Label matchers to determine if the include stages should be run type matcherStage struct { matchers []*labels.Matcher + filter logql.Filter pipeline Stage + drop bool } // Process implements Stage @@ -91,7 +111,14 @@ func (m *matcherStage) Process(labels model.LabelSet, extracted map[string]inter return } } - m.pipeline.Process(labels, extracted, t, entry) + if m.filter == nil || m.filter([]byte(*entry)) { + // Adds the drop label to not be sent by the api.EntryHandler + if m.drop { + labels[dropLabel] = "true" + return + } + m.pipeline.Process(labels, extracted, t, entry) + } } // Name implements Stage diff --git a/pkg/logentry/stages/match_test.go b/pkg/logentry/stages/match_test.go index 9849d064309f..c6432095013a 100644 --- a/pkg/logentry/stages/match_test.go +++ b/pkg/logentry/stages/match_test.go @@ -99,43 +99,72 @@ func TestMatchPipeline(t *testing.T) { func TestMatcher(t *testing.T) { t.Parallel() tests := []struct { - matcher string - labels map[string]string + selector string + labels map[string]string + drop bool - shouldRun bool - wantErr bool + shouldDrop bool + shouldRun bool + wantErr bool }{ - {"{foo=\"bar\"} |= \"foo\"", map[string]string{"foo": "bar"}, false, true}, - {"{foo=\"bar\"} |~ \"foo\"", map[string]string{"foo": "bar"}, false, true}, - {"foo", map[string]string{"foo": "bar"}, false, true}, - {"{}", map[string]string{"foo": "bar"}, false, true}, - {"{", map[string]string{"foo": "bar"}, false, true}, - {"", map[string]string{"foo": "bar"}, true, true}, - {"{foo=\"bar\"}", map[string]string{"foo": "bar"}, true, false}, - {"{foo=\"\"}", map[string]string{"foo": "bar"}, false, false}, - {"{foo=\"\"}", map[string]string{}, true, false}, - {"{foo!=\"bar\"}", map[string]string{"foo": "bar"}, false, false}, - {"{foo=\"bar\",bar!=\"test\"}", map[string]string{"foo": "bar"}, true, false}, - {"{foo=\"bar\",bar!=\"test\"}", map[string]string{"foo": "bar", "bar": "test"}, false, false}, - {"{foo=\"bar\",bar=~\"te.*\"}", map[string]string{"foo": "bar", "bar": "test"}, true, false}, - {"{foo=\"bar\",bar!~\"te.*\"}", map[string]string{"foo": "bar", "bar": "test"}, false, false}, - {"{foo=\"\"}", map[string]string{}, true, false}, + {`{foo="bar"} |= "foo"`, map[string]string{"foo": "bar"}, false, false, true, false}, + {`{foo="bar"} |~ "foo"`, map[string]string{"foo": "bar"}, false, false, true, false}, + {`{foo="bar"} |= "bar"`, map[string]string{"foo": "bar"}, false, false, false, false}, + {`{foo="bar"} |~ "bar"`, map[string]string{"foo": "bar"}, false, false, false, false}, + {`{foo="bar"} != "bar"`, map[string]string{"foo": "bar"}, false, false, true, false}, + {`{foo="bar"} !~ "bar"`, map[string]string{"foo": "bar"}, false, false, true, false}, + {`{foo="bar"} != "foo"`, map[string]string{"foo": "bar"}, false, false, false, false}, + {`{foo="bar"} |= "foo"`, map[string]string{"foo": "bar"}, true, true, false, false}, + {`{foo="bar"} |~ "foo"`, map[string]string{"foo": "bar"}, true, true, false, false}, + {`{foo="bar"} |= "bar"`, map[string]string{"foo": "bar"}, true, false, false, false}, + {`{foo="bar"} |~ "bar"`, map[string]string{"foo": "bar"}, true, false, false, false}, + {`{foo="bar"} != "bar"`, map[string]string{"foo": "bar"}, true, true, false, false}, + {`{foo="bar"} !~ "bar"`, map[string]string{"foo": "bar"}, true, true, false, false}, + {`{foo="bar"} != "foo"`, map[string]string{"foo": "bar"}, true, false, false, false}, + {`{foo="bar"} !~ "[]"`, map[string]string{"foo": "bar"}, false, false, false, true}, + {"foo", map[string]string{"foo": "bar"}, false, false, false, true}, + {"{}", map[string]string{"foo": "bar"}, false, false, false, true}, + {"{", map[string]string{"foo": "bar"}, false, false, false, true}, + {"", map[string]string{"foo": "bar"}, false, false, true, true}, + {`{foo="bar"}`, map[string]string{"foo": "bar"}, false, false, true, false}, + {`{foo=""}`, map[string]string{"foo": "bar"}, false, false, false, false}, + {`{foo=""}`, map[string]string{}, false, false, true, false}, + {`{foo!="bar"}`, map[string]string{"foo": "bar"}, false, false, false, false}, + {`{foo!="bar"}`, map[string]string{"foo": "bar"}, true, false, false, false}, + {`{foo="bar",bar!="test"}`, map[string]string{"foo": "bar"}, false, false, true, false}, + {`{foo="bar",bar!="test"}`, map[string]string{"foo": "bar"}, true, true, false, false}, + {`{foo="bar",bar!="test"}`, map[string]string{"foo": "bar", "bar": "test"}, false, false, false, false}, + {`{foo="bar",bar=~"te.*"}`, map[string]string{"foo": "bar", "bar": "test"}, true, true, false, false}, + {`{foo="bar",bar=~"te.*"}`, map[string]string{"foo": "bar", "bar": "test"}, false, false, true, false}, + {`{foo="bar",bar!~"te.*"}`, map[string]string{"foo": "bar", "bar": "test"}, false, false, false, false}, + {`{foo="bar",bar!~"te.*"}`, map[string]string{"foo": "bar", "bar": "test"}, true, false, false, false}, + + {`{foo=""}`, map[string]string{}, false, false, true, false}, } for _, tt := range tests { - t.Run(fmt.Sprintf("%s/%s", tt.matcher, tt.labels), func(t *testing.T) { + name := fmt.Sprintf("%s/%s", tt.selector, tt.labels) + if tt.drop { + name += "_drop" + } + t.Run(name, func(t *testing.T) { // Build a match config which has a simple label stage that when matched will add the test_label to // the labels in the pipeline. - matchConfig := MatcherConfig{ - nil, - tt.matcher, - PipelineStages{ + var stages PipelineStages + if !tt.drop { + stages = PipelineStages{ PipelineStage{ StageTypeLabel: LabelsConfig{ "test_label": nil, }, }, - }, + } + } + matchConfig := MatcherConfig{ + nil, + tt.selector, + stages, + tt.drop, } s, err := newMatcherStage(util.Logger, nil, matchConfig, prometheus.DefaultRegisterer) if (err != nil) != tt.wantErr { @@ -143,7 +172,7 @@ func TestMatcher(t *testing.T) { return } if s != nil { - ts, entry := time.Now(), "" + ts, entry := time.Now(), "foo" extracted := map[string]interface{}{ "test_label": "unimportant value", } @@ -156,6 +185,40 @@ func TestMatcher(t *testing.T) { t.Error("stage ran but should have not") } } + if tt.shouldDrop { + if _, ok := labels[dropLabel]; !ok { + t.Error("stage should have been dropped") + } + } + } + }) + } +} + +func Test_validateMatcherConfig(t *testing.T) { + empty := "" + notempty := "test" + tests := []struct { + name string + cfg *MatcherConfig + wantErr bool + }{ + {"empty", nil, true}, + {"pipeline name required", &MatcherConfig{PipelineName: &empty}, true}, + {"selector required", &MatcherConfig{PipelineName: ¬empty, Selector: ""}, true}, + {"nil stages without dropping", &MatcherConfig{PipelineName: ¬empty, Selector: `{app="foo"}`, DropEntries: false, Stages: nil}, true}, + {"empty stages without dropping", &MatcherConfig{PipelineName: ¬empty, Selector: `{app="foo"}`, DropEntries: false, Stages: []interface{}{}}, true}, + {"stages with dropping", &MatcherConfig{PipelineName: ¬empty, Selector: `{app="foo"}`, DropEntries: true, Stages: []interface{}{""}}, true}, + {"empty stages dropping", &MatcherConfig{PipelineName: ¬empty, Selector: `{app="foo"}`, DropEntries: true, Stages: []interface{}{}}, false}, + {"stages without dropping", &MatcherConfig{PipelineName: ¬empty, Selector: `{app="foo"}`, DropEntries: false, Stages: []interface{}{""}}, false}, + {"bad selector", &MatcherConfig{PipelineName: ¬empty, Selector: `{app="foo}`, DropEntries: false, Stages: []interface{}{""}}, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := validateMatcherConfig(tt.cfg) + if (err != nil) != tt.wantErr { + t.Errorf("validateMatcherConfig() error = %v, wantErr %v", err, tt.wantErr) + return } }) } diff --git a/pkg/logentry/stages/pipeline.go b/pkg/logentry/stages/pipeline.go index 608bdb3c7fd1..592877acf10c 100644 --- a/pkg/logentry/stages/pipeline.go +++ b/pkg/logentry/stages/pipeline.go @@ -12,6 +12,8 @@ import ( "github.com/grafana/loki/pkg/promtail/api" ) +const dropLabel = "__drop__" + // PipelineStages contains configuration for each stage within a pipeline type PipelineStages = []interface{} @@ -102,6 +104,10 @@ func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler { return api.EntryHandlerFunc(func(labels model.LabelSet, timestamp time.Time, line string) error { extracted := map[string]interface{}{} p.Process(labels, extracted, ×tamp, &line) + // if the labels set contains the __drop__ label we don't send this entry to the next EntryHandler + if _, ok := labels[dropLabel]; ok { + return nil + } return next.Handle(labels, timestamp, line) }) } diff --git a/pkg/logentry/stages/pipeline_test.go b/pkg/logentry/stages/pipeline_test.go index 7ecf8719b3c0..610ebcdab36c 100644 --- a/pkg/logentry/stages/pipeline_test.go +++ b/pkg/logentry/stages/pipeline_test.go @@ -11,7 +11,6 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "gopkg.in/yaml.v2" ) @@ -168,3 +167,64 @@ func BenchmarkPipeline(b *testing.B) { }) } } + +type stubHandler struct { + bool +} + +func (s *stubHandler) Handle(labels model.LabelSet, time time.Time, entry string) error { + s.bool = true + return nil +} + +func TestPipeline_Wrap(t *testing.T) { + now := time.Now() + var config map[string]interface{} + err := yaml.Unmarshal([]byte(testYaml), &config) + if err != nil { + panic(err) + } + p, err := NewPipeline(util.Logger, config["pipeline_stages"].([]interface{}), nil, prometheus.DefaultRegisterer) + if err != nil { + panic(err) + } + + tests := map[string]struct { + labels model.LabelSet + shouldSend bool + }{ + "should drop": { + map[model.LabelName]model.LabelValue{ + "__drop__": "true", + "stream": "stderr", + "action": "GET", + "status_code": "200", + }, + false, + }, + "should send": { + map[model.LabelName]model.LabelValue{ + "stream": "stderr", + "action": "GET", + "status_code": "200", + }, + true, + }, + } + + for tName, tt := range tests { + tt := tt + t.Run(tName, func(t *testing.T) { + t.Parallel() + extracted := map[string]interface{}{} + p.Process(tt.labels, extracted, &now, &rawTestLine) + stub := &stubHandler{} + handler := p.Wrap(stub) + if err := handler.Handle(tt.labels, now, rawTestLine); err != nil { + t.Fatalf("failed to handle entry: %v", err) + } + assert.Equal(t, stub.bool, tt.shouldSend) + + }) + } +} From a2215e0885fe08ce5d1c09ca1f95d17d5c541b87 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 3 Oct 2019 10:20:32 -0400 Subject: [PATCH 2/3] use const string instead and remove unused value --- pkg/logentry/stages/match.go | 2 +- pkg/logentry/stages/pipeline_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/logentry/stages/match.go b/pkg/logentry/stages/match.go index 837550d181a3..b1dcb0e73ebf 100644 --- a/pkg/logentry/stages/match.go +++ b/pkg/logentry/stages/match.go @@ -114,7 +114,7 @@ func (m *matcherStage) Process(labels model.LabelSet, extracted map[string]inter if m.filter == nil || m.filter([]byte(*entry)) { // Adds the drop label to not be sent by the api.EntryHandler if m.drop { - labels[dropLabel] = "true" + labels[dropLabel] = "" return } m.pipeline.Process(labels, extracted, t, entry) diff --git a/pkg/logentry/stages/pipeline_test.go b/pkg/logentry/stages/pipeline_test.go index 610ebcdab36c..026feeaaf822 100644 --- a/pkg/logentry/stages/pipeline_test.go +++ b/pkg/logentry/stages/pipeline_test.go @@ -195,7 +195,7 @@ func TestPipeline_Wrap(t *testing.T) { }{ "should drop": { map[model.LabelName]model.LabelValue{ - "__drop__": "true", + dropLabel: "true", "stream": "stderr", "action": "GET", "status_code": "200", From 5c2c75a765d260560d4a0d774c3b2a4d17954765 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 7 Oct 2019 10:19:55 -0400 Subject: [PATCH 3/3] Uses action property instead of drop_entries --- docs/clients/promtail/stages/match.md | 7 ++- pkg/logentry/stages/match.go | 32 +++++++--- pkg/logentry/stages/match_test.go | 89 +++++++++++++-------------- 3 files changed, 70 insertions(+), 58 deletions(-) diff --git a/docs/clients/promtail/stages/match.md b/docs/clients/promtail/stages/match.md index 091a35b0af20..9744fb70806d 100644 --- a/docs/clients/promtail/stages/match.md +++ b/docs/clients/promtail/stages/match.md @@ -16,9 +16,9 @@ match: # concatenated with job_name using an underscore. [pipeline_name: ] - # When set to true (default to false), all entries matching the selector will + # When set to drop (default to keep), all entries matching the selector will # be dropped. Stages must not be defined when dropping entries. - [drop_entries: ] + [action: ] # Nested set of pipeline stages only if the selector # matches the labels of the log entries: @@ -58,13 +58,14 @@ pipeline_stages: - match: pipeline_name: "app2" selector: '{app="pokey"}' + action: keep stages: - json: expressions: msg: msg - match: selector: '{app="promtail"} |~ ".*noisy error.*"' - drop_entries: true + action: drop - output: source: msg ``` diff --git a/pkg/logentry/stages/match.go b/pkg/logentry/stages/match.go index b1dcb0e73ebf..0c0838afd0b7 100644 --- a/pkg/logentry/stages/match.go +++ b/pkg/logentry/stages/match.go @@ -21,6 +21,9 @@ const ( ErrMatchRequiresStages = "match stage requires at least one additional stage to be defined in '- stages'" ErrSelectorSyntax = "invalid selector syntax for match stage" ErrStagesWithDropLine = "match stage configured to drop entries cannot contains stages" + ErrUnknownMatchAction = "match stage action should be 'keep' or 'drop'" + MatchActionKeep = "keep" + MatchActionDrop = "drop" ) // MatcherConfig contains the configuration for a matcherStage @@ -28,7 +31,7 @@ type MatcherConfig struct { PipelineName *string `mapstructure:"pipeline_name"` Selector string `mapstructure:"selector"` Stages PipelineStages `mapstructure:"stages"` - DropEntries bool `mapstructure:"drop_entries"` + Action string `mapstructure:"action"` } // validateMatcherConfig validates the MatcherConfig for the matcherStage @@ -42,10 +45,18 @@ func validateMatcherConfig(cfg *MatcherConfig) (logql.LogSelectorExpr, error) { if cfg.Selector == "" { return nil, errors.New(ErrSelectorRequired) } - if !cfg.DropEntries && (cfg.Stages == nil || len(cfg.Stages) == 0) { + switch cfg.Action { + case MatchActionKeep, MatchActionDrop: + case "": + cfg.Action = MatchActionKeep + default: + return nil, errors.New(ErrUnknownMatchAction) + } + + if cfg.Action == MatchActionKeep && (cfg.Stages == nil || len(cfg.Stages) == 0) { return nil, errors.New(ErrMatchRequiresStages) } - if cfg.DropEntries && (cfg.Stages != nil && len(cfg.Stages) != 0) { + if cfg.Action == MatchActionDrop && (cfg.Stages != nil && len(cfg.Stages) != 0) { return nil, errors.New(ErrStagesWithDropLine) } @@ -75,7 +86,7 @@ func newMatcherStage(logger log.Logger, jobName *string, config interface{}, reg } var pl *Pipeline - if !cfg.DropEntries { + if cfg.Action == MatchActionKeep { var err error pl, err = NewPipeline(logger, cfg.Stages, nPtr, registerer) if err != nil { @@ -91,7 +102,7 @@ func newMatcherStage(logger log.Logger, jobName *string, config interface{}, reg return &matcherStage{ matchers: selector.Matchers(), pipeline: pl, - drop: cfg.DropEntries, + action: cfg.Action, filter: filter, }, nil } @@ -101,7 +112,7 @@ type matcherStage struct { matchers []*labels.Matcher filter logql.Filter pipeline Stage - drop bool + action string } // Process implements Stage @@ -112,12 +123,13 @@ func (m *matcherStage) Process(labels model.LabelSet, extracted map[string]inter } } if m.filter == nil || m.filter([]byte(*entry)) { - // Adds the drop label to not be sent by the api.EntryHandler - if m.drop { + switch m.action { + case MatchActionDrop: + // Adds the drop label to not be sent by the api.EntryHandler labels[dropLabel] = "" - return + case MatchActionKeep: + m.pipeline.Process(labels, extracted, t, entry) } - m.pipeline.Process(labels, extracted, t, entry) } } diff --git a/pkg/logentry/stages/match_test.go b/pkg/logentry/stages/match_test.go index c6432095013a..843e2b8dd24a 100644 --- a/pkg/logentry/stages/match_test.go +++ b/pkg/logentry/stages/match_test.go @@ -101,57 +101,55 @@ func TestMatcher(t *testing.T) { tests := []struct { selector string labels map[string]string - drop bool + action string shouldDrop bool shouldRun bool wantErr bool }{ - {`{foo="bar"} |= "foo"`, map[string]string{"foo": "bar"}, false, false, true, false}, - {`{foo="bar"} |~ "foo"`, map[string]string{"foo": "bar"}, false, false, true, false}, - {`{foo="bar"} |= "bar"`, map[string]string{"foo": "bar"}, false, false, false, false}, - {`{foo="bar"} |~ "bar"`, map[string]string{"foo": "bar"}, false, false, false, false}, - {`{foo="bar"} != "bar"`, map[string]string{"foo": "bar"}, false, false, true, false}, - {`{foo="bar"} !~ "bar"`, map[string]string{"foo": "bar"}, false, false, true, false}, - {`{foo="bar"} != "foo"`, map[string]string{"foo": "bar"}, false, false, false, false}, - {`{foo="bar"} |= "foo"`, map[string]string{"foo": "bar"}, true, true, false, false}, - {`{foo="bar"} |~ "foo"`, map[string]string{"foo": "bar"}, true, true, false, false}, - {`{foo="bar"} |= "bar"`, map[string]string{"foo": "bar"}, true, false, false, false}, - {`{foo="bar"} |~ "bar"`, map[string]string{"foo": "bar"}, true, false, false, false}, - {`{foo="bar"} != "bar"`, map[string]string{"foo": "bar"}, true, true, false, false}, - {`{foo="bar"} !~ "bar"`, map[string]string{"foo": "bar"}, true, true, false, false}, - {`{foo="bar"} != "foo"`, map[string]string{"foo": "bar"}, true, false, false, false}, - {`{foo="bar"} !~ "[]"`, map[string]string{"foo": "bar"}, false, false, false, true}, - {"foo", map[string]string{"foo": "bar"}, false, false, false, true}, - {"{}", map[string]string{"foo": "bar"}, false, false, false, true}, - {"{", map[string]string{"foo": "bar"}, false, false, false, true}, - {"", map[string]string{"foo": "bar"}, false, false, true, true}, - {`{foo="bar"}`, map[string]string{"foo": "bar"}, false, false, true, false}, - {`{foo=""}`, map[string]string{"foo": "bar"}, false, false, false, false}, - {`{foo=""}`, map[string]string{}, false, false, true, false}, - {`{foo!="bar"}`, map[string]string{"foo": "bar"}, false, false, false, false}, - {`{foo!="bar"}`, map[string]string{"foo": "bar"}, true, false, false, false}, - {`{foo="bar",bar!="test"}`, map[string]string{"foo": "bar"}, false, false, true, false}, - {`{foo="bar",bar!="test"}`, map[string]string{"foo": "bar"}, true, true, false, false}, - {`{foo="bar",bar!="test"}`, map[string]string{"foo": "bar", "bar": "test"}, false, false, false, false}, - {`{foo="bar",bar=~"te.*"}`, map[string]string{"foo": "bar", "bar": "test"}, true, true, false, false}, - {`{foo="bar",bar=~"te.*"}`, map[string]string{"foo": "bar", "bar": "test"}, false, false, true, false}, - {`{foo="bar",bar!~"te.*"}`, map[string]string{"foo": "bar", "bar": "test"}, false, false, false, false}, - {`{foo="bar",bar!~"te.*"}`, map[string]string{"foo": "bar", "bar": "test"}, true, false, false, false}, + {`{foo="bar"} |= "foo"`, map[string]string{"foo": "bar"}, MatchActionKeep, false, true, false}, + {`{foo="bar"} |~ "foo"`, map[string]string{"foo": "bar"}, MatchActionKeep, false, true, false}, + {`{foo="bar"} |= "bar"`, map[string]string{"foo": "bar"}, MatchActionKeep, false, false, false}, + {`{foo="bar"} |~ "bar"`, map[string]string{"foo": "bar"}, MatchActionKeep, false, false, false}, + {`{foo="bar"} != "bar"`, map[string]string{"foo": "bar"}, MatchActionKeep, false, true, false}, + {`{foo="bar"} !~ "bar"`, map[string]string{"foo": "bar"}, MatchActionKeep, false, true, false}, + {`{foo="bar"} != "foo"`, map[string]string{"foo": "bar"}, MatchActionKeep, false, false, false}, + {`{foo="bar"} |= "foo"`, map[string]string{"foo": "bar"}, MatchActionDrop, true, false, false}, + {`{foo="bar"} |~ "foo"`, map[string]string{"foo": "bar"}, MatchActionDrop, true, false, false}, + {`{foo="bar"} |= "bar"`, map[string]string{"foo": "bar"}, MatchActionDrop, false, false, false}, + {`{foo="bar"} |~ "bar"`, map[string]string{"foo": "bar"}, MatchActionDrop, false, false, false}, + {`{foo="bar"} != "bar"`, map[string]string{"foo": "bar"}, MatchActionDrop, true, false, false}, + {`{foo="bar"} !~ "bar"`, map[string]string{"foo": "bar"}, MatchActionDrop, true, false, false}, + {`{foo="bar"} != "foo"`, map[string]string{"foo": "bar"}, MatchActionDrop, false, false, false}, + {`{foo="bar"} !~ "[]"`, map[string]string{"foo": "bar"}, MatchActionDrop, false, false, true}, + {"foo", map[string]string{"foo": "bar"}, MatchActionKeep, false, false, true}, + {"{}", map[string]string{"foo": "bar"}, MatchActionKeep, false, false, true}, + {"{", map[string]string{"foo": "bar"}, MatchActionKeep, false, false, true}, + {"", map[string]string{"foo": "bar"}, MatchActionKeep, false, true, true}, + {`{foo="bar"}`, map[string]string{"foo": "bar"}, MatchActionKeep, false, true, false}, + {`{foo=""}`, map[string]string{"foo": "bar"}, MatchActionKeep, false, false, false}, + {`{foo=""}`, map[string]string{}, MatchActionKeep, false, true, false}, + {`{foo!="bar"}`, map[string]string{"foo": "bar"}, MatchActionKeep, false, false, false}, + {`{foo!="bar"}`, map[string]string{"foo": "bar"}, MatchActionDrop, false, false, false}, + {`{foo="bar",bar!="test"}`, map[string]string{"foo": "bar"}, MatchActionKeep, false, true, false}, + {`{foo="bar",bar!="test"}`, map[string]string{"foo": "bar"}, MatchActionDrop, true, false, false}, + {`{foo="bar",bar!="test"}`, map[string]string{"foo": "bar", "bar": "test"}, MatchActionKeep, false, false, false}, + {`{foo="bar",bar=~"te.*"}`, map[string]string{"foo": "bar", "bar": "test"}, MatchActionDrop, true, false, false}, + {`{foo="bar",bar=~"te.*"}`, map[string]string{"foo": "bar", "bar": "test"}, MatchActionKeep, false, true, false}, + {`{foo="bar",bar!~"te.*"}`, map[string]string{"foo": "bar", "bar": "test"}, MatchActionKeep, false, false, false}, + {`{foo="bar",bar!~"te.*"}`, map[string]string{"foo": "bar", "bar": "test"}, MatchActionDrop, false, false, false}, - {`{foo=""}`, map[string]string{}, false, false, true, false}, + {`{foo=""}`, map[string]string{}, MatchActionKeep, false, true, false}, } for _, tt := range tests { - name := fmt.Sprintf("%s/%s", tt.selector, tt.labels) - if tt.drop { - name += "_drop" - } + name := fmt.Sprintf("%s/%s/%s", tt.selector, tt.labels, tt.action) + t.Run(name, func(t *testing.T) { // Build a match config which has a simple label stage that when matched will add the test_label to // the labels in the pipeline. var stages PipelineStages - if !tt.drop { + if tt.action != MatchActionDrop { stages = PipelineStages{ PipelineStage{ StageTypeLabel: LabelsConfig{ @@ -164,7 +162,7 @@ func TestMatcher(t *testing.T) { nil, tt.selector, stages, - tt.drop, + tt.action, } s, err := newMatcherStage(util.Logger, nil, matchConfig, prometheus.DefaultRegisterer) if (err != nil) != tt.wantErr { @@ -206,12 +204,13 @@ func Test_validateMatcherConfig(t *testing.T) { {"empty", nil, true}, {"pipeline name required", &MatcherConfig{PipelineName: &empty}, true}, {"selector required", &MatcherConfig{PipelineName: ¬empty, Selector: ""}, true}, - {"nil stages without dropping", &MatcherConfig{PipelineName: ¬empty, Selector: `{app="foo"}`, DropEntries: false, Stages: nil}, true}, - {"empty stages without dropping", &MatcherConfig{PipelineName: ¬empty, Selector: `{app="foo"}`, DropEntries: false, Stages: []interface{}{}}, true}, - {"stages with dropping", &MatcherConfig{PipelineName: ¬empty, Selector: `{app="foo"}`, DropEntries: true, Stages: []interface{}{""}}, true}, - {"empty stages dropping", &MatcherConfig{PipelineName: ¬empty, Selector: `{app="foo"}`, DropEntries: true, Stages: []interface{}{}}, false}, - {"stages without dropping", &MatcherConfig{PipelineName: ¬empty, Selector: `{app="foo"}`, DropEntries: false, Stages: []interface{}{""}}, false}, - {"bad selector", &MatcherConfig{PipelineName: ¬empty, Selector: `{app="foo}`, DropEntries: false, Stages: []interface{}{""}}, true}, + {"nil stages without dropping", &MatcherConfig{PipelineName: ¬empty, Selector: `{app="foo"}`, Action: MatchActionKeep, Stages: nil}, true}, + {"empty stages without dropping", &MatcherConfig{PipelineName: ¬empty, Selector: `{app="foo"}`, Action: MatchActionKeep, Stages: []interface{}{}}, true}, + {"stages with dropping", &MatcherConfig{PipelineName: ¬empty, Selector: `{app="foo"}`, Action: MatchActionDrop, Stages: []interface{}{""}}, true}, + {"empty stages dropping", &MatcherConfig{PipelineName: ¬empty, Selector: `{app="foo"}`, Action: MatchActionDrop, Stages: []interface{}{}}, false}, + {"stages without dropping", &MatcherConfig{PipelineName: ¬empty, Selector: `{app="foo"}`, Action: MatchActionKeep, Stages: []interface{}{""}}, false}, + {"bad selector", &MatcherConfig{PipelineName: ¬empty, Selector: `{app="foo}`, Action: MatchActionKeep, Stages: []interface{}{""}}, true}, + {"bad action", &MatcherConfig{PipelineName: ¬empty, Selector: `{app="foo}`, Action: "nope", Stages: []interface{}{""}}, true}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {