diff --git a/README.md b/README.md index 3f3306cc4..2e1db2124 100644 --- a/README.md +++ b/README.md @@ -225,6 +225,31 @@ Before the first transform suppose we have the keys `DstAddr` and `SrcAddr`. After the first transform, we have the keys `dstAddr` and `srcAddr`. After the second transform, we have the keys `dstAddr`, `dstIP`, `srcAddr`, and `srcIP`. +### Transform Filter + +The filter transform module allows setting rules to remove complete entries from +the output, or just remove specific keys and values from entries. + +For example, suppose we have a flow log with the following syntax: +``` +{"Bytes":20800,"DstAddr":"10.130.2.2","DstPort":36936,"Packets":400,"Proto":6,"SequenceNum":1919,"SrcAddr":"10.130.2.13","SrcHostIP":"10.0.197.206","SrcPort":3100,"TCPFlags":0,"TimeFlowStart":0,"TimeReceived":1637501832} +``` + +The configuration below will remove (filter) the entry from the output + +```yaml +pipeline: + transform: + - type: filter + filter: + rules: + - input: SrcPort + type: remove_entry_if_exists +``` +Using `remove_entry_if_doesnt_exist` in the rule reverses the logic and will not remove the above example entry +Using `remove_field` in the rule `type` instead causes the entry to be output after +removal of only the `SrcPort` key and value + ### Transform Network `transform network` provides specific functionality that is useful for transformation of network flow-logs: diff --git a/docs/api.md b/docs/api.md index 7c70ee973..6acadf3bc 100644 --- a/docs/api.md +++ b/docs/api.md @@ -71,6 +71,18 @@ Following is the supported API format for generic transformations: input: entry input field output: entry output field +## Transform Filter API +Following is the supported API format for filter transformations: + +
+ filter:
+         rules: list of filter rules, each includes:
+                 input: entry input field
+                 type: (enum) one of the following:
+                     remove_field: removes the field from the entry
+                     remove_entry_if_exists: removes the entry if the field exists
+                     remove_entry_if_doesnt_exist: removes the entry if the field doesnt exist
+
## Transform Network API Following is the supported API format for network transformations: diff --git a/pkg/api/api.go b/pkg/api/api.go index 36290d2b2..5ca8aa04b 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -30,6 +30,7 @@ type API struct { IngestKafka IngestKafka `yaml:"kafka" doc:"## Ingest Kafka API\nFollowing is the supported API format for the kafka ingest:\n"` DecodeAws DecodeAws `yaml:"aws" doc:"## Aws ingest API\nFollowing is the supported API format for Aws flow entries:\n"` TransformGeneric TransformGeneric `yaml:"generic" doc:"## Transform Generic API\nFollowing is the supported API format for generic transformations:\n"` + TransformFilter TransformFilter `yaml:"filter" doc:"## Transform Filter API\nFollowing is the supported API format for filter transformations:\n"` TransformNetwork TransformNetwork `yaml:"network" doc:"## Transform Network API\nFollowing is the supported API format for network transformations:\n"` WriteLoki WriteLoki `yaml:"loki" doc:"## Write Loki API\nFollowing is the supported API format for writing to loki:\n"` ExtractAggregate AggregateDefinition `yaml:"aggregates" doc:"## Aggregate metrics API\nFollowing is the supported API format for specifying metrics aggregations:\n"` diff --git a/pkg/api/enum.go b/pkg/api/enum.go index bf02e675d..fc1da03cc 100644 --- a/pkg/api/enum.go +++ b/pkg/api/enum.go @@ -25,6 +25,7 @@ import ( type enums struct { PromEncodeOperationEnum PromEncodeOperationEnum TransformNetworkOperationEnum TransformNetworkOperationEnum + TransformFilterOperationEnum TransformFilterOperationEnum KafkaEncodeBalancerEnum KafkaEncodeBalancerEnum } diff --git a/pkg/api/transform_filter.go b/pkg/api/transform_filter.go new file mode 100644 index 000000000..849dfb27d --- /dev/null +++ b/pkg/api/transform_filter.go @@ -0,0 +1,37 @@ +/* + * Copyright (C) 2022 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package api + +type TransformFilter struct { + Rules []TransformFilterRule `yaml:"rules" doc:"list of filter rules, each includes:"` +} + +type TransformFilterOperationEnum struct { + RemoveField string `yaml:"remove_field" doc:"removes the field from the entry"` + RemoveEntryIfExists string `yaml:"remove_entry_if_exists" doc:"removes the entry if the field exists"` + RemoveEntryIfDoesntExist string `yaml:"remove_entry_if_doesnt_exist" doc:"removes the entry if the field doesnt exist"` +} + +func TransformFilterOperationName(operation string) string { + return GetEnumName(TransformFilterOperationEnum{}, operation) +} + +type TransformFilterRule struct { + Input string `yaml:"input" doc:"entry input field"` + Type string `yaml:"type" enum:"TransformFilterOperationEnum" doc:"one of the following:"` +} diff --git a/pkg/config/config.go b/pkg/config/config.go index f7e148f6b..510f69352 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -82,6 +82,7 @@ type Decode struct { type Transform struct { Type string Generic api.TransformGeneric + Filter api.TransformFilter Network api.TransformNetwork } diff --git a/pkg/pipeline/pipeline_builder.go b/pkg/pipeline/pipeline_builder.go index 16a76ccc9..dda5ae56a 100644 --- a/pkg/pipeline/pipeline_builder.go +++ b/pkg/pipeline/pipeline_builder.go @@ -193,7 +193,7 @@ func (b *builder) getStageNode(pe *pipelineEntry) (interface{}, error) { case StageTransform: stage = node.AsMiddle(func(in <-chan []config.GenericMap, out chan<- []config.GenericMap) { for i := range in { - out <- 
transform.ExecuteTransform(pe.Transformer, i) + out <- pe.Transformer.Transform(i) } }) case StageExtract: @@ -263,6 +263,8 @@ func getTransformer(params config.StageParam) (transform.Transformer, error) { switch params.Transform.Type { case transform.OperationGeneric: transformer, err = transform.NewTransformGeneric(params) + case transform.OperationFilter: + transformer, err = transform.NewTransformFilter(params) case transform.OperationNetwork: transformer, err = transform.NewTransformNetwork(params) case transform.OperationNone: diff --git a/pkg/pipeline/pipeline_test.go b/pkg/pipeline/pipeline_test.go index 406cf6843..664303d0d 100644 --- a/pkg/pipeline/pipeline_test.go +++ b/pkg/pipeline/pipeline_test.go @@ -44,10 +44,10 @@ parameters: func Test_transformToLoki(t *testing.T) { var transformed []config.GenericMap - input := config.GenericMap{"key": "value"} + input := []config.GenericMap{{"key": "value"}} transform, err := transform.NewTransformNone() require.NoError(t, err) - transformed = append(transformed, transform.Transform(input)) + transformed = append(transformed, transform.Transform(input)...) 
v := test.InitConfig(t, yamlConfigNoParams) require.NotNil(t, v) diff --git a/pkg/pipeline/transform/transform.go b/pkg/pipeline/transform/transform.go index a92725375..9ad20a1c6 100644 --- a/pkg/pipeline/transform/transform.go +++ b/pkg/pipeline/transform/transform.go @@ -24,14 +24,14 @@ import ( ) type Transformer interface { - Transform(in config.GenericMap) config.GenericMap + Transform(in []config.GenericMap) []config.GenericMap } type transformNone struct { } // Transform transforms a flow before being stored -func (t *transformNone) Transform(f config.GenericMap) config.GenericMap { +func (t *transformNone) Transform(f []config.GenericMap) []config.GenericMap { return f } @@ -52,15 +52,6 @@ type Definitions []Definition const ( OperationGeneric = "generic" OperationNetwork = "network" + OperationFilter = "filter" OperationNone = "none" ) - -func ExecuteTransform(transformer Transformer, in []config.GenericMap) []config.GenericMap { - out := make([]config.GenericMap, 0) - var flowEntry config.GenericMap - for _, entry := range in { - flowEntry = transformer.Transform(entry) - out = append(out, flowEntry) - } - return out -} diff --git a/pkg/pipeline/transform/transform_filter.go b/pkg/pipeline/transform/transform_filter.go new file mode 100644 index 000000000..b2dab2567 --- /dev/null +++ b/pkg/pipeline/transform/transform_filter.go @@ -0,0 +1,69 @@ +/* + * Copyright (C) 2022 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package transform + +import ( + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + log "github.com/sirupsen/logrus" +) + +type Filter struct { + Rules []api.TransformFilterRule +} + +// Transform transforms a flow +func (f *Filter) Transform(input []config.GenericMap) []config.GenericMap { + log.Debugf("f = %v", f) + output := make([]config.GenericMap, 0) + for _, entry := range input { + outputEntry := entry + addToOutput := true + for _, rule := range f.Rules { + log.Debugf("rule = %v", rule) + switch rule.Type { + case api.TransformFilterOperationName("RemoveField"): + delete(outputEntry, rule.Input) + case api.TransformFilterOperationName("RemoveEntryIfExists"): + if _, ok := entry[rule.Input]; ok { + addToOutput = false + } + case api.TransformFilterOperationName("RemoveEntryIfDoesntExist"): + if _, ok := entry[rule.Input]; !ok { + addToOutput = false + } + default: + log.Panicf("unknown type %s for transform.Filter rule: %v", rule.Type, rule) + } + } + if addToOutput { + output = append(output, outputEntry) + log.Debugf("Transform.GenericMap = %v", outputEntry) + } + } + return output +} + +// NewTransformFilter create a new filter transform +func NewTransformFilter(params config.StageParam) (Transformer, error) { + log.Debugf("entering NewTransformFilter") + transformFilter := &Filter{ + Rules: params.Transform.Filter.Rules, + } + return transformFilter, nil +} diff --git a/pkg/pipeline/transform/transform_filter_test.go b/pkg/pipeline/transform/transform_filter_test.go new file mode 100644 index 000000000..77c040687 --- /dev/null +++ b/pkg/pipeline/transform/transform_filter_test.go @@ -0,0 +1,101 @@ +/* + * Copyright (C) 2022 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package transform + +import ( + "testing" + + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/test" + "github.com/stretchr/testify/require" +) + +const testConfigTransformFilterRemoveField = `--- +log-level: debug +pipeline: + - name: filter1 +parameters: + - name: filter1 + transform: + type: filter + filter: + rules: + - input: dstPort + type: remove_field + - input: srcPort + type: remove_field +` + +const testConfigTransformFilterRemoveEntryIfExists = `--- +log-level: debug +pipeline: + - name: filter1 +parameters: + - name: filter1 + transform: + type: filter + filter: + rules: + - input: srcPort + type: remove_entry_if_exists +` + +func getFilterExpectedOutput() config.GenericMap { + return config.GenericMap{ + "srcIP": "10.0.0.1", + "8888IP": "8.8.8.8", + "emptyIP": "", + "level": "error", + "protocol": "tcp", + "protocol_num": 6, + "value": 7.0, + "message": "test message", + "dstIP": "20.0.0.2", + } +} + +func TestNewTransformFilterRemoveField(t *testing.T) { + newTransform := InitNewTransformFilter(t, testConfigTransformFilterRemoveField) + transformFilter := newTransform.(*Filter) + require.Len(t, transformFilter.Rules, 2) + + input := test.GetIngestMockEntry(false) + output := transformFilter.Transform([]config.GenericMap{input}) + expectedOutput := getFilterExpectedOutput() + require.Equal(t, expectedOutput, output[0]) +} + +func TestNewTransformFilterRemoveEntryIfExists(t *testing.T) { + newTransform := InitNewTransformFilter(t, testConfigTransformFilterRemoveEntryIfExists) 
+ transformFilter := newTransform.(*Filter) + require.Len(t, transformFilter.Rules, 1) + + input := test.GetIngestMockEntry(false) + output := transformFilter.Transform([]config.GenericMap{input}) + require.Equal(t, output, []config.GenericMap{}) +} + +func InitNewTransformFilter(t *testing.T, configFile string) Transformer { + v := test.InitConfig(t, configFile) + require.NotNil(t, v) + + config := config.Parameters[0] + newTransform, err := NewTransformFilter(config) + require.NoError(t, err) + return newTransform +} diff --git a/pkg/pipeline/transform/transform_generic.go b/pkg/pipeline/transform/transform_generic.go index a4ff23b49..8eb7659ce 100644 --- a/pkg/pipeline/transform/transform_generic.go +++ b/pkg/pipeline/transform/transform_generic.go @@ -28,15 +28,19 @@ type Generic struct { } // Transform transforms a flow to a new set of keys -func (g *Generic) Transform(f config.GenericMap) config.GenericMap { - log.Debugf("f = %v", f) - gm := make(config.GenericMap) - for _, transformRule := range g.Rules { - log.Debugf("transformRule = %v", transformRule) - gm[transformRule.Output] = f[transformRule.Input] +func (g *Generic) Transform(input []config.GenericMap) []config.GenericMap { + log.Debugf("f = %v", g) + output := make([]config.GenericMap, 0) + for _, entry := range input { + outputEntry := make(config.GenericMap) + for _, transformRule := range g.Rules { + log.Debugf("transformRule = %v", transformRule) + outputEntry[transformRule.Output] = entry[transformRule.Input] + } + log.Debugf("Transform.GenericMap = %v", outputEntry) + output = append(output, outputEntry) } - log.Debugf("Transform.GenericMap = %v", gm) - return gm + return output } // NewTransformGeneric create a new transform diff --git a/pkg/pipeline/transform/transform_generic_test.go b/pkg/pipeline/transform/transform_generic_test.go index be53a678b..17a90a06c 100644 --- a/pkg/pipeline/transform/transform_generic_test.go +++ b/pkg/pipeline/transform/transform_generic_test.go @@ -66,9 +66,9 
@@ func TestNewTransformGeneric(t *testing.T) { require.Len(t, transformGeneric.Rules, 6) input := test.GetIngestMockEntry(false) - output := transformGeneric.Transform(input) + output := transformGeneric.Transform([]config.GenericMap{input}) expectedOutput := getGenericExpectedOutput() - require.Equal(t, output, expectedOutput) + require.Equal(t, expectedOutput, output[0]) } func InitNewTransformGeneric(t *testing.T, configFile string) Transformer { diff --git a/pkg/pipeline/transform/transform_network.go b/pkg/pipeline/transform/transform_network.go index 468281eca..dc6d15bf3 100644 --- a/pkg/pipeline/transform/transform_network.go +++ b/pkg/pipeline/transform/transform_network.go @@ -39,7 +39,16 @@ type Network struct { api.TransformNetwork } -func (n *Network) Transform(inputEntry config.GenericMap) config.GenericMap { +func (n *Network) Transform(inputEntries []config.GenericMap) []config.GenericMap { + outputEntries := make([]config.GenericMap, 0) + for _, entry := range inputEntries { + outputEntry := n.TransformEntry(entry) + outputEntries = append(outputEntries, outputEntry) + } + return outputEntries +} + +func (n *Network) TransformEntry(inputEntry config.GenericMap) config.GenericMap { outputEntries := inputEntry for _, rule := range n.Rules { diff --git a/pkg/pipeline/transform/transform_network_test.go b/pkg/pipeline/transform/transform_network_test.go index 31f090392..ad9ade089 100644 --- a/pkg/pipeline/transform/transform_network_test.go +++ b/pkg/pipeline/transform/transform_network_test.go @@ -134,9 +134,9 @@ func Test_Transform(t *testing.T) { err := location.InitLocationDB() require.NoError(t, err) - output := networkTransform.Transform(entry) + output := networkTransform.Transform([]config.GenericMap{entry}) - require.Equal(t, expectedOutput, output) + require.Equal(t, expectedOutput, output[0]) } func Test_TransformAddSubnetParseCIDRFailure(t *testing.T) { @@ -157,9 +157,9 @@ func Test_TransformAddSubnetParseCIDRFailure(t *testing.T) { err := 
location.InitLocationDB() require.NoError(t, err) - output := networkTransform.Transform(entry) + output := networkTransform.Transform([]config.GenericMap{entry}) - require.Equal(t, expectedOutput, output) + require.Equal(t, expectedOutput, output[0]) } func Test_NewTransformNetwork(t *testing.T) { @@ -187,10 +187,10 @@ parameters: require.NotNil(t, newNetworkTransform) entry := test.GetIngestMockEntry(false) - output := newNetworkTransform.Transform(entry) + output := newNetworkTransform.Transform([]config.GenericMap{entry}) - require.Equal(t, "10.0.0.1", output["srcIP"]) - require.Equal(t, "10.0.0.0/24", output["subnetSrcIP"]) + require.Equal(t, "10.0.0.1", output[0]["srcIP"]) + require.Equal(t, "10.0.0.0/24", output[0]["subnetSrcIP"]) } func InitNewTransformNetwork(t *testing.T, configFile string) Transformer { @@ -228,13 +228,13 @@ parameters: // first time flow is new entry := test.GetIngestMockEntry(false) - output := newNetworkTransform.Transform(entry) - require.Equal(t, "777", output["isNewFlow"]) + output := newNetworkTransform.Transform([]config.GenericMap{entry}) + require.Equal(t, "777", output[0]["isNewFlow"]) // second time, same flow is not new entry = test.GetIngestMockEntry(false) - output = newNetworkTransform.Transform(entry) - require.Equal(t, nil, output["isNewFlow"]) + output = newNetworkTransform.Transform([]config.GenericMap{entry}) + require.Equal(t, nil, output[0]["isNewFlow"]) } func Test_TransformNetworkDependentRulesAddRegExIf(t *testing.T) { @@ -270,10 +270,10 @@ parameters: require.NotNil(t, newNetworkTransform) entry := test.GetIngestMockEntry(false) - output := newNetworkTransform.Transform(entry) + output := newNetworkTransform.Transform([]config.GenericMap{entry}) - require.Equal(t, "10.0.0.1", output["srcIP"]) - require.Equal(t, "10.0.0.0/24", output["subnetSrcIP"]) - require.Equal(t, "10.0.0.0/24", output["match-10.0.*"]) - require.NotEqual(t, "10.0.0.0/24", output["match-11.0.*"]) + require.Equal(t, "10.0.0.1", 
output[0]["srcIP"]) + require.Equal(t, "10.0.0.0/24", output[0]["subnetSrcIP"]) + require.Equal(t, "10.0.0.0/24", output[0]["match-10.0.*"]) + require.NotEqual(t, "10.0.0.0/24", output[0]["match-11.0.*"]) }