Skip to content

Commit

Permalink
add filter transform
Browse files Browse the repository at this point in the history
  • Loading branch information
eranra committed Mar 7, 2022
1 parent e0d95f2 commit 0f42064
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 0 deletions.
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,26 @@ Before the first transform suppose we have the keys `DstAddr` and `SrcAddr`.
After the first transform, we have the keys `dstAddr` and `srcAddr`.
After the second transform, we have the keys `dstAddr`, `dstIP`, `srcAddr`, and `srcIP`.

### Transform Filter

The filter transform module allows setting rules to removal of keys and values from entries.

For example, suppose we have a flow log with the following syntax:
```
{"Bytes":20800,"DstAddr":"10.130.2.2","DstPort":36936,"Packets":400,"Proto":6,"SequenceNum":1919,"SrcAddr":"10.130.2.13","SrcHostIP":"10.0.197.206","SrcPort":3100,"TCPFlags":0,"TimeFlowStart":0,"TimeReceived":1637501832}
```

The bellow configuration will remove (filter) the `SrcPort` key/value from the entry

```yaml
pipeline:
transform:
- type: filter
filter:
rules:
- input: SrcPort
```

### Transform Network

`transform network` provides specific functionality that is useful for transformation of network flow-logs:
Expand Down
8 changes: 8 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ Following is the supported API format for generic transformations:
input: entry input field
output: entry output field
</pre>
## Transform Filter API
Following is the supported API format for filter transformations:

<pre>
filter:
rules: list of filter rules, each includes:
input: entry input field
</pre>
## Transform Network API
Following is the supported API format for network transformations:

Expand Down
1 change: 1 addition & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type API struct {
IngestKafka IngestKafka `yaml:"kafka" doc:"## Ingest Kafka API\nFollowing is the supported API format for the kafka ingest:\n"`
DecodeAws DecodeAws `yaml:"aws" doc:"## Aws ingest API\nFollowing is the supported API format for Aws flow entries:\n"`
TransformGeneric TransformGeneric `yaml:"generic" doc:"## Transform Generic API\nFollowing is the supported API format for generic transformations:\n"`
TransformFilter TransformFilter `yaml:"filter" doc:"## Transform Filter API\nFollowing is the supported API format for filter transformations:\n"`
TransformNetwork TransformNetwork `yaml:"network" doc:"## Transform Network API\nFollowing is the supported API format for network transformations:\n"`
WriteLoki WriteLoki `yaml:"loki" doc:"## Write Loki API\nFollowing is the supported API format for writing to loki:\n"`
ExtractAggregate AggregateDefinition `yaml:"aggregates" doc:"## Aggregate metrics API\nFollowing is the supported API format for specifying metrics aggregations:\n"`
Expand Down
26 changes: 26 additions & 0 deletions pkg/api/transform_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (C) 2022 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package api

type TransformFilter struct {
Rules []TransformFilterRule `yaml:"rules" doc:"list of filter rules, each includes:"`
}

type TransformFilterRule struct {
Input string `yaml:"input" doc:"entry input field"`
}
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type Decode struct {
type Transform struct {
Type string
Generic api.TransformGeneric
Filter api.TransformFilter
Network api.TransformNetwork
}

Expand Down
49 changes: 49 additions & 0 deletions pkg/pipeline/transform/transform_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (C) 2022 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package transform

import (
"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
log "github.com/sirupsen/logrus"
)

type Filter struct {
Rules []api.TransformFilterRule
}

// Transform transforms a flow to a new set of keys
func (f *Filter) Transform(input config.GenericMap) config.GenericMap {
log.Debugf("f = %v", f)
output := input
for _, transformRule := range f.Rules {
log.Debugf("transformRule = %v", transformRule)
delete(output, transformRule.Input)
}
log.Debugf("Transform.GenericMap = %v", output)
return output
}

// NewTransformFilter create a new filter transform
func NewTransformFilter(params config.StageParam) (Transformer, error) {
log.Debugf("entering NewTransformFilter")
transformFilter := &Filter{
Rules: params.Transform.Filter.Rules,
}
return transformFilter, nil
}
75 changes: 75 additions & 0 deletions pkg/pipeline/transform/transform_filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (C) 2022 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package transform

import (
"testing"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/test"
"github.com/stretchr/testify/require"
)

const testConfigTransformFilter = `---
log-level: debug
pipeline:
- name: filter1
parameters:
- name: filter1
transform:
type: filter
filter:
rules:
- input: dstPort
- input: srcPort
`

func getFilterExpectedOutput() config.GenericMap {
return config.GenericMap{
"srcIP": "10.0.0.1",
"8888IP": "8.8.8.8",
"emptyIP": "",
"level": "error",
"protocol": "tcp",
"protocol_num": 6,
"value": "7",
"message": "test message",
"dstIP": "20.0.0.2",
}
}

func TestNewTransformFilter(t *testing.T) {
newTransform := InitNewTransformFilter(t, testConfigTransformFilter)
transformFilter := newTransform.(*Filter)
require.Len(t, transformFilter.Rules, 2)

input := test.GetIngestMockEntry(false)
output := transformFilter.Transform(input)
expectedOutput := getFilterExpectedOutput()
require.Equal(t, output, expectedOutput)
}

func InitNewTransformFilter(t *testing.T, configFile string) Transformer {
v := test.InitConfig(t, configFile)
require.NotNil(t, v)

config := config.Parameters[0]
newTransform, err := NewTransformFilter(config)
require.NoError(t, err)
return newTransform
}

0 comments on commit 0f42064

Please sign in to comment.