Skip to content

Commit

Permalink
add filter transform
Browse files Browse the repository at this point in the history
  • Loading branch information
eranra committed Mar 7, 2022
1 parent 39e66f8 commit c221d28
Show file tree
Hide file tree
Showing 15 changed files with 295 additions and 42 deletions.
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,31 @@ Before the first transform suppose we have the keys `DstAddr` and `SrcAddr`.
After the first transform, we have the keys `dstAddr` and `srcAddr`.
After the second transform, we have the keys `dstAddr`, `dstIP`, `srcAddr`, and `srcIP`.

### Transform Filter

The filter transform module allows setting rules to remove complete entries from
the output, or just remove specific keys and values from entries.

For example, suppose we have a flow log with the following syntax:
```
{"Bytes":20800,"DstAddr":"10.130.2.2","DstPort":36936,"Packets":400,"Proto":6,"SequenceNum":1919,"SrcAddr":"10.130.2.13","SrcHostIP":"10.0.197.206","SrcPort":3100,"TCPFlags":0,"TimeFlowStart":0,"TimeReceived":1637501832}
```

The configuration below removes (filters out) the entry from the output:

```yaml
pipeline:
  transform:
    - type: filter
      filter:
        rules:
          - input: SrcPort
            type: remove_entry_if_exists
```
Using `remove_entry_if_doesnt_exist` in the rule reverses the logic, so the above example entry is not removed.

Using `remove_field` as the rule `type` instead outputs the entry with only the `SrcPort` key and value removed.

### Transform Network

`transform network` provides specific functionality that is useful for transformation of network flow-logs:
Expand Down
12 changes: 12 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,18 @@ Following is the supported API format for generic transformations:
input: entry input field
output: entry output field
</pre>
## Transform Filter API
Following is the supported API format for filter transformations:

<pre>
filter:
rules: list of filter rules, each includes:
input: entry input field
type: (enum) one of the following:
remove_field: removes the field from the entry
remove_entry_if_exists: removes the entry if the field exists
remove_entry_if_doesnt_exist: removes the entry if the field doesnt exist
</pre>
## Transform Network API
Following is the supported API format for network transformations:

