Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add filter transform #125

Merged
merged 3 commits into from
Mar 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,31 @@ Before the first transform suppose we have the keys `DstAddr` and `SrcAddr`.
After the first transform, we have the keys `dstAddr` and `srcAddr`.
After the second transform, we have the keys `dstAddr`, `dstIP`, `srcAddr`, and `srcIP`.

### Transform Filter

The filter transform module allows setting rules to remove complete entries from
the output, or just remove specific keys and values from entries.

For example, suppose we have a flow log with the following syntax:
```
{"Bytes":20800,"DstAddr":"10.130.2.2","DstPort":36936,"Packets":400,"Proto":6,"SequenceNum":1919,"SrcAddr":"10.130.2.13","SrcHostIP":"10.0.197.206","SrcPort":3100,"TCPFlags":0,"TimeFlowStart":0,"TimeReceived":1637501832}
```

The below configuration will remove (filter) the entry from the output

```yaml
pipeline:
transform:
- type: filter
filter:
rules:
- input: SrcPort
type: remove_entry_if_exists
```
Using `remove_entry_if_doesnt_exist` in the rule reverses the logic and will not remove the above example entry
Using `remove_field` in the rule `type` instead, results in outputting the entry after
removal of only the `SrcPort` key and value

### Transform Network

`transform network` provides specific functionality that is useful for transformation of network flow-logs:
Expand Down
12 changes: 12 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@ Following is the supported API format for generic transformations:
input: entry input field
output: entry output field
</pre>
## Transform Filter API
Following is the supported API format for filter transformations:

<pre>
filter:
rules: list of filter rules, each includes:
input: entry input field
type: (enum) one of the following:
remove_field: removes the field from the entry
remove_entry_if_exists: removes the entry if the field exists
remove_entry_if_doesnt_exist: removes the entry if the field doesnt exist
</pre>
## Transform Network API
Following is the supported API format for network transformations:

