diff --git a/docs/api.md b/docs/api.md index 16cb53745..b383a28e5 100644 --- a/docs/api.md +++ b/docs/api.md @@ -10,7 +10,8 @@ Following is the supported API format for prometheus encode: gauge: single numerical value that can arbitrarily go up and down counter: monotonically increasing counter whose value can only increase histogram: counts samples in configurable buckets - filter: the criterion to filter entries by + agg_histogram: counts samples in configurable buckets, pre-aggregated via an Aggregate stage + filter: an optional criterion to filter entries by key: the key to match and filter by value: the value to match and filter by valueKey: entry key from which to resolve metric value @@ -172,7 +173,6 @@ Following is the supported API format for specifying metrics aggregations: by: list of fields on which to aggregate operation: sum, min, max, avg or raw_values recordKey: internal field on which to perform the operation - topK: number of highest incidence to report (default - report all) ## Connection tracking API Following is the supported API format for specifying connection tracking: @@ -203,4 +203,24 @@ Following is the supported API format for specifying connection tracking: input: The input field to base the operation on. When omitted, 'name' is used endConnectionTimeout: duration of time to wait from the last flow log to end a connection updateConnectionInterval: duration of time to wait between update reports of a connection + +## Time-based Filters API +Following is the supported API format for specifying metrics time-based filters: + +
+ timebased:
+         rules: list of filter rules, each includes:
+                 name: description of filter result
+                 recordKey: internal field to index TopK
+                 operation: (enum) sum, min, max, avg, last or diff
+                     sum: set output field to the sum of the operationKey field values in the time window
+                     avg: set output field to the average of the operationKey field values in the time window
+                     min: set output field to the minimum of the operationKey field values in the time window
+                     max: set output field to the maximum of the operationKey field values in the time window
+                     last: set output field to the last value of the operationKey field in the time window
+                     diff: set output field to the difference between the first and last values of the operationKey field in the time window
+                 operationKey: internal field on which to perform the operation
+                 topK: number of highest incidence to report (default - report all)
+                 reversed: report lowest incidence instead of highest (default - false)
+                 timeInterval: time duration of data to use to compute the metric
 
\ No newline at end of file diff --git a/docs/operational-metrics.md b/docs/operational-metrics.md index 53d67bfc3..679d64893 100644 --- a/docs/operational-metrics.md +++ b/docs/operational-metrics.md @@ -16,6 +16,13 @@ Each table below provides documentation for an exported flowlogs-pipeline operat | **Type** | counter | +### encode_prom_errors +| **Name** | encode_prom_errors | +|:---|:---| +| **Description** | Total errors during metrics generation | +| **Type** | counter | + + ### conntrack_memory_connections | **Name** | conntrack_memory_connections | |:---|:---| diff --git a/pkg/api/api.go b/pkg/api/api.go index 4d83991a4..cd37f489d 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -28,6 +28,7 @@ const ( StdoutType = "stdout" LokiType = "loki" AggregateType = "aggregates" + TimebasedType = "timebased" PromType = "prom" GenericType = "generic" NetworkType = "network" @@ -62,4 +63,5 @@ type API struct { WriteStdout WriteStdout `yaml:"stdout" doc:"## Write Standard Output\nFollowing is the supported API format for writing to standard output:\n"` ExtractAggregate AggregateDefinition `yaml:"aggregates" doc:"## Aggregate metrics API\nFollowing is the supported API format for specifying metrics aggregations:\n"` ConnectionTracking ConnTrack `yaml:"conntrack" doc:"## Connection tracking API\nFollowing is the supported API format for specifying connection tracking:\n"` + ExtractTimebased ExtractTimebased `yaml:"timebased" doc:"## Time-based Filters API\nFollowing is the supported API format for specifying metrics time-based filters:\n"` } diff --git a/pkg/api/enum.go b/pkg/api/enum.go index 3ed78ea90..b5c588c95 100644 --- a/pkg/api/enum.go +++ b/pkg/api/enum.go @@ -31,6 +31,7 @@ type enums struct { ConnTrackOperationEnum ConnTrackOperationEnum ConnTrackOutputRecordTypeEnum ConnTrackOutputRecordTypeEnum DecoderEnum DecoderEnum + FilterOperationEnum FilterOperationEnum } type enumNameCacheKey struct { diff --git a/pkg/api/extract_aggregate.go b/pkg/api/extract_aggregate.go index d67219c83..df708c2f3 100644 --- a/pkg/api/extract_aggregate.go +++ b/pkg/api/extract_aggregate.go @@ -25,5 +25,4 @@ type AggregateDefinition struct { By AggregateBy `yaml:"by,omitempty" json:"by,omitempty" doc:"list of fields on which to aggregate"` Operation AggregateOperation `yaml:"operation,omitempty" json:"operation,omitempty" doc:"sum, min, max, avg or raw_values"` RecordKey string `yaml:"recordKey,omitempty" json:"recordKey,omitempty" doc:"internal field on which to perform the operation"` - TopK int `yaml:"topK,omitempty" json:"topK,omitempty" doc:"number of highest incidence to report (default - report all)"` } diff --git a/pkg/api/extract_timebased.go b/pkg/api/extract_timebased.go new file mode 100644 index 000000000..456673964 --- /dev/null +++ b/pkg/api/extract_timebased.go @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2022 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package api + +type FilterOperationEnum struct { + FilterOperationSum string `yaml:"sum" json:"sum" doc:"set output field to sum of parameters fields in the time window"` + FilterOperationAvg string `yaml:"avg" json:"avg" doc:"set output field to average of parameters fields in the time window"` + FilterOperationMin string `yaml:"min" json:"min" doc:"set output field to minimum of parameters fields in the time window"` + FilterOperationMax string `yaml:"max" json:"max" doc:"set output field to maximum of parameters fields in the time window"` + FilterOperationLast string `yaml:"last" json:"last" doc:"set output field to last of parameters fields in the time window"` + FilterOperationDiff string `yaml:"diff" json:"diff" doc:"set output field to the difference of the first and last parameters fields in the time window"` +} + +func FilterOperationName(operation string) string { + return GetEnumName(FilterOperationEnum{}, operation) +} + +type ExtractTimebased struct { + Rules []TimebasedFilterRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"list of filter rules, each includes:"` +} + +type TimebasedFilterRule struct { + Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"description of filter result"` + RecordKey string `yaml:"recordKey,omitempty" json:"recordKey,omitempty" doc:"internal field to index TopK"` + Operation string `yaml:"operation,omitempty" json:"operation,omitempty" enum:"FilterOperationEnum" doc:"sum, min, max, avg, last or diff"` + OperationKey string `yaml:"operationKey,omitempty" json:"operationKey,omitempty" doc:"internal field on which to perform the operation"` + TopK int `yaml:"topK,omitempty" json:"topK,omitempty" doc:"number of highest incidence to report (default - report all)"` + Reversed bool `yaml:"reversed,omitempty" json:"reversed,omitempty" doc:"report lowest incidence instead of highest (default - false)"` + TimeInterval Duration `yaml:"timeInterval,omitempty" json:"timeInterval,omitempty" doc:"time duration of data to use to compute the metric"` +} diff --git a/pkg/confgen/confgen.go b/pkg/confgen/confgen.go index 8201be6e1..22a464ede 100644 --- a/pkg/confgen/confgen.go +++ b/pkg/confgen/confgen.go @@ -43,6 +43,7 @@ type Definition struct { Tags []string TransformNetwork *api.TransformNetwork AggregateDefinitions *aggregate.Definitions + ExtractTimebased *api.ExtractTimebased PromEncode *api.PromEncode Visualization *Visualization } @@ -54,6 +55,7 @@ type ConfGen struct { config *Config transformRules api.NetworkTransformRules aggregateDefinitions aggregate.Definitions + timebasedTopKs api.ExtractTimebased promMetrics api.PromMetricsItems visualizations Visualizations definitions Definitions @@ -178,7 +180,7 @@ func (cg *ConfGen) ParseDefinition(name string, bytes []byte) error { } // parse extract - definition.AggregateDefinitions, err = cg.parseExtract(&defFile.Extract) + definition.AggregateDefinitions, definition.ExtractTimebased, err = cg.parseExtract(&defFile.Extract) if err != nil { log.Debugf("parseExtract err: %v ", err) return err diff --git a/pkg/confgen/extract.go b/pkg/confgen/extract.go index 6604904d3..f21162027 100644 --- a/pkg/confgen/extract.go +++ b/pkg/confgen/extract.go @@ -19,27 +19,48 @@ package confgen import ( jsoniter "github.com/json-iterator/go" + "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract/aggregate" log "github.com/sirupsen/logrus" ) -func (cg *ConfGen) parseExtract(extract 
*map[string]interface{}) (*aggregate.Definitions, error) { +func (cg *ConfGen) parseExtract(extract *map[string]interface{}) (*aggregate.Definitions, *api.ExtractTimebased, error) { var jsoniterJson = jsoniter.ConfigCompatibleWithStandardLibrary aggregateExtract := (*extract)["aggregates"] b, err := jsoniterJson.Marshal(&aggregateExtract) if err != nil { log.Debugf("jsoniterJson.Marshal err: %v ", err) - return nil, err + return nil, nil, err } var jsonNetworkAggregate aggregate.Definitions err = config.JsonUnmarshalStrict(b, &jsonNetworkAggregate) if err != nil { log.Debugf("Unmarshal aggregate.Definitions err: %v ", err) - return nil, err + return nil, nil, err } cg.aggregateDefinitions = append(cg.aggregateDefinitions, jsonNetworkAggregate...) - return &jsonNetworkAggregate, nil + + timebasedExtract, ok := (*extract)["timebased"] + if !ok { + return &jsonNetworkAggregate, nil, nil + } + b, err = jsoniterJson.Marshal(&timebasedExtract) + if err != nil { + log.Debugf("jsoniterJson.Marshal err: %v ", err) + return nil, nil, err + } + + var jsonTimebasedTopKs api.ExtractTimebased + err = config.JsonUnmarshalStrict(b, &jsonTimebasedTopKs) + if err != nil { + log.Debugf("Unmarshal api.ExtractTimebased err: %v ", err) + return nil, nil, err + } + + cg.timebasedTopKs.Rules = append(cg.timebasedTopKs.Rules, jsonTimebasedTopKs.Rules...) + + return &jsonNetworkAggregate, &jsonTimebasedTopKs, nil } diff --git a/pkg/confgen/flowlogs2metrics_config.go b/pkg/confgen/flowlogs2metrics_config.go index 5a5ccb37c..14f2af377 100644 --- a/pkg/confgen/flowlogs2metrics_config.go +++ b/pkg/confgen/flowlogs2metrics_config.go @@ -48,6 +48,11 @@ func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() *config.ConfigFileStruct { metricsNode := forkedNode if len(cg.aggregateDefinitions) > 0 { metricsNode = metricsNode.Aggregate("extract_aggregate", cg.aggregateDefinitions) + if len(cg.timebasedTopKs.Rules) > 0 { + metricsNode = metricsNode.ExtractTimebased("extract_timebased", api.ExtractTimebased{ + Rules: cg.timebasedTopKs.Rules, + }) + } } if len(cg.promMetrics) > 0 { metricsNode.EncodePrometheus("encode_prom", api.PromEncode{ @@ -78,6 +83,8 @@ func (cg *ConfGen) GenerateTruncatedConfig() []config.StageParam { parameters[i] = config.NewTransformNetworkParams("transform_network", *cg.config.Transform.Network) case "extract_aggregate": parameters[i] = config.NewAggregateParams("extract_aggregate", cg.aggregateDefinitions) + case "extract_timebased": + parameters[i] = config.NewTimbasedParams("extract_timebased", cg.timebasedTopKs) case "encode_prom": parameters[i] = config.NewEncodePrometheusParams("encode_prom", api.PromEncode{ Metrics: cg.promMetrics, diff --git a/pkg/config/config.go b/pkg/config/config.go index b4cfd4d73..c25ac8440 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -81,6 +81,7 @@ type Extract struct { Type string `yaml:"type" json:"type"` Aggregates []api.AggregateDefinition `yaml:"aggregates,omitempty" json:"aggregates,omitempty"` ConnTrack *api.ConnTrack `yaml:"conntrack,omitempty" json:"conntrack,omitempty"` + Timebased *api.ExtractTimebased `yaml:"timebased,omitempty" json:"timebased,omitempty"` } type Encode struct { diff --git a/pkg/config/pipeline_builder.go b/pkg/config/pipeline_builder.go index c39707136..d10e696ff 100644 --- a/pkg/config/pipeline_builder.go +++ b/pkg/config/pipeline_builder.go @@ -99,6 +99,11 @@ func (b *PipelineBuilderStage) Aggregate(name string, aggs []api.AggregateDefini return b.next(name, NewAggregateParams(name, aggs)) } +// ExtractTimebased chains the 
current stage with a ExtractTimebased stage and returns that new stage +func (b *PipelineBuilderStage) ExtractTimebased(name string, tb api.ExtractTimebased) PipelineBuilderStage { + return b.next(name, StageParam{Name: name, Extract: &Extract{Type: api.TimebasedType, Timebased: &tb}}) +} + // TransformGeneric chains the current stage with a TransformGeneric stage and returns that new stage func (b *PipelineBuilderStage) TransformGeneric(name string, gen api.TransformGeneric) PipelineBuilderStage { return b.next(name, NewTransformGenericParams(name, gen)) diff --git a/pkg/config/stage_params.go b/pkg/config/stage_params.go index 99e0ea85b..799ca4f8b 100644 --- a/pkg/config/stage_params.go +++ b/pkg/config/stage_params.go @@ -53,6 +53,10 @@ func NewConnTrackParams(name string, ct api.ConnTrack) StageParam { return StageParam{Name: name, Extract: &Extract{Type: api.ConnTrackType, ConnTrack: &ct}} } +func NewTimbasedParams(name string, ct api.ExtractTimebased) StageParam { + return StageParam{Name: name, Extract: &Extract{Type: api.TimebasedType, Timebased: &ct}} +} + func NewEncodePrometheusParams(name string, prom api.PromEncode) StageParam { return StageParam{Name: name, Encode: &Encode{Type: api.PromType, Prom: &prom}} } diff --git a/pkg/pipeline/extract/aggregate/aggregate.go b/pkg/pipeline/extract/aggregate/aggregate.go index b69fccb4f..bce037c03 100644 --- a/pkg/pipeline/extract/aggregate/aggregate.go +++ b/pkg/pipeline/extract/aggregate/aggregate.go @@ -18,7 +18,6 @@ package aggregate import ( - "container/heap" "fmt" "math" "sort" @@ -235,83 +234,9 @@ func (aggregate Aggregate) GetMetrics() []config.GenericMap { group.recentOpValue = getInitValue(string(aggregate.Definition.Operation)) }) - if aggregate.Definition.TopK > 0 { - metrics = aggregate.computeTopK(metrics) - } - return metrics } func (aggregate Aggregate) Cleanup(entry interface{}) { // nothing special to do in this callback function } - -// functions to manipulate a heap to generate TopK entries -// We need to implement the heap interface: Len(), Less(), Swap(), Push(), Pop() - -type heapItem struct { - value float64 - metrics *config.GenericMap -} - -type topkHeap []heapItem - -func (h topkHeap) Len() int { - return len(h) -} - -func (h topkHeap) Less(i, j int) bool { - return h[i].value < h[j].value -} - -func (h topkHeap) Swap(i, j int) { - h[i], h[j] = h[j], h[i] -} - -func (h *topkHeap) Push(x interface{}) { - *h = append(*h, x.(heapItem)) -} - -func (h *topkHeap) Pop() interface{} { - old := *h - n := len(old) - x := old[n-1] - *h = old[0 : n-1] - return x -} - -func (aggregate Aggregate) computeTopK(inputMetrics []config.GenericMap) []config.GenericMap { - // maintain a heap with k items, always dropping the lowest - // we will be left with the TopK items - var prevMin float64 - prevMin = -math.MaxFloat64 - topk := aggregate.Definition.TopK - h := &topkHeap{} - for index, metricMap := range inputMetrics { - val := metricMap["total_value"].(float64) - if val < prevMin { - continue - } - item := heapItem{ - metrics: &inputMetrics[index], - value: val, - } - heap.Push(h, item) - if h.Len() > topk { - x := heap.Pop(h) - prevMin = x.(heapItem).value - } - } - log.Debugf("heap: %v", h) - - // convert the remaining heap to a sorted array - result := make([]config.GenericMap, h.Len()) - heapLen := h.Len() - for i := heapLen; i > 0; i-- { - poppedItem := heap.Pop(h).(heapItem) - log.Debugf("poppedItem: %v", poppedItem) - result[i-1] = *poppedItem.metrics - } - log.Debugf("topk items: %v", result) - return result -} diff 
--git a/pkg/pipeline/extract/extract_timebased.go b/pkg/pipeline/extract/extract_timebased.go new file mode 100644 index 000000000..b1477963c --- /dev/null +++ b/pkg/pipeline/extract/extract_timebased.go @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2022 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package extract + +import ( + "time" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract/timebased" + log "github.com/sirupsen/logrus" +) + +type ExtractTimebased struct { + Filters []timebased.FilterStruct + RecordKeyStructs map[string]*timebased.RecordKeyTable +} + +// Extract extracts a flow before being stored +func (et *ExtractTimebased) Extract(entries []config.GenericMap) []config.GenericMap { + log.Debugf("entering ExtractTimebased Extract") + nowInSecs := time.Now() + // Populate the Table with the current entries + for _, entry := range entries { + log.Debugf("ExtractTimebased Extract, entry = %v", entry) + timebased.AddEntryToTables(et.RecordKeyStructs, entry, nowInSecs) + } + + output := make([]config.GenericMap, 0) + // Calculate Filters based on time windows + for _, filter := range et.Filters { + filter.CalculateResults(nowInSecs) + filter.ComputeTopkBotk() + genMap := filter.CreateGenericMap() + output = append(output, genMap...) + } + log.Debugf("output of extract timebased: %v", output) + + // delete entries from tables that are outside time windows + timebased.DeleteOldEntriesFromTables(et.RecordKeyStructs, nowInSecs) + + return output +} + +// NewExtractTimebased creates a new extractor +func NewExtractTimebased(params config.StageParam) (Extractor, error) { + var rules []api.TimebasedFilterRule + if params.Extract != nil && params.Extract.Timebased.Rules != nil { + rules = params.Extract.Timebased.Rules + } + log.Debugf("NewExtractTimebased; rules = %v", rules) + + tmpRecordKeyStructs, tmpFilters := timebased.CreateRecordKeysAndFilters(rules) + + return &ExtractTimebased{ + Filters: tmpFilters, + RecordKeyStructs: tmpRecordKeyStructs, + }, nil +} diff --git a/pkg/pipeline/extract/extract_timebased_test.go b/pkg/pipeline/extract/extract_timebased_test.go new file mode 100644 index 000000000..496faff52 --- /dev/null +++ b/pkg/pipeline/extract/extract_timebased_test.go @@ -0,0 +1,368 @@ +/* + * Copyright (C) 2022 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package extract + +import ( + "testing" + "time" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract/timebased" + "github.com/netobserv/flowlogs-pipeline/pkg/test" + "github.com/stretchr/testify/require" +) + +func GetMockTimebased1() ExtractTimebased { + tb := ExtractTimebased{ + Filters: []timebased.FilterStruct{ + {Rule: api.TimebasedFilterRule{ + Name: "TopK_Bytes1", + RecordKey: "SrcAddr", + Operation: "last", + OperationKey: "Bytes", + TopK: 3, + TimeInterval: api.Duration{Duration: 10 * time.Second}, + }}, + {Rule: api.TimebasedFilterRule{ + Name: "BotK_Bytes1", + RecordKey: "SrcAddr", + Operation: "avg", + OperationKey: "Bytes", + TopK: 2, + Reversed: true, + TimeInterval: api.Duration{Duration: 15 * time.Second}, + }}, + }, + RecordKeyStructs: map[string]*timebased.RecordKeyTable{}, + } + return tb +} + +var yamlConfig1 = ` +pipeline: + - name: extract1 +parameters: + - name: extract1 + extract: + type: timebased + timebased: + rules: + - name: TopK_Bytes1 + operation: last + operationKey: Bytes + recordKey: SrcAddr + topK: 3 + timeInterval: 10s + - name: BotK_Bytes1 + operation: avg + operationKey: Bytes + recordKey: SrcAddr + topK: 2 + reversed: true + timeInterval: 15s +` + +var yamlConfig2 = ` +pipeline: + - name: extract2 +parameters: + - name: extract2 + extract: + type: timebased + timebased: + rules: + - name: TopK_Bytes2 + operation: sum + operationKey: Bytes + recordKey: SrcAddr + topK: 1 + timeInterval: 10s +` + +var yamlConfig3 = ` +pipeline: + - name: extract3 +parameters: + - name: extract3 + extract: + type: timebased + timebased: + rules: + - name: BotK_Bytes3 + operation: diff + operationKey: Bytes + recordKey: SrcAddr + topK: 1 + reversed: true + timeInterval: 10s +` + +var yamlConfig4 = ` +pipeline: + - name: extract4 +parameters: + - name: extract4 + extract: + type: timebased + timebased: + rules: + - name: TopK_Bytes4 + operation: max + operationKey: Bytes + recordKey: SrcAddr + topK: 1 + timeInterval: 10s +` + +var yamlConfig5 = ` +pipeline: + - name: extract5 +parameters: + - name: extract5 + extract: + type: timebased + timebased: + rules: + - name: BotK_Bytes5 + operation: min + operationKey: Bytes + recordKey: SrcAddr + topK: 1 + reversed: true + timeInterval: 10s +` + +var yamlConfig6 = ` +pipeline: + - name: extract6 +parameters: + - name: extract6 + extract: + type: timebased + timebased: + rules: + - name: All_Bytes6 + operation: sum + operationKey: Bytes + recordKey: SrcAddr + timeInterval: 10s +` + +func initTimebased(t *testing.T, yamlConfig string) *ExtractTimebased { + v, cfg := test.InitConfig(t, yamlConfig) + require.NotNil(t, v) + extractor, err := NewExtractTimebased(cfg.Parameters[0]) + require.NoError(t, err) + + return extractor.(*ExtractTimebased) +} + +func Test_NewExtractTimebased(t *testing.T) { + + tb := initTimebased(t, yamlConfig1) + require.NotNil(t, tb) + expectedTimebased := GetMockTimebased1() + require.Equal(t, expectedTimebased.Filters[0].Rule, tb.Filters[0].Rule) + require.Equal(t, expectedTimebased.Filters[1].Rule, tb.Filters[1].Rule) +} + +func Test_ExtractTimebasedExtract1(t *testing.T) { + tb := initTimebased(t, yamlConfig1) + require.NotNil(t, tb) + entries := test.GetExtractMockEntries2() + output := tb.Extract(entries) + require.Equal(t, 5, len(output)) + expectedOutput := []config.GenericMap{ + { + "key": "10.0.0.4", + "name": "TopK_Bytes1", + "operation": "last", + "operation_key": 
"Bytes", + "operation_result": float64(1000), + "record_key": "SrcAddr", + "SrcAddr": "10.0.0.4", + }, + { + "key": "10.0.0.3", + "name": "TopK_Bytes1", + "operation": "last", + "operation_key": "Bytes", + "operation_result": float64(900), + "record_key": "SrcAddr", + "SrcAddr": "10.0.0.3", + }, + { + "key": "10.0.0.2", + "name": "TopK_Bytes1", + "operation": "last", + "operation_key": "Bytes", + "operation_result": float64(800), + "record_key": "SrcAddr", + "SrcAddr": "10.0.0.2", + }, + { + "key": "10.0.0.1", + "name": "BotK_Bytes1", + "operation": "avg", + "operation_key": "Bytes", + "operation_result": float64(400), + "record_key": "SrcAddr", + "SrcAddr": "10.0.0.1", + }, + { + "key": "10.0.0.2", + "name": "BotK_Bytes1", + "operation": "avg", + "operation_key": "Bytes", + "operation_result": float64(500), + "record_key": "SrcAddr", + "SrcAddr": "10.0.0.2", + }, + } + require.Equal(t, expectedOutput, output) +} + +func Test_ExtractTimebasedExtract2(t *testing.T) { + tb := initTimebased(t, yamlConfig2) + require.NotNil(t, tb) + entries := test.GetExtractMockEntries2() + output := tb.Extract(entries) + require.Equal(t, 1, len(output)) + expectedOutput := []config.GenericMap{ + { + "key": "10.0.0.3", + "name": "TopK_Bytes2", + "operation": "sum", + "operation_key": "Bytes", + "operation_result": float64(1800), + "record_key": "SrcAddr", + "SrcAddr": "10.0.0.3", + }, + } + require.Equal(t, expectedOutput, output) +} + +func Test_ExtractTimebasedExtract3(t *testing.T) { + tb := initTimebased(t, yamlConfig3) + require.NotNil(t, tb) + entries := test.GetExtractMockEntries2() + output := tb.Extract(entries) + require.Equal(t, 1, len(output)) + expectedOutput := []config.GenericMap{ + { + "key": "10.0.0.4", + "name": "BotK_Bytes3", + "operation": "diff", + "operation_key": "Bytes", + "operation_result": float64(0), + "record_key": "SrcAddr", + "SrcAddr": "10.0.0.4", + }, + } + require.Equal(t, expectedOutput, output) +} + +func Test_ExtractTimebasedExtract4(t *testing.T) { + tb := initTimebased(t, yamlConfig4) + require.NotNil(t, tb) + entries := test.GetExtractMockEntries2() + output := tb.Extract(entries) + require.Equal(t, 1, len(output)) + expectedOutput := []config.GenericMap{ + { + "key": "10.0.0.4", + "name": "TopK_Bytes4", + "operation": "max", + "operation_key": "Bytes", + "operation_result": float64(1000), + "record_key": "SrcAddr", + "SrcAddr": "10.0.0.4", + }, + } + require.Equal(t, expectedOutput, output) +} + +func Test_ExtractTimebasedExtract5(t *testing.T) { + tb := initTimebased(t, yamlConfig5) + require.NotNil(t, tb) + entries := test.GetExtractMockEntries2() + output := tb.Extract(entries) + require.Equal(t, 1, len(output)) + expectedOutput := []config.GenericMap{ + { + "key": "10.0.0.1", + "name": "BotK_Bytes5", + "operation": "min", + "operation_key": "Bytes", + "operation_result": float64(100), + "record_key": "SrcAddr", + "SrcAddr": "10.0.0.1", + }, + } + require.Equal(t, expectedOutput, output) +} + +func Test_ExtractTimebasedExtract6(t *testing.T) { + tb := initTimebased(t, yamlConfig6) + require.NotNil(t, tb) + entries := test.GetExtractMockEntries2() + output := tb.Extract(entries) + require.Equal(t, 4, len(output)) + expectedOutput := []config.GenericMap{ + { + "key": "10.0.0.1", + "name": "All_Bytes6", + "operation": "sum", + "operation_key": "Bytes", + "operation_result": float64(1200), + "record_key": "SrcAddr", + "SrcAddr": "10.0.0.1", + }, + { + "key": "10.0.0.2", + "name": "All_Bytes6", + "operation": "sum", + "operation_key": "Bytes", + "operation_result": 
float64(1500), + "record_key": "SrcAddr", + "SrcAddr": "10.0.0.2", + }, + { + "key": "10.0.0.3", + "name": "All_Bytes6", + "operation": "sum", + "operation_key": "Bytes", + "operation_result": float64(1800), + "record_key": "SrcAddr", + "SrcAddr": "10.0.0.3", + }, + { + "key": "10.0.0.4", + "name": "All_Bytes6", + "operation": "sum", + "operation_key": "Bytes", + "operation_result": float64(1000), + "record_key": "SrcAddr", + "SrcAddr": "10.0.0.4", + }, + } + for _, configMap := range expectedOutput { + require.Contains(t, output, configMap) + } +} diff --git a/pkg/pipeline/extract/timebased/filters.go b/pkg/pipeline/extract/timebased/filters.go new file mode 100644 index 000000000..8728fed3b --- /dev/null +++ b/pkg/pipeline/extract/timebased/filters.go @@ -0,0 +1,157 @@ +/* + * Copyright (C) 2022 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package timebased + +import ( + "container/list" + "math" + "time" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" + log "github.com/sirupsen/logrus" +) + +func (fs *FilterStruct) CalculateResults(nowInSecs time.Time) { + log.Debugf("CalculateResults nowInSecs = %v", nowInSecs) + oldestValidTime := nowInSecs.Add(-fs.Rule.TimeInterval.Duration) + for key, l := range fs.RecordKeyDataTable.dataTableMap { + var valueFloat64 = float64(0) + var err error + switch fs.Rule.Operation { + case api.FilterOperationName("FilterOperationLast"): + // handle empty list + if l.Len() == 0 { + continue + } + valueFloat64, err = utils.ConvertToFloat64(l.Back().Value.(*TableEntry).entry[fs.Rule.OperationKey]) + if err != nil { + continue + } + case api.FilterOperationName("FilterOperationDiff"): + for e := l.Front(); e != nil; e = e.Next() { + cEntry := e.Value.(*TableEntry) + if cEntry.timeStamp.Before(oldestValidTime) { + // entry is out of time range; ignore it + continue + } + first, err := utils.ConvertToFloat64(e.Value.(*TableEntry).entry[fs.Rule.OperationKey]) + if err != nil { + continue + } + last, err := utils.ConvertToFloat64(l.Back().Value.(*TableEntry).entry[fs.Rule.OperationKey]) + if err != nil { + continue + } + valueFloat64 = last - first + break + } + default: + valueFloat64 = fs.CalculateValue(l, oldestValidTime) + } + fs.Results[key] = &filterOperationResult{ + key: key, + operationResult: valueFloat64, + } + } + log.Debugf("CalculateResults Results = %v", fs.Results) +} + +func (fs *FilterStruct) CalculateValue(l *list.List, oldestValidTime time.Time) float64 { + log.Debugf("CalculateValue nowInSecs = %v", oldestValidTime) + currentValue := getInitValue(fs.Rule.Operation) + nItems := 0 + for e := l.Front(); e != nil; e = e.Next() { + cEntry := e.Value.(*TableEntry) + if cEntry.timeStamp.Before(oldestValidTime) { + // entry is out of time range; ignore it + continue + } + valueFloat64, _ := utils.ConvertToFloat64(cEntry.entry[fs.Rule.OperationKey]) + nItems++ + switch fs.Rule.Operation { + case 
api.FilterOperationName("FilterOperationSum"), api.FilterOperationName("FilterOperationAvg"): + currentValue += valueFloat64 + case api.FilterOperationName("FilterOperationMax"): + currentValue = math.Max(currentValue, valueFloat64) + case api.FilterOperationName("FilterOperationMin"): + currentValue = math.Min(currentValue, valueFloat64) + } + } + if fs.Rule.Operation == api.FilterOperationName("FilterOperationAvg") && nItems > 0 { + currentValue = currentValue / float64(nItems) + } + return currentValue +} + +func getInitValue(operation string) float64 { + switch operation { + case api.FilterOperationName("FilterOperationSum"), + api.FilterOperationName("FilterOperationAvg"), + api.FilterOperationName("FilterOperationLast"), + api.FilterOperationName("FilterOperationDiff"): + return 0 + case api.FilterOperationName("FilterOperationMax"): + return (-math.MaxFloat64) + case api.FilterOperationName("FilterOperationMin"): + return math.MaxFloat64 + default: + log.Panicf("unkown operation %v", operation) + return 0 + } +} + +func (fs *FilterStruct) ComputeTopkBotk() { + var output []filterOperationResult + if fs.Rule.TopK > 0 { + if fs.Rule.Reversed { + output = fs.computeBotK(fs.Results) + } else { + output = fs.computeTopK(fs.Results) + } + } else { + // return all Results; convert map to array + output = make([]filterOperationResult, len(fs.Results)) + i := 0 + for _, item := range fs.Results { + output[i] = *item + i++ + } + } + fs.Output = output +} + +func (fs *FilterStruct) CreateGenericMap() []config.GenericMap { + output := make([]config.GenericMap, 0) + for _, result := range fs.Output { + t := config.GenericMap{ + "name": fs.Rule.Name, + "record_key": fs.Rule.RecordKey, + "operation": fs.Rule.Operation, + "operation_key": fs.Rule.OperationKey, + "key": result.key, + "operation_result": result.operationResult, + } + t[fs.Rule.RecordKey] = result.key + log.Debugf("FilterStruct CreateGenericMap: %v", t) + output = append(output, t) + } + log.Debugf("FilterStruct CreateGenericMap: output = %v \n", output) + return output +} diff --git a/pkg/pipeline/extract/timebased/heap.go b/pkg/pipeline/extract/timebased/heap.go new file mode 100644 index 000000000..3167b636d --- /dev/null +++ b/pkg/pipeline/extract/timebased/heap.go @@ -0,0 +1,157 @@ +/* + * Copyright (C) 2022 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package timebased + +import ( + "container/heap" + "math" + + log "github.com/sirupsen/logrus" +) + +// functions to manipulate a heap to generate TopK/BotK entries +// We need to implement the heap interface: Len(), Less(), Swap(), Push(), Pop() + +type heapItem struct { + value float64 + result *filterOperationResult +} + +type topkHeap []heapItem +type botkHeap []heapItem + +func (h topkHeap) Len() int { + return len(h) +} + +func (h topkHeap) Less(i, j int) bool { + return h[i].value < h[j].value +} + +func (h topkHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} + +func (h *topkHeap) Push(x interface{}) { + *h = append(*h, x.(heapItem)) +} + +func (h *topkHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +func (filter *FilterStruct) computeTopK(inputs filterOperationResults) []filterOperationResult { + // maintain a heap with k items, always dropping the lowest + // we will be left with the TopK items + var prevMin float64 + prevMin = -math.MaxFloat64 + topk := filter.Rule.TopK + h := &topkHeap{} + for key, metricMap := range inputs { + val := metricMap.operationResult + if val < prevMin { + continue + } + item := heapItem{ + result: inputs[key], + value: val, + } + heap.Push(h, item) + if h.Len() > topk { + x := heap.Pop(h) + prevMin = x.(heapItem).value + } + } + log.Debugf("heap: %v", h) + + // convert the remaining heap to a sorted array + result := make([]filterOperationResult, h.Len()) + heapLen := h.Len() + for i := heapLen; i > 0; i-- { + poppedItem := heap.Pop(h).(heapItem) + log.Debugf("poppedItem: %v", poppedItem) + result[i-1] = *poppedItem.result + } + log.Debugf("topk items: %v", result) + return result +} + +func (h botkHeap) Len() int { + return len(h) +} + +// For a botk heap, we reverse the order of the Less() operation +func (h botkHeap) Less(i, j int) bool { + return h[i].value > h[j].value +} + +func (h botkHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} + +func (h *botkHeap) Push(x interface{}) { + *h = append(*h, x.(heapItem)) +} + +func (h *botkHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +func (filter *FilterStruct) computeBotK(inputs filterOperationResults) []filterOperationResult { + // maintain a heap with k items, always dropping the highest + // we will be left with the BotK items + var prevMax float64 + prevMax = math.MaxFloat64 + botk := filter.Rule.TopK + h := &botkHeap{} + for key, metricMap := range inputs { + val := metricMap.operationResult + if val > prevMax { + continue + } + item := heapItem{ + result: inputs[key], + value: val, + } + heap.Push(h, item) + if h.Len() > botk { + x := heap.Pop(h) + prevMax = x.(heapItem).value + } + } + log.Debugf("heap: %v", h) + + // convert the remaining heap to a sorted array + result := make([]filterOperationResult, h.Len()) + heapLen := h.Len() + for i := heapLen; i > 0; i-- { + poppedItem := heap.Pop(h).(heapItem) + log.Debugf("poppedItem: %v", poppedItem) + result[i-1] = *poppedItem.result + } + log.Debugf("botk items: %v", result) + return result +} diff --git a/pkg/pipeline/extract/timebased/tables.go b/pkg/pipeline/extract/timebased/tables.go new file mode 100644 index 000000000..25854654f --- /dev/null +++ b/pkg/pipeline/extract/timebased/tables.go @@ -0,0 +1,79 @@ +/* + * Copyright (C) 2022 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package timebased + +import ( + "container/list" + "fmt" + "time" + + "github.com/netobserv/flowlogs-pipeline/pkg/config" + log "github.com/sirupsen/logrus" +) + +func AddEntryToTables(recordKeyStructs map[string]*RecordKeyTable, entry config.GenericMap, nowInSecs time.Time) { + for key, recordTable := range recordKeyStructs { + log.Debugf("ExtractTimebased addEntryToTables: key = %s, recordTable = %v", key, recordTable) + if val, ok := entry[key]; ok { + cEntry := &TableEntry{ + timeStamp: nowInSecs, + entry: entry, + } + // allocate list if it does not yet exist + if recordTable.dataTableMap[val.(string)] == nil { + recordTable.dataTableMap[val.(string)] = list.New() + } + log.Debugf("ExtractTimebased addEntryToTables: adding to table %s", val) + AddEntryToTable(cEntry, recordTable.dataTableMap[val.(string)]) + } + } +} + +func AddEntryToTable(cEntry *TableEntry, tableList *list.List) { + log.Debugf("AddEntryToTable: adding table entry %v", cEntry) + tableList.PushBack(cEntry) +} + +func DeleteOldEntriesFromTables(recordKeyStructs map[string]*RecordKeyTable, nowInSecs time.Time) { + for _, recordTable := range recordKeyStructs { + oldestTime := nowInSecs.Add(-recordTable.maxTimeInterval) + for _, tableMap := range recordTable.dataTableMap { + for { + head := tableMap.Front() + if head == nil { + break + } + tableEntry := head.Value.(*TableEntry) + if tableEntry.timeStamp.Before(oldestTime) { + tableMap.Remove(head) + continue + } + break + } + // TODO: if tableMap is empty, we should clean it up and remove it from recordTable.dataTableMap + } + } +} + +func PrintTable(l *list.List) { + fmt.Printf("start PrintTable: \n") + for e := l.Front(); e != nil; e = e.Next() { + fmt.Printf("PrintTable: e = %v, Value = %v \n", e, e.Value) + } + fmt.Printf("end PrintTable: \n") +} diff --git a/pkg/pipeline/extract/timebased/timebased.go b/pkg/pipeline/extract/timebased/timebased.go new file mode 100644 index 000000000..cb801b41b --- /dev/null +++ b/pkg/pipeline/extract/timebased/timebased.go @@ -0,0 +1,104 @@ +/* + * Copyright (C) 2022 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package timebased + +import ( + "container/list" + "time" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + log "github.com/sirupsen/logrus" +) + +type FilterStruct struct { + Rule api.TimebasedFilterRule + RecordKeyDataTable *RecordKeyTable + Results filterOperationResults + Output []filterOperationResult +} + +type filterOperationResults map[string]*filterOperationResult + +type filterOperationResult struct { + key string + operationResult float64 +} + +type DataTableMap map[string]*list.List + +type RecordKeyTable struct { + maxTimeInterval time.Duration + dataTableMap DataTableMap +} + +type TableEntry struct { + timeStamp time.Time + entry config.GenericMap +} + +// CreateRecordKeysAndFilters creates structures for each RecordKey that appears in the rules. +// Note that the same RecordKey might appear in more than one Rule. +// Connect RecordKey structure to its filters. +// For each RecordKey, we need a table of history to handle the largest TimeInterval. +func CreateRecordKeysAndFilters(rules []api.TimebasedFilterRule) (map[string]*RecordKeyTable, []FilterStruct) { + tmpRecordKeyStructs := make(map[string]*RecordKeyTable) + tmpFilters := make([]FilterStruct, 0) + for _, filterRule := range rules { + log.Debugf("CreateRecordKeysAndFilters: filterRule = %v", filterRule) + // verify there is a valid RecordKey + if filterRule.RecordKey == "" { + log.Errorf("missing RecordKey for filter %s", filterRule.Name) + continue + } + rStruct, ok := tmpRecordKeyStructs[filterRule.RecordKey] + if !ok { + rStruct = &RecordKeyTable{ + maxTimeInterval: filterRule.TimeInterval.Duration, + dataTableMap: make(DataTableMap), + } + tmpRecordKeyStructs[filterRule.RecordKey] = rStruct + log.Debugf("new RecordKeyTable: name = %s = %v", filterRule.RecordKey, *rStruct) + } else { + if filterRule.TimeInterval.Duration > rStruct.maxTimeInterval { + rStruct.maxTimeInterval = filterRule.TimeInterval.Duration + } + } + // verify the validity of the Operation field in the filterRule + switch filterRule.Operation { + case api.FilterOperationName("FilterOperationLast"), + api.FilterOperationName("FilterOperationDiff"), + api.FilterOperationName("FilterOperationAvg"), + api.FilterOperationName("FilterOperationMax"), + api.FilterOperationName("FilterOperationMin"), + api.FilterOperationName("FilterOperationSum"): + // OK; nothing to do + default: + log.Errorf("illegal operation type %s", filterRule.Operation) + continue + } + tmpFilter := FilterStruct{ + Rule: filterRule, + RecordKeyDataTable: rStruct, + Results: make(filterOperationResults), + } + log.Debugf("new Rule = %v", tmpFilter) + tmpFilters = append(tmpFilters, tmpFilter) + } + return tmpRecordKeyStructs, tmpFilters +} diff --git a/pkg/pipeline/extract/timebased/timebased_test.go b/pkg/pipeline/extract/timebased/timebased_test.go new file mode 100644 index 000000000..3c0817397 --- /dev/null +++ b/pkg/pipeline/extract/timebased/timebased_test.go @@ -0,0 +1,114 @@ +/* + * Copyright (C) 2022 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package timebased + +import ( + "fmt" + "testing" + "time" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/test" + "github.com/stretchr/testify/require" +) + +func getTimebasedRules() []api.TimebasedFilterRule { + rules := []api.TimebasedFilterRule{ + { + Name: "Top2_sum", + RecordKey: "SrcAddr", + Operation: "sum", + OperationKey: "Bytes", + TopK: 2, + TimeInterval: api.Duration{Duration: 1 * time.Second}, + }, + { + Name: "Bot2_last", + RecordKey: "SrcAddr", + Operation: "last", + OperationKey: "Bytes", + Reversed: true, + TimeInterval: api.Duration{Duration: 3 * time.Second}, + }, + { + Name: "Top4_max", + RecordKey: "DstAddr", + Operation: "max", + OperationKey: "Bytes", + TopK: 4, + Reversed: false, + TimeInterval: api.Duration{Duration: 1 * time.Second}, + }, + } + return rules +} + +func Test_CreateRecordKeysAndFilters(t *testing.T) { + rules := getTimebasedRules() + recordKeyStructs, filters := CreateRecordKeysAndFilters(rules) + require.Equal(t, 2, len(recordKeyStructs)) + require.Equal(t, 3, len(filters)) + require.Contains(t, recordKeyStructs, "SrcAddr") + require.Contains(t, recordKeyStructs, "DstAddr") + require.Equal(t, time.Duration(3*time.Second), recordKeyStructs["SrcAddr"].maxTimeInterval) + require.Equal(t, time.Duration(1*time.Second), recordKeyStructs["DstAddr"].maxTimeInterval) + require.Equal(t, filters[0].RecordKeyDataTable, recordKeyStructs["SrcAddr"]) + require.Equal(t, filters[1].RecordKeyDataTable, recordKeyStructs["SrcAddr"]) + require.Equal(t, filters[2].RecordKeyDataTable, recordKeyStructs["DstAddr"]) +} + +func Test_CreateRecordKeysAndFiltersError(t *testing.T) { + rules := []api.TimebasedFilterRule{ + { + Name: "filter1", + RecordKey: "", + Operation: "sum", + OperationKey: "operationKey1", + TopK: 2, + TimeInterval: api.Duration{Duration: 10 * time.Second}, + }, + } + recordKeyStructs, filters := CreateRecordKeysAndFilters(rules) + require.Equal(t, 0, len(recordKeyStructs)) + require.Equal(t, 0, len(filters)) +} + +func Test_AddAndDeleteEntryToTables(t *testing.T) { + rules := getTimebasedRules() + recordKeyStructs, _ := CreateRecordKeysAndFilters(rules) + entries := test.GetExtractMockEntries2() + nowInSecs := time.Now() + for _, entry := range entries { + AddEntryToTables(recordKeyStructs, entry, nowInSecs) + } + require.Equal(t, 4, len(recordKeyStructs["SrcAddr"].dataTableMap)) + require.Equal(t, 1, len(recordKeyStructs["DstAddr"].dataTableMap)) + require.Contains(t, recordKeyStructs["SrcAddr"].dataTableMap, "10.0.0.1") + require.Contains(t, recordKeyStructs["SrcAddr"].dataTableMap, "10.0.0.2") + require.Contains(t, recordKeyStructs["SrcAddr"].dataTableMap, "10.0.0.3") + require.Contains(t, recordKeyStructs["DstAddr"].dataTableMap, "11.0.0.1") + + // wait for timeout and test that items were deleted from table + fmt.Printf("going to sleep for timeout value \n") + time.Sleep(2 * time.Second) + fmt.Printf("after sleep for timeout value \n") + nowInSecs = time.Now() + DeleteOldEntriesFromTables(recordKeyStructs, nowInSecs) + require.Equal(t, 0, recordKeyStructs["DstAddr"].dataTableMap["11.0.0.1"].Len()) + require.Equal(t, 3, recordKeyStructs["SrcAddr"].dataTableMap["10.0.0.1"].Len()) +} diff --git a/pkg/pipeline/pipeline_builder.go b/pkg/pipeline/pipeline_builder.go index 10ba35942..6e5b4cd98 100644 --- a/pkg/pipeline/pipeline_builder.go +++ b/pkg/pipeline/pipeline_builder.go @@ -324,6 
+324,8 @@ func getExtractor(params config.StageParam) (extract.Extractor, error) { extractor, err = extract.NewExtractAggregate(params) case api.ConnTrackType: extractor, err = conntrack.NewConnectionTrack(params, clock.New()) + case api.TimebasedType: + extractor, err = extract.NewExtractTimebased(params) default: panic(fmt.Sprintf("`extract` type %s not defined; if no extractor needed, specify `none`", params.Extract.Type)) } diff --git a/pkg/pipeline/transform_topk_test.go b/pkg/pipeline/transform_topk_test.go deleted file mode 100644 index 2732c4c06..000000000 --- a/pkg/pipeline/transform_topk_test.go +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Copyright (C) 2022 IBM, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package pipeline - -import ( - "testing" - - "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract" - "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write" - "github.com/netobserv/flowlogs-pipeline/pkg/test" - "github.com/stretchr/testify/require" -) - -const testConfigAggregateTopK = `--- -pipeline: - - name: ingest_file - - follows: ingest_file - name: transform_generic - - follows: transform_generic - name: transform_network - - follows: transform_network - name: extract_aggregate - - follows: extract_aggregate - name: write_none -parameters: - - ingest: - type: file - file: - filename: ../../hack/examples/ocp-ipfix-flowlogs.json - decoder: - type: json - name: ingest_file - - name: transform_generic - transform: - generic: - policy: replace_keys - rules: - - input: SrcAddr - output: srcIP - - input: SrcPort - output: srcPort - - input: DstAddr - output: dstIP - - input: DstPort - output: dstPort - - input: Proto - output: proto - - input: Bytes - output: bytes - - input: TCPFlags - output: TCPFlags - - input: SrcAS - output: srcAS - - input: DstAS - output: dstAS - type: generic - - name: transform_network - transform: - network: - rules: - - input: dstIP - output: dstSubnet24 - type: add_subnet - parameters: /24 - - input: srcIP - output: srcSubnet24 - type: add_subnet - parameters: /24 - type: network - - extract: - aggregates: - - name: count_source_destination_subnet - by: - - dstSubnet24 - - srcSubnet24 - operation: count - recordKey: "" - topK: 4 - type: aggregates - name: extract_aggregate - - name: write_none - write: - type: none -` - -func TestAggregateTopk(t *testing.T) { - var mainPipeline *Pipeline - var err error - v, cfg := test.InitConfig(t, testConfigAggregateTopK) - require.NotNil(t, v) - - mainPipeline, err = NewPipeline(cfg) - require.NoError(t, err) - - // The file ingester reads the entire file, pushes it down the pipeline, and then exits - // So we don't need to run it in a separate go-routine - mainPipeline.Run() - // Test the final outcome to see that it is reasonable - extractor := mainPipeline.pipelineStages[3].Extractor.(*extract.ExtractAggregate) - writer := mainPipeline.pipelineStages[4].Writer.(*write.WriteNone) - require.Equal(t, 4, extractor.Aggregates.Aggregates[0].Definition.TopK) - require.Equal(t, 4, 
len(writer.PrevRecords)) - require.Equal(t, float64(545), writer.PrevRecords[0]["total_value"]) - require.Equal(t, float64(491), writer.PrevRecords[1]["total_value"]) - require.Equal(t, float64(357), writer.PrevRecords[2]["total_value"]) - require.Equal(t, float64(299), writer.PrevRecords[3]["total_value"]) -} diff --git a/pkg/test/utils.go b/pkg/test/utils.go index e57ac6ef7..32b0d7ad2 100644 --- a/pkg/test/utils.go +++ b/pkg/test/utils.go @@ -174,3 +174,19 @@ func WaitFromChannel(in chan []config.GenericMap, timeout time.Duration) ([]conf return nil, errors.New("Timeout reached") } } + +func GetExtractMockEntries2() []config.GenericMap { + entries := []config.GenericMap{ + {"SrcAddr": "10.0.0.1", "DstAddr": "11.0.0.1", "Bytes": 100, "Packets": 1}, + {"SrcAddr": "10.0.0.2", "DstAddr": "11.0.0.1", "Bytes": 200, "Packets": 2}, + {"SrcAddr": "10.0.0.3", "DstAddr": "11.0.0.1", "Bytes": 300, "Packets": 3}, + {"SrcAddr": "10.0.0.1", "DstAddr": "11.0.0.1", "Bytes": 400, "Packets": 1}, + {"SrcAddr": "10.0.0.2", "DstAddr": "11.0.0.1", "Bytes": 500, "Packets": 1}, + {"SrcAddr": "10.0.0.3", "DstAddr": "11.0.0.1", "Bytes": 600, "Packets": 1}, + {"SrcAddr": "10.0.0.1", "DstAddr": "11.0.0.1", "Bytes": 700, "Packets": 4}, + {"SrcAddr": "10.0.0.2", "DstAddr": "11.0.0.1", "Bytes": 800, "Packets": 5}, + {"SrcAddr": "10.0.0.3", "DstAddr": "11.0.0.1", "Bytes": 900, "Packets": 1}, + {"SrcAddr": "10.0.0.4", "DstAddr": "11.0.0.1", "Bytes": 1000, "Packets": 1}, + } + return entries +}
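The computeTopK and computeBotK helpers in pkg/pipeline/extract/timebased/heap.go select the k highest (or lowest) results by keeping a bounded heap: every result is pushed, and once the heap holds more than k items the current minimum (maximum for BotK) is popped, so only the k best survive. Below is a minimal standalone sketch of that bounded-heap selection, with illustrative names and data rather than the package's actual types:

```go
package main

import (
	"container/heap"
	"fmt"
)

// item pairs a grouping key with the value used for ranking.
type item struct {
	key   string
	value float64
}

// minHeap keeps the smallest value at the root, so it is cheap to drop
// the current minimum whenever the heap grows beyond k.
type minHeap []item

func (h minHeap) Len() int            { return len(h) }
func (h minHeap) Less(i, j int) bool  { return h[i].value < h[j].value }
func (h minHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *minHeap) Push(x interface{}) { *h = append(*h, x.(item)) }
func (h *minHeap) Pop() interface{} {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// topK returns the k items with the highest values, largest first.
func topK(results map[string]float64, k int) []item {
	h := &minHeap{}
	for key, val := range results {
		heap.Push(h, item{key: key, value: val})
		if h.Len() > k {
			heap.Pop(h) // drop the smallest of the k+1 items
		}
	}
	// drain the heap; pops come out smallest first, so fill from the back
	out := make([]item, h.Len())
	for i := len(out) - 1; i >= 0; i-- {
		out[i] = heap.Pop(h).(item)
	}
	return out
}

func main() {
	// illustrative per-key results, loosely mirroring the mock flow entries
	results := map[string]float64{
		"10.0.0.1": 700, "10.0.0.2": 800, "10.0.0.3": 900, "10.0.0.4": 1000,
	}
	fmt.Println(topK(results, 3)) // [{10.0.0.4 1000} {10.0.0.3 900} {10.0.0.2 800}]
}
```

A BotK variant only reverses the Less comparison (and therefore drops the current maximum), which is exactly the difference between the topkHeap and botkHeap types in the diff.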