Time based TopK/BotK extract stage (#283)
* added basic structures for timebased topk/botk

* separate files to handle Tables and Filters

* some refactoring

* added extract_timebased.go

* fixed typo from merge during rebase

* Work in Progress

* WIP

* added unit tests and debugged

* fixed rebase merge issues

* added more unit tests

* added unit test to obtain all entries

* addressed some review comments

* changed handling of time to time.Duration and time.Time

* removed BotK and used instead Reversed

* additional addressing of comments

* changed filter operation to enum

* simplified logic of test case

* added timebased-topk to confgen

* allowed different extract-timebased in each metrics-definition yaml

* fixed test to include added map entry

* checked for error on ConvertToFloat64

* remove TopK from aggregate
KalmanMeth committed Sep 7, 2022
1 parent ba6bf34 commit c4e2f41
Showing 23 changed files with 1,193 additions and 205 deletions.
24 changes: 22 additions & 2 deletions docs/api.md
@@ -10,7 +10,8 @@ Following is the supported API format for prometheus encode:
gauge: single numerical value that can arbitrarily go up and down
counter: monotonically increasing counter whose value can only increase
histogram: counts samples in configurable buckets
filter: the criterion to filter entries by
agg_histogram: counts samples in configurable buckets, pre-aggregated via an Aggregate stage
filter: an optional criterion to filter entries by
key: the key to match and filter by
value: the value to match and filter by
valueKey: entry key from which to resolve metric value
@@ -172,7 +173,6 @@ Following is the supported API format for specifying metrics aggregations:
by: list of fields on which to aggregate
operation: sum, min, max, avg or raw_values
recordKey: internal field on which to perform the operation
topK: number of highest incidence to report (default - report all)
</pre>
## Connection tracking API
Following is the supported API format for specifying connection tracking:
@@ -203,4 +203,24 @@ Following is the supported API format for specifying connection tracking:
input: The input field to base the operation on. When omitted, 'name' is used
endConnectionTimeout: duration of time to wait from the last flow log to end a connection
updateConnectionInterval: duration of time to wait between update reports of a connection
</pre>
## Time-based Filters API
Following is the supported API format for specifying metrics time-based filters:

<pre>
timebased:
rules: list of filter rules, each includes:
name: description of filter result
recordKey: internal field by which to index TopK entries
operation: (enum) sum, min, max, avg, last or diff
sum: set output field to the sum of the parameter field in the time window
avg: set output field to the average of the parameter field in the time window
min: set output field to the minimum of the parameter field in the time window
max: set output field to the maximum of the parameter field in the time window
last: set output field to the last value of the parameter field in the time window
diff: set output field to the difference between the first and last values of the parameter field in the time window
operationKey: internal field on which to perform the operation
topK: number of highest-incidence entries to report (default - report all)
reversed: report the lowest-incidence entries instead of the highest (default - false)
timeInterval: duration of the time window over which to compute the metric
</pre>
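
For illustration, here is a minimal sketch of a timebased stage in a pipeline configuration, using the fields above; the record keys srcIP and Bytes are hypothetical flow-log fields, not part of this commit:

<pre>
extract:
  type: timebased
  timebased:
    rules:
      - name: top_3_sources_by_bytes
        recordKey: srcIP
        operation: sum
        operationKey: Bytes
        topK: 3
        timeInterval: 30s
</pre>

This rule would report the three srcIP values with the largest summed Bytes over the last 30 seconds; setting reversed: true would report the three smallest instead.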
7 changes: 7 additions & 0 deletions docs/operational-metrics.md
@@ -16,6 +16,13 @@ Each table below provides documentation for an exported flowlogs-pipeline operat
| **Type** | counter |


### encode_prom_errors
| **Name** | encode_prom_errors |
|:---|:---|
| **Description** | Total errors during metrics generation |
| **Type** | counter |


### conntrack_memory_connections
| **Name** | conntrack_memory_connections |
|:---|:---|
2 changes: 2 additions & 0 deletions pkg/api/api.go
@@ -28,6 +28,7 @@ const (
StdoutType = "stdout"
LokiType = "loki"
AggregateType = "aggregates"
TimebasedType = "timebased"
PromType = "prom"
GenericType = "generic"
NetworkType = "network"
@@ -62,4 +63,5 @@ type API struct {
WriteStdout WriteStdout `yaml:"stdout" doc:"## Write Standard Output\nFollowing is the supported API format for writing to standard output:\n"`
ExtractAggregate AggregateDefinition `yaml:"aggregates" doc:"## Aggregate metrics API\nFollowing is the supported API format for specifying metrics aggregations:\n"`
ConnectionTracking ConnTrack `yaml:"conntrack" doc:"## Connection tracking API\nFollowing is the supported API format for specifying connection tracking:\n"`
ExtractTimebased ExtractTimebased `yaml:"timebased" doc:"## Time-based Filters API\nFollowing is the supported API format for specifying metrics time-based filters:\n"`
}
1 change: 1 addition & 0 deletions pkg/api/enum.go
@@ -31,6 +31,7 @@ type enums struct {
ConnTrackOperationEnum ConnTrackOperationEnum
ConnTrackOutputRecordTypeEnum ConnTrackOutputRecordTypeEnum
DecoderEnum DecoderEnum
FilterOperationEnum FilterOperationEnum
}

type enumNameCacheKey struct {
1 change: 0 additions & 1 deletion pkg/api/extract_aggregate.go
@@ -25,5 +25,4 @@ type AggregateDefinition struct {
By AggregateBy `yaml:"by,omitempty" json:"by,omitempty" doc:"list of fields on which to aggregate"`
Operation AggregateOperation `yaml:"operation,omitempty" json:"operation,omitempty" doc:"sum, min, max, avg or raw_values"`
RecordKey string `yaml:"recordKey,omitempty" json:"recordKey,omitempty" doc:"internal field on which to perform the operation"`
TopK int `yaml:"topK,omitempty" json:"topK,omitempty" doc:"number of highest incidence to report (default - report all)"`
}
45 changes: 45 additions & 0 deletions pkg/api/extract_timebased.go
@@ -0,0 +1,45 @@
/*
* Copyright (C) 2022 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package api

type FilterOperationEnum struct {
FilterOperationSum string `yaml:"sum" json:"sum" doc:"set output field to the sum of the parameter field in the time window"`
FilterOperationAvg string `yaml:"avg" json:"avg" doc:"set output field to the average of the parameter field in the time window"`
FilterOperationMin string `yaml:"min" json:"min" doc:"set output field to the minimum of the parameter field in the time window"`
FilterOperationMax string `yaml:"max" json:"max" doc:"set output field to the maximum of the parameter field in the time window"`
FilterOperationLast string `yaml:"last" json:"last" doc:"set output field to the last value of the parameter field in the time window"`
FilterOperationDiff string `yaml:"diff" json:"diff" doc:"set output field to the difference between the first and last values of the parameter field in the time window"`
}

func FilterOperationName(operation string) string {
return GetEnumName(FilterOperationEnum{}, operation)
}

type ExtractTimebased struct {
Rules []TimebasedFilterRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"list of filter rules, each includes:"`
}

type TimebasedFilterRule struct {
Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"description of filter result"`
RecordKey string `yaml:"recordKey,omitempty" json:"recordKey,omitempty" doc:"internal field by which to index TopK entries"`
Operation string `yaml:"operation,omitempty" json:"operation,omitempty" enum:"FilterOperationEnum" doc:"sum, min, max, avg, last or diff"`
OperationKey string `yaml:"operationKey,omitempty" json:"operationKey,omitempty" doc:"internal field on which to perform the operation"`
TopK int `yaml:"topK,omitempty" json:"topK,omitempty" doc:"number of highest-incidence entries to report (default - report all)"`
Reversed bool `yaml:"reversed,omitempty" json:"reversed,omitempty" doc:"report the lowest-incidence entries instead of the highest (default - false)"`
TimeInterval Duration `yaml:"timeInterval,omitempty" json:"timeInterval,omitempty" doc:"duration of the time window over which to compute the metric"`
}
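
As a usage sketch (not part of the commit), the same kind of rule can be built programmatically. This assumes api.Duration wraps an embedded time.Duration, as the other duration-valued fields in this package do:

<pre>
package example

import (
	"time"

	"github.com/netobserv/flowlogs-pipeline/pkg/api"
)

// buildTimebasedExample returns a hypothetical config: the top 3 srcIP
// values by summed Bytes over a 30-second window.
func buildTimebasedExample() api.ExtractTimebased {
	rule := api.TimebasedFilterRule{
		Name:         "top_3_sources_by_bytes",
		RecordKey:    "srcIP",
		Operation:    "sum", // must match a yaml tag of FilterOperationEnum
		OperationKey: "Bytes",
		TopK:         3,
		TimeInterval: api.Duration{Duration: 30 * time.Second},
	}
	return api.ExtractTimebased{Rules: []api.TimebasedFilterRule{rule}}
}
</pre>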
4 changes: 3 additions & 1 deletion pkg/confgen/confgen.go
@@ -43,6 +43,7 @@ type Definition struct {
Tags []string
TransformNetwork *api.TransformNetwork
AggregateDefinitions *aggregate.Definitions
ExtractTimebased *api.ExtractTimebased
PromEncode *api.PromEncode
Visualization *Visualization
}
@@ -54,6 +55,7 @@ type ConfGen struct {
config *Config
transformRules api.NetworkTransformRules
aggregateDefinitions aggregate.Definitions
timebasedTopKs api.ExtractTimebased
promMetrics api.PromMetricsItems
visualizations Visualizations
definitions Definitions
@@ -178,7 +180,7 @@ func (cg *ConfGen) ParseDefinition(name string, bytes []byte) error {
}

// parse extract
definition.AggregateDefinitions, err = cg.parseExtract(&defFile.Extract)
definition.AggregateDefinitions, definition.ExtractTimebased, err = cg.parseExtract(&defFile.Extract)
if err != nil {
log.Debugf("parseExtract err: %v ", err)
return err
29 changes: 25 additions & 4 deletions pkg/confgen/extract.go
@@ -19,27 +19,48 @@ package confgen

import (
jsoniter "github.com/json-iterator/go"
"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract/aggregate"
log "github.com/sirupsen/logrus"
)

func (cg *ConfGen) parseExtract(extract *map[string]interface{}) (*aggregate.Definitions, error) {
func (cg *ConfGen) parseExtract(extract *map[string]interface{}) (*aggregate.Definitions, *api.ExtractTimebased, error) {
var jsoniterJson = jsoniter.ConfigCompatibleWithStandardLibrary
aggregateExtract := (*extract)["aggregates"]
b, err := jsoniterJson.Marshal(&aggregateExtract)
if err != nil {
log.Debugf("jsoniterJson.Marshal err: %v ", err)
return nil, err
return nil, nil, err
}

var jsonNetworkAggregate aggregate.Definitions
err = config.JsonUnmarshalStrict(b, &jsonNetworkAggregate)
if err != nil {
log.Debugf("Unmarshal aggregate.Definitions err: %v ", err)
return nil, err
return nil, nil, err
}

cg.aggregateDefinitions = append(cg.aggregateDefinitions, jsonNetworkAggregate...)
return &jsonNetworkAggregate, nil

timebasedExtract, ok := (*extract)["timebased"]
if !ok {
return &jsonNetworkAggregate, nil, nil
}
b, err = jsoniterJson.Marshal(&timebasedExtract)
if err != nil {
log.Debugf("jsoniterJson.Marshal err: %v ", err)
return nil, nil, err
}

var jsonTimebasedTopKs api.ExtractTimebased
err = config.JsonUnmarshalStrict(b, &jsonTimebasedTopKs)
if err != nil {
log.Debugf("Unmarshal api.ExtractTimebased err: %v ", err)
return nil, nil, err
}

cg.timebasedTopKs.Rules = append(cg.timebasedTopKs.Rules, jsonTimebasedTopKs.Rules...)

return &jsonNetworkAggregate, &jsonTimebasedTopKs, nil
}
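
The marshal-then-strict-unmarshal round-trip above is what lets confgen turn an untyped map[string]interface{} section of a definition file into a typed struct while still rejecting unknown keys. A standalone sketch of the idiom, assuming config.JsonUnmarshalStrict fails on fields absent from the target struct:

<pre>
package example

import (
	jsoniter "github.com/json-iterator/go"

	"github.com/netobserv/flowlogs-pipeline/pkg/api"
	"github.com/netobserv/flowlogs-pipeline/pkg/config"
)

// parseTimebasedSection converts the raw "timebased" section of a
// definition file into a typed api.ExtractTimebased.
func parseTimebasedSection(section interface{}) (*api.ExtractTimebased, error) {
	b, err := jsoniter.ConfigCompatibleWithStandardLibrary.Marshal(&section)
	if err != nil {
		return nil, err
	}
	var tb api.ExtractTimebased
	// A misspelled key such as "rulez" fails here instead of being silently dropped.
	if err := config.JsonUnmarshalStrict(b, &tb); err != nil {
		return nil, err
	}
	return &tb, nil
}
</pre>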
7 changes: 7 additions & 0 deletions pkg/confgen/flowlogs2metrics_config.go
@@ -48,6 +48,11 @@ func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() *config.ConfigFileStruct {
metricsNode := forkedNode
if len(cg.aggregateDefinitions) > 0 {
metricsNode = metricsNode.Aggregate("extract_aggregate", cg.aggregateDefinitions)
if len(cg.timebasedTopKs.Rules) > 0 {
metricsNode = metricsNode.ExtractTimebased("extract_timebased", api.ExtractTimebased{
Rules: cg.timebasedTopKs.Rules,
})
}
}
if len(cg.promMetrics) > 0 {
metricsNode.EncodePrometheus("encode_prom", api.PromEncode{
@@ -78,6 +83,8 @@ func (cg *ConfGen) GenerateTruncatedConfig() []config.StageParam {
parameters[i] = config.NewTransformNetworkParams("transform_network", *cg.config.Transform.Network)
case "extract_aggregate":
parameters[i] = config.NewAggregateParams("extract_aggregate", cg.aggregateDefinitions)
case "extract_timebased":
parameters[i] = config.NewTimebasedParams("extract_timebased", cg.timebasedTopKs)
case "encode_prom":
parameters[i] = config.NewEncodePrometheusParams("encode_prom", api.PromEncode{
Metrics: cg.promMetrics,
1 change: 1 addition & 0 deletions pkg/config/config.go
@@ -81,6 +81,7 @@ type Extract struct {
Type string `yaml:"type" json:"type"`
Aggregates []api.AggregateDefinition `yaml:"aggregates,omitempty" json:"aggregates,omitempty"`
ConnTrack *api.ConnTrack `yaml:"conntrack,omitempty" json:"conntrack,omitempty"`
Timebased *api.ExtractTimebased `yaml:"timebased,omitempty" json:"timebased,omitempty"`
}

type Encode struct {
5 changes: 5 additions & 0 deletions pkg/config/pipeline_builder.go
@@ -99,6 +99,11 @@ func (b *PipelineBuilderStage) Aggregate(name string, aggs []api.AggregateDefini
return b.next(name, NewAggregateParams(name, aggs))
}

// ExtractTimebased chains the current stage with an ExtractTimebased stage and returns that new stage
func (b *PipelineBuilderStage) ExtractTimebased(name string, tb api.ExtractTimebased) PipelineBuilderStage {
return b.next(name, StageParam{Name: name, Extract: &Extract{Type: api.TimebasedType, Timebased: &tb}})
}

// TransformGeneric chains the current stage with a TransformGeneric stage and returns that new stage
func (b *PipelineBuilderStage) TransformGeneric(name string, gen api.TransformGeneric) PipelineBuilderStage {
return b.next(name, NewTransformGenericParams(name, gen))
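
A hedged sketch of the new builder step in use, chained after an aggregate stage the way GenerateFlowlogs2PipelineConfig does; NewCollectorPipeline and the collector settings are assumptions about the surrounding builder API, not shown in this diff:

<pre>
package example

import (
	"github.com/netobserv/flowlogs-pipeline/pkg/api"
	"github.com/netobserv/flowlogs-pipeline/pkg/config"
)

// buildMetricsPipeline wires ingest -> aggregate -> timebased top-K.
func buildMetricsPipeline(aggs []api.AggregateDefinition, rules []api.TimebasedFilterRule) config.PipelineBuilderStage {
	pb := config.NewCollectorPipeline("ingest", api.IngestCollector{Port: 2055})
	pb = pb.Aggregate("extract_aggregate", aggs)
	return pb.ExtractTimebased("extract_timebased", api.ExtractTimebased{Rules: rules})
}
</pre>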
4 changes: 4 additions & 0 deletions pkg/config/stage_params.go
@@ -53,6 +53,10 @@ func NewConnTrackParams(name string, ct api.ConnTrack) StageParam {
return StageParam{Name: name, Extract: &Extract{Type: api.ConnTrackType, ConnTrack: &ct}}
}

func NewTimebasedParams(name string, tb api.ExtractTimebased) StageParam {
return StageParam{Name: name, Extract: &Extract{Type: api.TimebasedType, Timebased: &tb}}
}

func NewEncodePrometheusParams(name string, prom api.PromEncode) StageParam {
return StageParam{Name: name, Encode: &Encode{Type: api.PromType, Prom: &prom}}
}
75 changes: 0 additions & 75 deletions pkg/pipeline/extract/aggregate/aggregate.go
@@ -18,7 +18,6 @@
package aggregate

import (
"container/heap"
"fmt"
"math"
"sort"
@@ -235,83 +234,9 @@ func (aggregate Aggregate) GetMetrics() []config.GenericMap {
group.recentOpValue = getInitValue(string(aggregate.Definition.Operation))
})

if aggregate.Definition.TopK > 0 {
metrics = aggregate.computeTopK(metrics)
}

return metrics
}

func (aggregate Aggregate) Cleanup(entry interface{}) {
// nothing special to do in this callback function
}

// functions to manipulate a heap to generate TopK entries
// We need to implement the heap interface: Len(), Less(), Swap(), Push(), Pop()

type heapItem struct {
value float64
metrics *config.GenericMap
}

type topkHeap []heapItem

func (h topkHeap) Len() int {
return len(h)
}

func (h topkHeap) Less(i, j int) bool {
return h[i].value < h[j].value
}

func (h topkHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}

func (h *topkHeap) Push(x interface{}) {
*h = append(*h, x.(heapItem))
}

func (h *topkHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

func (aggregate Aggregate) computeTopK(inputMetrics []config.GenericMap) []config.GenericMap {
// maintain a heap with k items, always dropping the lowest
// we will be left with the TopK items
var prevMin float64
prevMin = -math.MaxFloat64
topk := aggregate.Definition.TopK
h := &topkHeap{}
for index, metricMap := range inputMetrics {
val := metricMap["total_value"].(float64)
if val < prevMin {
continue
}
item := heapItem{
metrics: &inputMetrics[index],
value: val,
}
heap.Push(h, item)
if h.Len() > topk {
x := heap.Pop(h)
prevMin = x.(heapItem).value
}
}
log.Debugf("heap: %v", h)

// convert the remaining heap to a sorted array
result := make([]config.GenericMap, h.Len())
heapLen := h.Len()
for i := heapLen; i > 0; i-- {
poppedItem := heap.Pop(h).(heapItem)
log.Debugf("poppedItem: %v", poppedItem)
result[i-1] = *poppedItem.metrics
}
log.Debugf("topk items: %v", result)
return result
}