diff --git a/README.md b/README.md index 7161d09ab..085c3703b 100644 --- a/README.md +++ b/README.md @@ -255,6 +255,12 @@ Specify `policy: replace_keys` to use only the newly specified keys. To include the original keys and values in addition to those specified in the `rules`, specify `policy: preserve_original_keys`. +The rule `multiplier` takes the input field, multiplies it by the provided value, and +places the result in the output field. +This is useful when provided with only a sample of the flow logs (e.g. 1 out of 20), +and some of the variables need to be adjusted accordingly. +If `multiplier` is not set or if it is set to 0, then the input field is simply copied to the output field. + For example, suppose we have a flow log with the following syntax: ``` {"Bytes":20800,"DstAddr":"10.130.2.2","DstPort":36936,"Packets":400,"Proto":6,"SequenceNum":1919,"SrcAddr":"10.130.2.13","SrcHostIP":"10.0.197.206","SrcPort":3100,"TCPFlags":0,"TimeFlowStart":0,"TimeReceived":1637501832} @@ -273,12 +279,14 @@ parameters: rules: - input: Bytes output: bytes + multiplier: 20 - input: DstAddr output: dstAddr - input: DstPort output: dstPort - input: Packets output: packets + multiplier: 20 - input: SrcAddr output: srcAddr - input: SrcPort @@ -291,6 +299,11 @@ Each field specified by `input` is translated into a field specified by the corr Only those specified fields are saved for further processing in the pipeline. Further stages in the pipeline should use these new field names. This mechanism allows us to translate from any flow-log layout to a standard set of field names. + +In the above example, the `bytes` and `packets` fields have a multiplier of 20. +This may be done when only a sample of the flow logs is provided (in this case, 1 in 20), +so these fields need to be scaled accordingly. + If the `input` and `output` fields are identical, then that field is simply passed to the next stage. 
For example: ```yaml diff --git a/docs/api.md b/docs/api.md index 97176db41..b7b798cb3 100644 --- a/docs/api.md +++ b/docs/api.md @@ -117,6 +117,7 @@ Following is the supported API format for generic transformations: rules: list of transform rules, each includes: input: entry input field output: entry output field + multiplier: scaling factor to compensate for sampling ## Transform Filter API Following is the supported API format for filter transformations: diff --git a/network_definitions/config.yaml b/network_definitions/config.yaml index 252ff118e..1f99e4d76 100644 --- a/network_definitions/config.yaml +++ b/network_definitions/config.yaml @@ -23,6 +23,7 @@ transform: output: proto - input: Bytes output: bytes + multiplier: 1 - input: TCPFlags output: TCPFlags - input: SrcAS @@ -31,6 +32,7 @@ transform: output: dstAS - input: Packets output: packets + multiplier: 1 - input: TimeReceived output: timeReceived extract: diff --git a/pkg/api/transform_generic.go b/pkg/api/transform_generic.go index 3fbeb63e1..d03e46cc0 100644 --- a/pkg/api/transform_generic.go +++ b/pkg/api/transform_generic.go @@ -32,8 +32,10 @@ func TransformGenericOperationName(operation string) string { } +// GenericTransformRule names an entry input field, the output field it is written to, and an optional integer multiplier applied to the value. type GenericTransformRule struct { - Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"` - Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"` + Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"` + Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"` + Multiplier int `yaml:"multiplier,omitempty" json:"multiplier,omitempty" doc:"scaling factor to compensate for sampling"` } type GenericTransform []GenericTransformRule diff --git a/pkg/pipeline/transform/transform_generic.go b/pkg/pipeline/transform/transform_generic.go index 57765abb9..04f0131c4 100644 --- a/pkg/pipeline/transform/transform_generic.go +++ b/pkg/pipeline/transform/transform_generic.go @@ 
-33,17 +33,72 @@ type Generic struct {
 // Transform transforms a flow to a new set of keys
 func (g *Generic) Transform(entry config.GenericMap) (config.GenericMap, bool) {
 	var outputEntry config.GenericMap
-	log.Tracef("Transform input = %v", entry)
+	ok := true
+	glog.Tracef("Transform input = %v", entry)
 	if g.policy != "replace_keys" {
 		outputEntry = entry.Copy()
 	} else {
 		outputEntry = config.GenericMap{}
 	}
 	for _, transformRule := range g.rules {
-		outputEntry[transformRule.Output] = entry[transformRule.Input]
+		if transformRule.Multiplier != 0 {
+			// Latch failures so a later successful rule cannot mask an earlier one.
+			if !g.performMultiplier(entry, transformRule, outputEntry) {
+				ok = false
+			}
+		} else {
+			outputEntry[transformRule.Output] = entry[transformRule.Input]
+		}
 	}
 	glog.Tracef("Transform output = %v", outputEntry)
-	return outputEntry, true
+	return outputEntry, ok
+}
+
+// performMultiplier multiplies the value of entry[transformRule.Input] by
+// transformRule.Multiplier and stores the product, in the value's own numeric
+// type, in outputEntry[transformRule.Output].
+//
+// The value is always read from entry rather than outputEntry: under the
+// "replace_keys" policy outputEntry starts out empty, so a type assertion on
+// outputEntry[transformRule.Input] would panic on the nil value.
+//
+// It returns false, without writing to outputEntry, when the input field is
+// missing or not of a numeric type.
+//
+// NOTE: the multiplier is converted to the value's type before multiplying, so
+// narrow types such as int8 or uint8 can silently wrap around.
+func (g *Generic) performMultiplier(entry config.GenericMap, transformRule api.GenericTransformRule, outputEntry config.GenericMap) bool {
+	multiplier := transformRule.Multiplier
+	switch value := entry[transformRule.Input].(type) {
+	case int:
+		outputEntry[transformRule.Output] = multiplier * value
+	case uint:
+		outputEntry[transformRule.Output] = uint(multiplier) * value
+	case int8:
+		outputEntry[transformRule.Output] = int8(multiplier) * value
+	case uint8:
+		outputEntry[transformRule.Output] = uint8(multiplier) * value
+	case int16:
+		outputEntry[transformRule.Output] = int16(multiplier) * value
+	case uint16:
+		outputEntry[transformRule.Output] = uint16(multiplier) * value
+	case int32:
+		outputEntry[transformRule.Output] = int32(multiplier) * value
+	case uint32:
+		outputEntry[transformRule.Output] = uint32(multiplier) * value
+	case int64:
+		outputEntry[transformRule.Output] = int64(multiplier) * value
+	case uint64:
+		outputEntry[transformRule.Output] = uint64(multiplier) * value
+	case float32:
+		outputEntry[transformRule.Output] = float32(multiplier) * value
+	case float64:
+		outputEntry[transformRule.Output] = float64(multiplier) * value
+	default:
+		glog.Errorf("%s not of numerical type; cannot perform multiplication", transformRule.Input)
+		return false
+	}
+	return true
 }
 
 // NewTransformGeneric create a new transform
diff --git a/pkg/pipeline/transform/transform_generic_test.go b/pkg/pipeline/transform/transform_generic_test.go
index 85c1d1456..9a3fd5540 100644
--- a/pkg/pipeline/transform/transform_generic_test.go
+++ b/pkg/pipeline/transform/transform_generic_test.go
@@ -20,6 +20,7 @@ package transform
 import (
 	"testing"
 
+	"github.com/netobserv/flowlogs-pipeline/pkg/api"
 	"github.com/netobserv/flowlogs-pipeline/pkg/config"
 	"github.com/netobserv/flowlogs-pipeline/pkg/test"
 	"github.com/stretchr/testify/require"
@@ -135,8 +136,128 @@ func InitNewTransformGeneric(t *testing.T, configFile string) Transformer {
 	v, cfg := test.InitConfig(t, configFile)
 	require.NotNil(t, v)
 
-	config := cfg.Parameters[0]
-	newTransform, err := NewTransformGeneric(config)
+	configParams := cfg.Parameters[0]
+	newTransform, err := NewTransformGeneric(configParams)
 	require.NoError(t, err)
 	return newTransform
 }
+
+func Test_Transform_Multiplier(t *testing.T) {
+	newGenericTransform := Generic{
+		policy: "preserve_original_keys",
+		rules: []api.GenericTransformRule{
+			{
+				Input:      "input_var",
+				Output:     "output_var",
+				Multiplier: 10,
+			},
+		},
+	}
+
+	var entry config.GenericMap
+	entry = config.GenericMap{
+		"input_var": 3,
+		"other_var": 7,
+	}
+	output, ok := newGenericTransform.Transform(entry)
+	require.True(t, ok)
+	require.Equal(t, 30, output["output_var"])
+	require.Equal(t, 7, output["other_var"])
+
+	entry = config.GenericMap{
+		"input_var": 4.0,
+	}
+	output, ok = newGenericTransform.Transform(entry)
+	require.True(t, ok)
+	require.Equal(t, 40.0, output["output_var"])
+
+	entry = config.GenericMap{
+		"input_var": "not_a_number",
+	}
+	_, ok = newGenericTransform.Transform(entry)
+	require.False(t, ok)
+
+	entry = config.GenericMap{
+		"input_var": true,
+	}
+	_, ok = newGenericTransform.Transform(entry)
+	require.False(t, ok)
+
+	entry = config.GenericMap{
+		"input_var": -4.0,
+	}
+	output, ok = newGenericTransform.Transform(entry)
+	require.True(t, ok)
+	require.Equal(t, -40.0, output["output_var"])
+
+	entry = config.GenericMap{
+		"input_var": uint16(5),
+	}
+	output, ok = newGenericTransform.Transform(entry)
+	require.True(t, ok)
+	require.Equal(t, uint16(50), output["output_var"])
+
+	var goodConfig = []byte(`
+parameters:
+  - name: transform1
+    transform:
+      type: generic
+      generic:
+        policy: preserve_original_keys
+        rules:
+          - input: bytes
+            output: bytes
+            multiplier: 3
+`)
+	v, cfg := test.InitConfig(t, string(goodConfig))
+	require.NotNil(t, v)
+
+	configParams := cfg.Parameters[0]
+	_, err := NewTransformGeneric(configParams)
+	require.NoError(t, err)
+
+	var badConfig = []byte(`
+parameters:
+  - name: transform1
+    transform:
+      type: generic
+      generic:
+        policy: preserve_original_keys
+        rules:
+          - input: bytes
+            output: bytes
+            multiplier: "not_a_number"
+`)
+	v, cfg = test.InitConfig(t, string(badConfig))
+	require.Nil(t, v)
+	require.Nil(t, cfg)
+}
+
+// Test_Transform_Multiplier_ReplaceKeys verifies that the multiplier works
+// under the replace_keys policy (where the output map starts out empty) and
+// that a failing rule is not masked by a later rule that succeeds.
+func Test_Transform_Multiplier_ReplaceKeys(t *testing.T) {
+	newGenericTransform := Generic{
+		policy: "replace_keys",
+		rules: []api.GenericTransformRule{
+			{
+				Input:      "bad_var",
+				Output:     "bad_out",
+				Multiplier: 10,
+			},
+			{
+				Input:      "input_var",
+				Output:     "output_var",
+				Multiplier: 10,
+			},
+		},
+	}
+
+	entry := config.GenericMap{
+		"bad_var":   "not_a_number",
+		"input_var": uint32(5),
+	}
+	output, ok := newGenericTransform.Transform(entry)
+	require.False(t, ok)
+	require.Equal(t, uint32(50), output["output_var"])
+}