Skip to content

Commit

Permalink
implemented multiplier in transform-generic (#345)
Browse files Browse the repository at this point in the history
* implemented multiplier in transform-generic

* moved multiplier operation to separate function

* added what happens if multipler is 0
  • Loading branch information
KalmanMeth committed Dec 19, 2022
1 parent a88f7d6 commit 4835cc5
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 7 deletions.
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,12 @@ Specify `policy: replace_keys` to use only the newly specified keys.
To include the original keys and values in addition to those specified in the `rules`,
specify `policy: preserve_original_keys`.

The rule `multiplier` takes the input field, multiplies it by the provided value, and
places the result in the output field.
This is useful when provided with only a sample of the flow logs (e.g. 1 out of 20),
and some of the variables need to be adjusted accordingly.
If `multiplier` is not set or if it is set to 0, then the input field is simply copied to the output field.

For example, suppose we have a flow log with the following syntax:
```
{"Bytes":20800,"DstAddr":"10.130.2.2","DstPort":36936,"Packets":400,"Proto":6,"SequenceNum":1919,"SrcAddr":"10.130.2.13","SrcHostIP":"10.0.197.206","SrcPort":3100,"TCPFlags":0,"TimeFlowStart":0,"TimeReceived":1637501832}
Expand All @@ -273,12 +279,14 @@ parameters:
rules:
- input: Bytes
output: bytes
multiplier: 20
- input: DstAddr
output: dstAddr
- input: DstPort
output: dstPort
- input: Packets
output: packets
multiplier: 20
- input: SrcAddr
output: srcAddr
- input: SrcPort
Expand All @@ -291,6 +299,11 @@ Each field specified by `input` is translated into a field specified by the corr
Only those specified fields are saved for further processing in the pipeline.
Further stages in the pipeline should use these new field names.
This mechanism allows us to translate from any flow-log layout to a standard set of field names.

In the above example, the `bytes` and `packets` fields have a multiplier of 20.
This may be done in case only a sample of the flow logs is provided, in this case 1 in 20,
so that these fields need to be scaled accordingly.

If the `input` and `output` fields are identical, then that field is simply passed to the next stage.
For example:
```yaml
Expand Down
1 change: 1 addition & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ Following is the supported API format for generic transformations:
rules: list of transform rules, each includes:
input: entry input field
output: entry output field
multiplier: scaling factor to compensate for sampling
</pre>
## Transform Filter API
Following is the supported API format for filter transformations:
Expand Down
2 changes: 2 additions & 0 deletions network_definitions/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ transform:
output: proto
- input: Bytes
output: bytes
multiplier: 1
- input: TCPFlags
output: TCPFlags
- input: SrcAS
Expand All @@ -31,6 +32,7 @@ transform:
output: dstAS
- input: Packets
output: packets
multiplier: 1
- input: TimeReceived
output: timeReceived
extract:
Expand Down
5 changes: 3 additions & 2 deletions pkg/api/transform_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ func TransformGenericOperationName(operation string) string {
}

// GenericTransformRule describes one rule of the generic transform: the value
// found under Input is written under Output, optionally scaled by Multiplier.
type GenericTransformRule struct {
	// Input is the name of the field to read from the incoming entry.
	Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"`
	// Output is the name of the field to write in the resulting entry.
	Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"`
	// Multiplier scales the input value before it is written to Output.
	// When it is left unset (zero) the value is copied unchanged.
	Multiplier int `yaml:"multiplier,omitempty" json:"multiplier,omitempty" doc:"scaling factor to compensate for sampling"`
}

type GenericTransform []GenericTransformRule
45 changes: 42 additions & 3 deletions pkg/pipeline/transform/transform_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,56 @@ type Generic struct {
// Transform transforms a flow to a new set of keys.
//
// Unless the policy is "replace_keys", the result starts as a copy of entry so
// that the original keys are preserved; with "replace_keys" it starts empty and
// only the rule outputs are present. For each rule the input field is either
// copied to the output field (Multiplier == 0) or multiplied by Multiplier.
//
// The returned bool is false if at least one multiplier rule could not be
// applied (see performMultiplier); the remaining rules are still processed.
func (g *Generic) Transform(entry config.GenericMap) (config.GenericMap, bool) {
	var outputEntry config.GenericMap
	ok := true
	glog.Tracef("Transform input = %v", entry)
	if g.policy != "replace_keys" {
		outputEntry = entry.Copy()
	} else {
		outputEntry = config.GenericMap{}
	}
	for _, transformRule := range g.rules {
		if transformRule.Multiplier == 0 {
			// No scaling requested: plain rename/copy.
			outputEntry[transformRule.Output] = entry[transformRule.Input]
			continue
		}
		// Accumulate failures instead of overwriting ok, so that a later
		// successful rule cannot hide an earlier failed one.
		if !g.performMultiplier(entry, transformRule, outputEntry) {
			ok = false
		}
	}
	glog.Tracef("Transform output = %v", outputEntry)
	return outputEntry, ok
}

// performMultiplier multiplies the value stored in entry under
// transformRule.Input by transformRule.Multiplier and writes the product to
// outputEntry under transformRule.Output. The product keeps the dynamic type
// of the input value.
//
// It returns false, and logs an error, when the input value is missing or is
// not one of Go's built-in integer or floating-point types; in that case
// outputEntry is left untouched.
//
// The operand is always read from entry, never from outputEntry: outputEntry
// is empty under the "replace_keys" policy and may already have been modified
// by earlier rules, so reading from it can panic on the type assertion or
// scale an already-scaled value.
//
// NOTE(review): the multiplier is converted to the input's type before the
// multiplication, so for narrow types (e.g. int8/uint8) both the converted
// multiplier and the product can wrap around silently — confirm whether that
// is acceptable for the expected inputs.
func (g *Generic) performMultiplier(entry config.GenericMap, transformRule api.GenericTransformRule, outputEntry config.GenericMap) bool {
	multiplier := transformRule.Multiplier
	switch value := entry[transformRule.Input].(type) {
	case int:
		outputEntry[transformRule.Output] = multiplier * value
	case uint:
		outputEntry[transformRule.Output] = uint(multiplier) * value
	case int8:
		outputEntry[transformRule.Output] = int8(multiplier) * value
	case uint8:
		outputEntry[transformRule.Output] = uint8(multiplier) * value
	case int16:
		outputEntry[transformRule.Output] = int16(multiplier) * value
	case uint16:
		outputEntry[transformRule.Output] = uint16(multiplier) * value
	case int32:
		outputEntry[transformRule.Output] = int32(multiplier) * value
	case uint32:
		outputEntry[transformRule.Output] = uint32(multiplier) * value
	case int64:
		outputEntry[transformRule.Output] = int64(multiplier) * value
	case uint64:
		outputEntry[transformRule.Output] = uint64(multiplier) * value
	case float32:
		outputEntry[transformRule.Output] = float32(multiplier) * value
	case float64:
		outputEntry[transformRule.Output] = float64(multiplier) * value
	default:
		// Report the input field: that is the value that failed the type check.
		glog.Errorf("%s not of numerical type; cannot perform multiplication", transformRule.Input)
		return false
	}
	return true
}

// NewTransformGeneric create a new transform
Expand Down
96 changes: 94 additions & 2 deletions pkg/pipeline/transform/transform_generic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package transform
import (
"testing"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/test"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -135,8 +136,99 @@ func InitNewTransformGeneric(t *testing.T, configFile string) Transformer {
v, cfg := test.InitConfig(t, configFile)
require.NotNil(t, v)

config := cfg.Parameters[0]
newTransform, err := NewTransformGeneric(config)
configParams := cfg.Parameters[0]
newTransform, err := NewTransformGeneric(configParams)
require.NoError(t, err)
return newTransform
}

// Test_Transform_Multiplier checks the multiplier rule of the generic
// transform: numeric inputs are scaled and keep their type, non-numeric inputs
// make Transform report failure, and the configuration loader accepts an
// integer multiplier while rejecting a non-numeric one.
func Test_Transform_Multiplier(t *testing.T) {
	newGenericTransform := Generic{
		policy: "preserve_original_keys",
		rules: []api.GenericTransformRule{
			{
				Input:      "input_var",
				Output:     "output_var",
				Multiplier: 10,
			},
		},
	}

	testCases := []struct {
		name     string
		entry    config.GenericMap
		wantOK   bool
		expected map[string]interface{} // output keys to verify; nil when Transform must fail
	}{
		{
			name:     "int is scaled and unrelated keys are preserved",
			entry:    config.GenericMap{"input_var": 3, "other_var": 7},
			wantOK:   true,
			expected: map[string]interface{}{"output_var": 30, "other_var": 7},
		},
		{
			name:     "float64 is scaled",
			entry:    config.GenericMap{"input_var": 4.0},
			wantOK:   true,
			expected: map[string]interface{}{"output_var": 40.0},
		},
		{
			name:   "string is rejected",
			entry:  config.GenericMap{"input_var": "not_a_number"},
			wantOK: false,
		},
		{
			name:   "bool is rejected",
			entry:  config.GenericMap{"input_var": true},
			wantOK: false,
		},
		{
			name:     "negative float64 is scaled",
			entry:    config.GenericMap{"input_var": -4.0},
			wantOK:   true,
			expected: map[string]interface{}{"output_var": -40.0},
		},
		{
			name:     "uint16 is scaled and keeps its type",
			entry:    config.GenericMap{"input_var": uint16(5)},
			wantOK:   true,
			expected: map[string]interface{}{"output_var": uint16(50)},
		},
	}
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			output, ok := newGenericTransform.Transform(tc.entry)
			require.Equal(t, tc.wantOK, ok)
			for key, want := range tc.expected {
				require.Equal(t, want, output[key])
			}
		})
	}

	// A numeric multiplier must be accepted by the configuration loader.
	const goodConfig = `
parameters:
  - name: transform1
    transform:
      type: generic
      generic:
        policy: preserve_original_keys
        rules:
          - input: bytes
            output: bytes
            multiplier: 3
`
	v, cfg := test.InitConfig(t, goodConfig)
	require.NotNil(t, v)

	_, err := NewTransformGeneric(cfg.Parameters[0])
	require.NoError(t, err)

	// A non-numeric multiplier cannot be decoded into the int field, so
	// loading the configuration is expected to fail.
	const badConfig = `
parameters:
  - name: transform1
    transform:
      type: generic
      generic:
        policy: preserve_original_keys
        rules:
          - input: bytes
            output: bytes
            multiplier: "not_a_number"
`
	v, cfg = test.InitConfig(t, badConfig)
	require.Nil(t, v)
	require.Nil(t, cfg)
}

0 comments on commit 4835cc5

Please sign in to comment.