Expand Down
1 change: 1 addition & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type API struct {
IngestKafka IngestKafka `yaml:"kafka" doc:"## Ingest Kafka API\nFollowing is the supported API format for the kafka ingest:\n"`
DecodeAws DecodeAws `yaml:"aws" doc:"## Aws ingest API\nFollowing is the supported API format for Aws flow entries:\n"`
TransformGeneric TransformGeneric `yaml:"generic" doc:"## Transform Generic API\nFollowing is the supported API format for generic transformations:\n"`
TransformFilter TransformFilter `yaml:"filter" doc:"## Transform Filter API\nFollowing is the supported API format for filter transformations:\n"`
TransformNetwork TransformNetwork `yaml:"network" doc:"## Transform Network API\nFollowing is the supported API format for network transformations:\n"`
WriteLoki WriteLoki `yaml:"loki" doc:"## Write Loki API\nFollowing is the supported API format for writing to loki:\n"`
ExtractAggregate AggregateDefinition `yaml:"aggregates" doc:"## Aggregate metrics API\nFollowing is the supported API format for specifying metrics aggregations:\n"`
Expand Down
1 change: 1 addition & 0 deletions pkg/api/enum.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
// enums holds one field per enum definition struct (for example
// TransformFilterOperationEnum), each field named after its type.
// NOTE(review): presumably walked via reflection to resolve enum names and
// generate documentation — confirm against the users of this type.
type enums struct {
PromEncodeOperationEnum PromEncodeOperationEnum
TransformNetworkOperationEnum TransformNetworkOperationEnum
TransformFilterOperationEnum TransformFilterOperationEnum
KafkaEncodeBalancerEnum KafkaEncodeBalancerEnum
}

Expand Down
37 changes: 37 additions & 0 deletions pkg/api/transform_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (C) 2022 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package api

// TransformFilter is the configuration of the filter transform.
type TransformFilter struct {
// Rules is the list of filter rules, read from the "rules" yaml key.
Rules []TransformFilterRule `yaml:"rules" doc:"list of filter rules, each includes:"`
}

// TransformFilterOperationEnum enumerates the operations a filter rule can
// request. Each field's yaml tag is the operation name used in configuration
// and its doc tag is the description of that operation.
type TransformFilterOperationEnum struct {
// RemoveField: "remove_field".
RemoveField string `yaml:"remove_field" doc:"removes the field from the entry"`
// RemoveEntryIfExists: "remove_entry_if_exists".
RemoveEntryIfExists string `yaml:"remove_entry_if_exists" doc:"removes the entry if the field exists"`
// RemoveEntryIfDoesntExist: "remove_entry_if_doesnt_exist".
RemoveEntryIfDoesntExist string `yaml:"remove_entry_if_doesnt_exist" doc:"removes the entry if the field doesnt exist"`
}

func TransformFilterOperationName(operation string) string {
return GetEnumName(TransformFilterOperationEnum{}, operation)
}

// TransformFilterRule is a single filter rule: the entry field it applies to
// and the operation to perform.
type TransformFilterRule struct {
// Input is the name of the entry field the rule inspects or removes.
Input string `yaml:"input" doc:"entry input field"`
// Type is the operation name; its allowed values are the yaml tags of
// TransformFilterOperationEnum.
Type string `yaml:"type" enum:"TransformFilterOperationEnum" doc:"one of the following:"`
}
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type Decode struct {
// Transform is the configuration of a transform stage.
type Transform struct {
// Type names the transform to run.
// NOTE(review): presumably one of the transform.Operation* constants — confirm against the pipeline builder.
Type string
// Generic holds the settings of the generic transform.
Generic api.TransformGeneric
// Filter holds the settings of the filter transform.
Filter api.TransformFilter
// Network holds the settings of the network transform.
Network api.TransformNetwork
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/pipeline/pipeline_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (b *builder) getStageNode(pe *pipelineEntry) (interface{}, error) {
case StageTransform:
stage = node.AsMiddle(func(in <-chan []config.GenericMap, out chan<- []config.GenericMap) {
for i := range in {
out <- transform.ExecuteTransform(pe.Transformer, i)
out <- pe.Transformer.Transform(i)
}
})
case StageExtract:
Expand Down Expand Up @@ -263,6 +263,8 @@ func getTransformer(params config.StageParam) (transform.Transformer, error) {
switch params.Transform.Type {
case transform.OperationGeneric:
transformer, err = transform.NewTransformGeneric(params)
case transform.OperationFilter:
transformer, err = transform.NewTransformFilter(params)
case transform.OperationNetwork:
transformer, err = transform.NewTransformNetwork(params)
case transform.OperationNone:
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ parameters:

func Test_transformToLoki(t *testing.T) {
var transformed []config.GenericMap
input := config.GenericMap{"key": "value"}
input := []config.GenericMap{{"key": "value"}}
transform, err := transform.NewTransformNone()
require.NoError(t, err)
transformed = append(transformed, transform.Transform(input))
transformed = append(transformed, transform.Transform(input)...)

v := test.InitConfig(t, yamlConfigNoParams)
require.NotNil(t, v)
Expand Down
15 changes: 3 additions & 12 deletions pkg/pipeline/transform/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ import (
)

type Transformer interface {
Transform(in config.GenericMap) config.GenericMap
Transform(in []config.GenericMap) []config.GenericMap
}

type transformNone struct {
}

// Transform transforms a flow before being stored
func (t *transformNone) Transform(f config.GenericMap) config.GenericMap {
func (t *transformNone) Transform(f []config.GenericMap) []config.GenericMap {
return f
}

Expand All @@ -52,15 +52,6 @@ type Definitions []Definition
// Names of the supported transform types.
// NOTE(review): presumably compared against the configured transform type
// when the pipeline builder selects a Transformer — confirm against callers.
const (
OperationGeneric = "generic"
OperationNetwork = "network"
OperationFilter = "filter"
OperationNone = "none"
)

func ExecuteTransform(transformer Transformer, in []config.GenericMap) []config.GenericMap {
out := make([]config.GenericMap, 0)
var flowEntry config.GenericMap
for _, entry := range in {
flowEntry = transformer.Transform(entry)
out = append(out, flowEntry)
}
return out
}
69 changes: 69 additions & 0 deletions pkg/pipeline/transform/transform_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright (C) 2022 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package transform

import (
"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
log "github.com/sirupsen/logrus"
)

// Filter is the Transformer that applies filter rules to flow entries.
type Filter struct {
// Rules are the filter rules evaluated for every entry.
Rules []api.TransformFilterRule
}

// Transform applies the filter rules, in order, to every entry of input and
// returns the entries that were not removed.
//
// Rule types:
//   - remove_field: deletes rule.Input from the entry.
//   - remove_entry_if_exists: drops the entry when rule.Input is present.
//   - remove_entry_if_doesnt_exist: drops the entry when rule.Input is absent.
//
// Every entry is processed on a private shallow copy, so the maps passed in by
// the caller are never modified. Existence checks are made against that copy
// and therefore observe earlier remove_field rules applied to the same entry.
// The returned slice is never nil. A rule with an unknown type is reported
// through log.Panicf.
func (f *Filter) Transform(input []config.GenericMap) []config.GenericMap {
	log.Debugf("f = %v", f)

	// The operation names do not depend on the entry or the rule; resolve them
	// once per call instead of once per rule and per entry.
	removeField := api.TransformFilterOperationName("RemoveField")
	removeEntryIfExists := api.TransformFilterOperationName("RemoveEntryIfExists")
	removeEntryIfDoesntExist := api.TransformFilterOperationName("RemoveEntryIfDoesntExist")

	output := make([]config.GenericMap, 0, len(input))
	for _, entry := range input {
		// Maps are reference types: work on a copy so that remove_field does
		// not delete keys from the caller's entry.
		outputEntry := make(config.GenericMap, len(entry))
		for key, value := range entry {
			outputEntry[key] = value
		}

		addToOutput := true
		for _, rule := range f.Rules {
			log.Debugf("rule = %v", rule)
			switch rule.Type {
			case removeField:
				delete(outputEntry, rule.Input)
			case removeEntryIfExists:
				if _, ok := outputEntry[rule.Input]; ok {
					addToOutput = false
				}
			case removeEntryIfDoesntExist:
				if _, ok := outputEntry[rule.Input]; !ok {
					addToOutput = false
				}
			default:
				log.Panicf("unknown type %s for transform.Filter rule: %v", rule.Type, rule)
			}
		}
		if addToOutput {
			output = append(output, outputEntry)
			log.Debugf("Transform.GenericMap = %v", outputEntry)
		}
	}
	return output
}

// NewTransformFilter builds a filter Transformer whose rules are taken from
// params.Transform.Filter.Rules. The returned error is always nil.
func NewTransformFilter(params config.StageParam) (Transformer, error) {
	log.Debugf("entering NewTransformFilter")
	rules := params.Transform.Filter.Rules
	return &Filter{Rules: rules}, nil
}
101 changes: 101 additions & 0 deletions pkg/pipeline/transform/transform_filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright (C) 2022 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package transform

import (
"testing"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/test"
"github.com/stretchr/testify/require"
)

const testConfigTransformFilterRemoveField = `---
log-level: debug
pipeline:
- name: filter1
parameters:
- name: filter1
transform:
type: filter
filter:
rules:
- input: dstPort
type: remove_field
- input: srcPort
type: remove_field
`

const testConfigTransformFilterRemoveEntryIfExists = `---
log-level: debug
pipeline:
- name: filter1
parameters:
- name: filter1
transform:
type: filter
filter:
rules:
- input: srcPort
type: remove_entry_if_exists
`

// getFilterExpectedOutput returns the entry the remove_field test expects to
// get back; it contains neither a dstPort nor a srcPort key.
func getFilterExpectedOutput() config.GenericMap {
	expected := config.GenericMap{}

	// Addresses.
	expected["srcIP"] = "10.0.0.1"
	expected["dstIP"] = "20.0.0.2"
	expected["8888IP"] = "8.8.8.8"
	expected["emptyIP"] = ""

	// Protocol information.
	expected["protocol"] = "tcp"
	expected["protocol_num"] = 6

	// Remaining payload fields.
	expected["level"] = "error"
	expected["value"] = 7.0
	expected["message"] = "test message"

	return expected
}

// TestNewTransformFilterRemoveField builds a filter from the two remove_field
// rules of testConfigTransformFilterRemoveField, runs it on the ingest mock
// entry and checks that the single resulting entry equals
// getFilterExpectedOutput().
func TestNewTransformFilterRemoveField(t *testing.T) {
	newTransform := InitNewTransformFilter(t, testConfigTransformFilterRemoveField)
	// Fail the test, rather than panic, if a different Transformer is returned.
	transformFilter, ok := newTransform.(*Filter)
	require.True(t, ok, "expected a *Filter transformer")
	require.Len(t, transformFilter.Rules, 2)

	input := test.GetIngestMockEntry(false)
	output := transformFilter.Transform([]config.GenericMap{input})
	// Guard the index below so that a regression fails instead of panicking.
	require.Len(t, output, 1)
	expectedOutput := getFilterExpectedOutput()
	require.Equal(t, expectedOutput, output[0])
}

// TestNewTransformFilterRemoveEntryIfExists builds a filter from the single
// remove_entry_if_exists rule on srcPort, runs it on the ingest mock entry and
// checks that the output is an empty, non-nil slice.
func TestNewTransformFilterRemoveEntryIfExists(t *testing.T) {
	newTransform := InitNewTransformFilter(t, testConfigTransformFilterRemoveEntryIfExists)
	// Fail the test, rather than panic, if a different Transformer is returned.
	transformFilter, ok := newTransform.(*Filter)
	require.True(t, ok, "expected a *Filter transformer")
	require.Len(t, transformFilter.Rules, 1)

	input := test.GetIngestMockEntry(false)
	output := transformFilter.Transform([]config.GenericMap{input})
	// require.Equal takes (expected, actual); keeping that order makes failure
	// messages label the two values correctly.
	require.Equal(t, []config.GenericMap{}, output)
}

// InitNewTransformFilter loads configFile through test.InitConfig and builds a
// filter Transformer from the first element of config.Parameters. Any failure
// along the way fails the test t.
func InitNewTransformFilter(t *testing.T, configFile string) Transformer {
	t.Helper()

	v := test.InitConfig(t, configFile)
	require.NotNil(t, v)
	// Guard the index below so that a configuration without parameters fails
	// the test instead of panicking.
	require.NotEmpty(t, config.Parameters)

	// Named params (not config) to avoid shadowing the imported config package.
	params := config.Parameters[0]
	newTransform, err := NewTransformFilter(params)
	require.NoError(t, err)
	return newTransform
}
20 changes: 12 additions & 8 deletions pkg/pipeline/transform/transform_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,19 @@ type Generic struct {
}

// Transform transforms a flow to a new set of keys
func (g *Generic) Transform(f config.GenericMap) config.GenericMap {
log.Debugf("f = %v", f)
gm := make(config.GenericMap)
for _, transformRule := range g.Rules {
log.Debugf("transformRule = %v", transformRule)
gm[transformRule.Output] = f[transformRule.Input]
func (g *Generic) Transform(input []config.GenericMap) []config.GenericMap {
log.Debugf("f = %v", g)
output := make([]config.GenericMap, 0)
for _, entry := range input {
outputEntry := make(config.GenericMap)
for _, transformRule := range g.Rules {
log.Debugf("transformRule = %v", transformRule)
outputEntry[transformRule.Output] = entry[transformRule.Input]
}
log.Debugf("Transform.GenericMap = %v", outputEntry)
output = append(output, outputEntry)
}
log.Debugf("Transform.GenericMap = %v", gm)
return gm
return output
}

// NewTransformGeneric create a new transform
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/transform/transform_generic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ func TestNewTransformGeneric(t *testing.T) {
require.Len(t, transformGeneric.Rules, 6)

input := test.GetIngestMockEntry(false)
output := transformGeneric.Transform(input)
output := transformGeneric.Transform([]config.GenericMap{input})
expectedOutput := getGenericExpectedOutput()
require.Equal(t, output, expectedOutput)
require.Equal(t, expectedOutput, output[0])
}

func InitNewTransformGeneric(t *testing.T, configFile string) Transformer {
Expand Down
Loading

0 comments on commit c221d28

Please sign in to comment.