Expand Down
1 change: 1 addition & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type API struct {
IngestKafka IngestKafka `yaml:"kafka" doc:"## Ingest Kafka API\nFollowing is the supported API format for the kafka ingest:\n"`
DecodeAws DecodeAws `yaml:"aws" doc:"## Aws ingest API\nFollowing is the supported API format for Aws flow entries:\n"`
TransformGeneric TransformGeneric `yaml:"generic" doc:"## Transform Generic API\nFollowing is the supported API format for generic transformations:\n"`
TransformFilter TransformFilter `yaml:"filter" doc:"## Transform Filter API\nFollowing is the supported API format for filter transformations:\n"`
TransformNetwork TransformNetwork `yaml:"network" doc:"## Transform Network API\nFollowing is the supported API format for network transformations:\n"`
WriteLoki WriteLoki `yaml:"loki" doc:"## Write Loki API\nFollowing is the supported API format for writing to loki:\n"`
ExtractAggregate AggregateDefinition `yaml:"aggregates" doc:"## Aggregate metrics API\nFollowing is the supported API format for specifying metrics aggregations:\n"`
Expand Down
1 change: 1 addition & 0 deletions pkg/api/enum.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
type enums struct {
PromEncodeOperationEnum PromEncodeOperationEnum
TransformNetworkOperationEnum TransformNetworkOperationEnum
TransformFilterOperationEnum TransformFilterOperationEnum
KafkaEncodeBalancerEnum KafkaEncodeBalancerEnum
}

Expand Down
37 changes: 37 additions & 0 deletions pkg/api/transform_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (C) 2022 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package api

type TransformFilter struct {
Rules []TransformFilterRule `yaml:"rules" doc:"list of filter rules, each includes:"`
}

type TransformFilterOperationEnum struct {
RemoveField string `yaml:"remove_field" doc:"removes the field from the entry"`
RemoveEntryIfExists string `yaml:"remove_entry_if_exists" doc:"removes the entry if the field exists"`
RemoveEntryIfDoesntExist string `yaml:"remove_entry_if_doesnt_exist" doc:"removes the entry if the field doesnt exist"`
}

func TransformFilterOperationName(operation string) string {
return GetEnumName(TransformFilterOperationEnum{}, operation)
}

type TransformFilterRule struct {
Input string `yaml:"input" doc:"entry input field"`
Type string `yaml:"type" enum:"TransformFilterOperationEnum" doc:"one of the following:"`
}
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type Decode struct {
type Transform struct {
Type string
Generic api.TransformGeneric
Filter api.TransformFilter
Network api.TransformNetwork
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/pipeline/pipeline_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (b *builder) getStageNode(pe *pipelineEntry, stageID string) (interface{},
case StageTransform:
stage = node.AsMiddle(func(in <-chan []config.GenericMap, out chan<- []config.GenericMap) {
for i := range in {
out <- transform.ExecuteTransform(pe.Transformer, i)
out <- pe.Transformer.Transform(i)
}
})
case StageExtract:
Expand Down Expand Up @@ -260,6 +260,8 @@ func getTransformer(params config.StageParam) (transform.Transformer, error) {
switch params.Transform.Type {
case transform.OperationGeneric:
transformer, err = transform.NewTransformGeneric(params)
case transform.OperationFilter:
transformer, err = transform.NewTransformFilter(params)
case transform.OperationNetwork:
transformer, err = transform.NewTransformNetwork(params)
case transform.OperationNone:
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ parameters:

func Test_transformToLoki(t *testing.T) {
var transformed []config.GenericMap
input := config.GenericMap{"key": "value"}
input := []config.GenericMap{{"key": "value"}}
transform, err := transform.NewTransformNone()
require.NoError(t, err)
transformed = append(transformed, transform.Transform(input))
transformed = append(transformed, transform.Transform(input)...)

v := test.InitConfig(t, yamlConfigNoParams)
require.NotNil(t, v)
Expand Down
15 changes: 3 additions & 12 deletions pkg/pipeline/transform/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ import (
)

type Transformer interface {
Transform(in config.GenericMap) config.GenericMap
Transform(in []config.GenericMap) []config.GenericMap
}

type transformNone struct {
}

// Transform transforms a flow before being stored
func (t *transformNone) Transform(f config.GenericMap) config.GenericMap {
func (t *transformNone) Transform(f []config.GenericMap) []config.GenericMap {
return f
}

Expand All @@ -52,15 +52,6 @@ type Definitions []Definition
const (
OperationGeneric = "generic"
OperationNetwork = "network"
OperationFilter = "filter"
OperationNone = "none"
)

func ExecuteTransform(transformer Transformer, in []config.GenericMap) []config.GenericMap {
out := make([]config.GenericMap, 0)
var flowEntry config.GenericMap
for _, entry := range in {
flowEntry = transformer.Transform(entry)
out = append(out, flowEntry)
}
return out
}
69 changes: 69 additions & 0 deletions pkg/pipeline/transform/transform_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright (C) 2022 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package transform

import (
"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
log "github.com/sirupsen/logrus"
)

type Filter struct {
Rules []api.TransformFilterRule
}

// Transform transforms a flow
func (f *Filter) Transform(input []config.GenericMap) []config.GenericMap {
log.Debugf("f = %v", f)
output := make([]config.GenericMap, 0)
for _, entry := range input {
outputEntry := entry
addToOutput := true
for _, rule := range f.Rules {
log.Debugf("rule = %v", rule)
switch rule.Type {
case api.TransformFilterOperationName("RemoveField"):
delete(outputEntry, rule.Input)
case api.TransformFilterOperationName("RemoveEntryIfExists"):
if _, ok := entry[rule.Input]; ok {
addToOutput = false
}
case api.TransformFilterOperationName("RemoveEntryIfDoesntExist"):
if _, ok := entry[rule.Input]; !ok {
addToOutput = false
}
default:
log.Panicf("unknown type %s for transform.Filter rule: %v", rule.Type, rule)
}
}
if addToOutput {
output = append(output, outputEntry)
log.Debugf("Transform.GenericMap = %v", outputEntry)
}
}
return output
}

// NewTransformFilter create a new filter transform
func NewTransformFilter(params config.StageParam) (Transformer, error) {
log.Debugf("entering NewTransformFilter")
transformFilter := &Filter{
Rules: params.Transform.Filter.Rules,
}
return transformFilter, nil
}
124 changes: 124 additions & 0 deletions pkg/pipeline/transform/transform_filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright (C) 2022 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package transform

import (
"testing"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/test"
"github.com/stretchr/testify/require"
)

const testConfigTransformFilterRemoveField = `---
log-level: debug
pipeline:
- name: filter1
parameters:
- name: filter1
transform:
type: filter
filter:
rules:
- input: dstPort
type: remove_field
- input: srcPort
type: remove_field
`

const testConfigTransformFilterRemoveEntryIfExists = `---
log-level: debug
pipeline:
- name: filter1
parameters:
- name: filter1
transform:
type: filter
filter:
rules:
- input: srcPort
type: remove_entry_if_exists
`

const testConfigTransformFilterRemoveEntryIfDoesntExists = `---
log-level: debug
pipeline:
- name: filter1
parameters:
- name: filter1
transform:
type: filter
filter:
rules:
- input: doesntSrcPort
type: remove_entry_if_doesnt_exist
`

func getFilterExpectedOutput() config.GenericMap {
return config.GenericMap{
"srcIP": "10.0.0.1",
"8888IP": "8.8.8.8",
"emptyIP": "",
"level": "error",
"protocol": "tcp",
"protocol_num": 6,
"value": 7.0,
"message": "test message",
"dstIP": "20.0.0.2",
}
}

func TestNewTransformFilterRemoveField(t *testing.T) {
newTransform := InitNewTransformFilter(t, testConfigTransformFilterRemoveField)
transformFilter := newTransform.(*Filter)
require.Len(t, transformFilter.Rules, 2)

input := test.GetIngestMockEntry(false)
output := transformFilter.Transform([]config.GenericMap{input})
expectedOutput := getFilterExpectedOutput()
require.Equal(t, expectedOutput, output[0])
}

func TestNewTransformFilterRemoveEntryIfExists(t *testing.T) {
newTransform := InitNewTransformFilter(t, testConfigTransformFilterRemoveEntryIfExists)
transformFilter := newTransform.(*Filter)
require.Len(t, transformFilter.Rules, 1)

input := test.GetIngestMockEntry(false)
output := transformFilter.Transform([]config.GenericMap{input})
require.Equal(t, output, []config.GenericMap{})
}

eranra marked this conversation as resolved.
Show resolved Hide resolved
func TestNewTransformFilterRemoveEntryIfDoesntExists(t *testing.T) {
newTransform := InitNewTransformFilter(t, testConfigTransformFilterRemoveEntryIfDoesntExists)
transformFilter := newTransform.(*Filter)
require.Len(t, transformFilter.Rules, 1)

input := test.GetIngestMockEntry(false)
output := transformFilter.Transform([]config.GenericMap{input})
require.Equal(t, output, []config.GenericMap{})
}
func InitNewTransformFilter(t *testing.T, configFile string) Transformer {
v := test.InitConfig(t, configFile)
require.NotNil(t, v)

config := config.Parameters[0]
newTransform, err := NewTransformFilter(config)
require.NoError(t, err)
return newTransform
}
Loading