diff --git a/README.md b/README.md index 8472893b8..ae40a828c 100644 --- a/README.md +++ b/README.md @@ -463,7 +463,7 @@ It is possible to define aggregates per `srcIP` or per `dstIP` of per the tuple to capture the `sum`, `min`, `avg` etc. of the values in the field `value`. For example, configuration record for aggregating field `value` as -average for `srcIP`x`dstIP` tuples will look like this:: +average for `srcIP`x`dstIP` tuples will look like this: ```yaml pipeline: @@ -482,6 +482,31 @@ parameters: RecordKey: "value" ``` +The output fields of the aggregates stage are: +- `name` +- `operation` +- `record_key` +- `by` +- `aggregate` +- `total_value`: the total aggregate value +- `total_count`: the total count +- `recent_raw_values`: a slice with the raw values of the recent batch +- `recent_op_value`: the aggregate value of the recent batch +- `recent_count`: the count of flowlogs in the recent batch + +These fields are used by the next stage (for example `prom` encoder). +The pipeline processes flowlogs in batches. +The output fields with `recent_` prefix are related to the recent batch. +They are needed when exposing metrics in Prometheus using Counters and Histograms. +Prometheus Counters API accepts the delta amount to be added to the counter and not the total value as in Gauges. +In this case, `recent_op_value` and `recent_count` should be used as the `valuekey`. +The API of Histograms accepts the sample value, so it could be added to the appropriate bucket. +In this case, we are interested in the raw values of the records in the aggregation group. +No aggregate operation is needed and it should be set `raw_values`. The `valuekey` should be set to `recent_raw_values`. + +**Note**: `recent_raw_values` is filled only when the operation is `raw_values`. + + ### Prometheus encoder The prometheus encoder specifies which metrics to export to prometheus and which labels should be associated with those metrics. diff --git a/docs/api.md b/docs/api.md index 71cfc3060..21a4de150 100644 --- a/docs/api.md +++ b/docs/api.md @@ -133,6 +133,6 @@ Following is the supported API format for specifying metrics aggregations: aggregates: Name: description of aggregation result By: list of fields on which to aggregate - Operation: sum, min, max, or avg + Operation: sum, min, max, avg or raw_values RecordKey: internal field on which to perform the operation \ No newline at end of file diff --git a/docs/confGenerator.md b/docs/confGenerator.md index eb38b6815..9ad49da64 100644 --- a/docs/confGenerator.md +++ b/docs/confGenerator.md @@ -134,15 +134,17 @@ this actually moves the data from being log lines into being a metric named (8.2 > For additional details on `extract aggregates` > refer to [README.md](../README.md#aggregates). -(9) Next, the metrics from (8.2) are sent to prometheus (9.1). +(9) Next, the metrics from (8.2) are sent to prometheus (9.1).
The metric name in prometheus will be called as the value of (9.2) with -the prefix from the `config.yaml` file. -The type of the prometheus metric will be (9.3) (e.g. gauge, counter or histogram). -The filter field (9.4) determines which aggregates will take into account. -The key should be `"name"` and the value should match the aggregate name (8.2) -The value to be used by prometheus is taken from the field defined in (9.5). -For `Gauges`, use `total_value` or `total_count`. For `Counters`, use `recent_op_value` or `recent_count`. -Prometheus will add labels to the metric based on the (9.6) fields. +the prefix from the `config.yaml` file.
+The type of the prometheus metric will be (9.3) (e.g. gauge, counter or histogram).
+The filter field (9.4) determines which aggregates will be taken into account.
+The key should be `"name"` and the value should match the aggregate name (8.2).
+The value to be used by prometheus is taken from the field defined in (9.5).
+For `Gauge`, use `total_value` or `total_count`.
+For `Counter`, use `recent_op_value` or `recent_count`.
+For `Histogram`, use `recent_raw_values`.
+Prometheus will add labels to the metric based on the (9.6) fields.
(10) next, using grafana to visualize the metric with name from (9.2) including the prefix and using the prometheus expression from (10.1). diff --git a/pkg/api/extract_aggregate.go b/pkg/api/extract_aggregate.go index fac8d2b3c..62f685f07 100644 --- a/pkg/api/extract_aggregate.go +++ b/pkg/api/extract_aggregate.go @@ -6,6 +6,6 @@ type AggregateOperation string type AggregateDefinition struct { Name string `yaml:"Name" doc:"description of aggregation result"` By AggregateBy `yaml:"By" doc:"list of fields on which to aggregate"` - Operation AggregateOperation `yaml:"Operation" doc:"sum, min, max, or avg"` + Operation AggregateOperation `yaml:"Operation" doc:"sum, min, max, avg or raw_values"` RecordKey string `yaml:"RecordKey" doc:"internal field on which to perform the operation"` } diff --git a/pkg/pipeline/aggregate_prom_test.go b/pkg/pipeline/aggregate_prom_test.go index bb3d68af9..188edeba7 100644 --- a/pkg/pipeline/aggregate_prom_test.go +++ b/pkg/pipeline/aggregate_prom_test.go @@ -27,7 +27,7 @@ import ( "github.com/stretchr/testify/require" ) -func createEncodeOutput(name string, labels map[string]string, value float64) config.GenericMap { +func createEncodeOutput(name string, labels map[string]string, value interface{}) config.GenericMap { gm := config.GenericMap{ "Name": name, "Labels": labels, @@ -62,6 +62,12 @@ parameters: - service operation: count recordkey: + + - name: bandwidth_raw_values + by: + - service + operation: raw_values + recordkey: bytes - name: encode encode: type: prom @@ -84,11 +90,12 @@ parameters: labels: - service -# - name: bytes_histogram -# type: histogram -# valuekey: recentRawValues -# labels: -# - service + - name: bytes_histogram + type: histogram + filter: {key: name, value: bandwidth_raw_values} + valuekey: recent_raw_values + labels: + - service ` var err error @@ -117,19 +124,20 @@ parameters: {"service": "tcp", "bytes": 2.0}, }, expectedAggs: []config.GenericMap{ - test.CreateMockAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 30, 2, []float64{10, 20}, 30, 2), - test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 3, 2, []float64{1, 2}, 3, 2), - test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 2, 2, []float64{1, 1}, 2, 2), - test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 2, 2, []float64{1, 1}, 2, 2), + test.CreateMockAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 30, 2, nil, 30, 2), + test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 3, 2, nil, 3, 2), + test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 2, 2, nil, 2, 2), + test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 2, 2, nil, 2, 2), + test.CreateMockAgg("bandwidth_raw_values", "bytes", "service", "http", aggregate.OperationRawValues, 0, 2, []float64{10, 20}, 0, 2), + test.CreateMockAgg("bandwidth_raw_values", "bytes", "service", "tcp", aggregate.OperationRawValues, 0, 2, []float64{1, 2}, 0, 2), }, expectedEncode: []config.GenericMap{ createEncodeOutput("test_flow_count", map[string]string{"service": "http"}, 2), createEncodeOutput("test_flow_count", map[string]string{"service": "tcp"}, 2), - createEncodeOutput("test_bytes_sum", map[string]string{"service": "http"}, 30), - createEncodeOutput("test_bytes_sum", map[string]string{"service": "tcp"}, 3), - // TODO: add the following test once raw_values operation and filters are implemented - //createEncodeOutput("test_bytes_histogram", map[string]string{"service": "http"}, []float64{10, 20}), - //createEncodeOutput("test_bytes_histogram", map[string]string{"service": "tcp"}, []float64{1, 2}), + createEncodeOutput("test_bytes_sum", map[string]string{"service": "http"}, 30.0), + createEncodeOutput("test_bytes_sum", map[string]string{"service": "tcp"}, 3.0), + createEncodeOutput("test_bytes_histogram", map[string]string{"service": "http"}, []float64{10, 20}), + createEncodeOutput("test_bytes_histogram", map[string]string{"service": "tcp"}, []float64{1, 2}), }, }, { @@ -140,18 +148,20 @@ parameters: {"service": "tcp", "bytes": 5}, }, expectedAggs: []config.GenericMap{ - test.CreateMockAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 60, 3, []float64{30}, 30, 1), - test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 12, 4, []float64{4, 5}, 9, 2), - test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 3, 3, []float64{1}, 1, 1), - test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 4, 4, []float64{1, 1}, 2, 2), + test.CreateMockAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 60, 3, nil, 30, 1), + test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 12, 4, nil, 9, 2), + test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 3, 3, nil, 1, 1), + test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 4, 4, nil, 2, 2), + test.CreateMockAgg("bandwidth_raw_values", "bytes", "service", "http", aggregate.OperationRawValues, 0, 3, []float64{30}, 0, 1), + test.CreateMockAgg("bandwidth_raw_values", "bytes", "service", "tcp", aggregate.OperationRawValues, 0, 4, []float64{4, 5}, 0, 2), }, expectedEncode: []config.GenericMap{ createEncodeOutput("test_flow_count", map[string]string{"service": "http"}, 1), createEncodeOutput("test_flow_count", map[string]string{"service": "tcp"}, 2), - createEncodeOutput("test_bytes_sum", map[string]string{"service": "http"}, 30), - createEncodeOutput("test_bytes_sum", map[string]string{"service": "tcp"}, 9), - //createEncodeOutput("test_bytes_histogram", map[string]string{"service": "http"}, []float64{30}), - //createEncodeOutput("test_bytes_histogram", map[string]string{"service": "tcp"}, []float64{4, 5}), + createEncodeOutput("test_bytes_sum", map[string]string{"service": "http"}, 30.0), + createEncodeOutput("test_bytes_sum", map[string]string{"service": "tcp"}, 9.0), + createEncodeOutput("test_bytes_histogram", map[string]string{"service": "http"}, []float64{30}), + createEncodeOutput("test_bytes_histogram", map[string]string{"service": "tcp"}, []float64{4, 5}), }, }, } diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index f3ab97797..47aff531a 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -22,7 +22,6 @@ import ( "fmt" "net/http" "os" - "strconv" "sync" "time" @@ -62,7 +61,6 @@ type entrySignature struct { type entryInfo struct { eInfo entrySignature - value float64 } type metricCacheEntry struct { @@ -105,7 +103,6 @@ func (e *encodeProm) Encode(metrics []config.GenericMap) []config.GenericMap { func (e *encodeProm) EncodeMetric(metricRecord config.GenericMap) []config.GenericMap { log.Debugf("entering EncodeMetric. metricRecord = %v", metricRecord) - // TODO: We may need different handling for histograms out := make([]config.GenericMap, 0) for metricName, mInfo := range e.metrics { val, keyFound := metricRecord[mInfo.filter.key] @@ -119,13 +116,6 @@ func (e *encodeProm) EncodeMetric(metricRecord config.GenericMap) []config.Gener log.Errorf("field %v is missing", mInfo.input) continue } - metricValueString := fmt.Sprintf("%v", metricValue) - valueFloat, err := strconv.ParseFloat(metricValueString, 64) - if err != nil { - log.Debugf("field cannot be converted to float: %v, %s", metricValue, metricValueString) - continue - } - log.Debugf("metricName = %v, metricValue = %v, valueFloat = %v", metricName, metricValue, valueFloat) entryLabels := make(map[string]string, len(mInfo.labelNames)) for _, t := range mInfo.labelNames { entryLabels[t] = fmt.Sprintf("%v", metricRecord[t]) @@ -135,32 +125,47 @@ func (e *encodeProm) EncodeMetric(metricRecord config.GenericMap) []config.Gener Name: e.prefix + metricName, Labels: entryLabels, }, - value: valueFloat, - } - entryMap := map[string]interface{}{ - // TODO: change to lower case - "Name": e.prefix + metricName, - "Labels": entryLabels, - "value": valueFloat, } - out = append(out, entryMap) cEntry := e.saveEntryInCache(entry, entryLabels) cEntry.PromMetric.metricType = mInfo.PromMetric.metricType // push the metric record to prometheus switch mInfo.PromMetric.metricType { case api.PromEncodeOperationName("Gauge"): - mInfo.promGauge.With(entryLabels).Set(valueFloat) + metricValueFloat, err := utils.ConvertToFloat64(metricValue) + if err != nil { + log.Errorf("value cannot be converted to float64. err: %v, metric: %v, key: %v, value: %v", err, metricName, mInfo.input, metricValue) + continue + } + mInfo.promGauge.With(entryLabels).Set(metricValueFloat) cEntry.PromMetric.promGauge = mInfo.promGauge case api.PromEncodeOperationName("Counter"): - mInfo.promCounter.With(entryLabels).Add(valueFloat) + metricValueFloat, err := utils.ConvertToFloat64(metricValue) + if err != nil { + log.Errorf("value cannot be converted to float64. err: %v, metric: %v, key: %v, value: %v", err, metricName, mInfo.input, metricValue) + continue + } + mInfo.promCounter.With(entryLabels).Add(metricValueFloat) cEntry.PromMetric.promCounter = mInfo.promCounter case api.PromEncodeOperationName("Histogram"): - for _, v := range metricRecord["recentRawValues"].([]float64) { + metricValueSlice, ok := metricValue.([]float64) + if !ok { + log.Errorf("value is not []float64. metric: %v, key: %v, value: %v", metricName, mInfo.input, metricValue) + continue + } + for _, v := range metricValueSlice { mInfo.promHist.With(entryLabels).Observe(v) } cEntry.PromMetric.promHist = mInfo.promHist } + + entryMap := map[string]interface{}{ + // TODO: change to lower case + "Name": e.prefix + metricName, + "Labels": entryLabels, + "value": metricValue, + } + out = append(out, entryMap) } return out } diff --git a/pkg/pipeline/encode/encode_prom_test.go b/pkg/pipeline/encode/encode_prom_test.go index f25d786e3..631e388d9 100644 --- a/pkg/pipeline/encode/encode_prom_test.go +++ b/pkg/pipeline/encode/encode_prom_test.go @@ -108,19 +108,19 @@ func Test_NewEncodeProm(t *testing.T) { gEntryInfo1 := config.GenericMap{ "Name": "test_Bytes", "Labels": entryLabels1, - "value": float64(1234), + "value": 1234, } gEntryInfo2 := config.GenericMap{ "Name": "test_Packets", "Labels": entryLabels2, - "value": float64(34), + "value": 34, } require.Contains(t, output, gEntryInfo1) require.Contains(t, output, gEntryInfo2) gaugeA, err := gInfo.promGauge.GetMetricWith(entryLabels1) require.Equal(t, nil, err) bytesA := testutil.ToFloat64(gaugeA) - require.Equal(t, gEntryInfo1["value"], bytesA) + require.Equal(t, gEntryInfo1["value"], int(bytesA)) // verify entries are in cache; one for the gauge and one for the counter entriesMap := encodeProm.mCache @@ -148,14 +148,13 @@ func Test_NewEncodeProm(t *testing.T) { func Test_EncodeAggregate(t *testing.T) { metrics := []config.GenericMap{{ - "name": "test_aggregate", - "operation": "sum", - "record_key": "IP", - "by": "[dstIP srcIP]", - "aggregate": "20.0.0.2,10.0.0.1", - "value": "7", - "test_aggregate" + "_value": "7", - "count": "1", + "name": "test_aggregate", + "operation": "sum", + "record_key": "IP", + "by": "[dstIP srcIP]", + "aggregate": "20.0.0.2,10.0.0.1", + "value": 7.0, + "count": 1, }} newEncode := &encodeProm{ @@ -163,7 +162,7 @@ func Test_EncodeAggregate(t *testing.T) { prefix: "test_", metrics: map[string]metricInfo{ "gauge": { - input: "test_aggregate_value", + input: "value", filter: keyValuePair{ key: "name", value: "test_aggregate", diff --git a/pkg/pipeline/extract/aggregate/aggregate.go b/pkg/pipeline/extract/aggregate/aggregate.go index 6cc32f524..35a24b968 100644 --- a/pkg/pipeline/extract/aggregate/aggregate.go +++ b/pkg/pipeline/extract/aggregate/aggregate.go @@ -30,11 +30,12 @@ import ( ) const ( - OperationSum = "sum" - OperationAvg = "avg" - OperationMax = "max" - OperationMin = "min" - OperationCount = "count" + OperationSum = "sum" + OperationAvg = "avg" + OperationMax = "max" + OperationMin = "min" + OperationCount = "count" + OperationRawValues = "raw_values" ) type Labels map[string]string @@ -99,12 +100,15 @@ func (aggregate Aggregate) FilterEntry(entry config.GenericMap) (error, Normaliz return nil, normalizedValues } -func initValue(operation string) float64 { +func getInitValue(operation string) float64 { switch operation { case OperationSum, OperationAvg, OperationMax, OperationCount: return 0 case OperationMin: return math.MaxFloat64 + case OperationRawValues: + // Actually, in OperationRawValues the value is ignored. + return 0 default: log.Panicf("unkown operation %v", operation) return 0 @@ -115,9 +119,12 @@ func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValu groupState, ok := aggregate.Groups[normalizedValues] if !ok { groupState = &GroupState{normalizedValues: normalizedValues} - initVal := initValue(string(aggregate.Definition.Operation)) + initVal := getInitValue(string(aggregate.Definition.Operation)) groupState.totalValue = initVal groupState.recentOpValue = initVal + if aggregate.Definition.Operation == OperationRawValues { + groupState.recentRawValues = make([]float64, 0) + } aggregate.Groups[normalizedValues] = groupState } @@ -128,14 +135,12 @@ func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValu if operation == OperationCount { groupState.totalValue = float64(groupState.totalCount + 1) groupState.recentOpValue = float64(groupState.recentCount + 1) - groupState.recentRawValues = append(groupState.recentRawValues, 1) } else { if recordKey != "" { value, ok := entry[recordKey] if ok { valueString := fmt.Sprintf("%v", value) valueFloat64, _ := strconv.ParseFloat(valueString, 64) - groupState.recentRawValues = append(groupState.recentRawValues, valueFloat64) switch operation { case OperationSum: groupState.totalValue += valueFloat64 @@ -149,6 +154,8 @@ func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValu case OperationAvg: groupState.totalValue = (groupState.totalValue*float64(groupState.totalCount) + valueFloat64) / float64(groupState.totalCount+1) groupState.recentOpValue = (groupState.recentOpValue*float64(groupState.recentCount) + valueFloat64) / float64(groupState.recentCount+1) + case OperationRawValues: + groupState.recentRawValues = append(groupState.recentRawValues, valueFloat64) } } } @@ -184,23 +191,24 @@ func (aggregate Aggregate) GetMetrics() []config.GenericMap { var metrics []config.GenericMap for _, group := range aggregate.Groups { metrics = append(metrics, config.GenericMap{ - "name": aggregate.Definition.Name, - "operation": aggregate.Definition.Operation, - "record_key": aggregate.Definition.RecordKey, - "by": strings.Join(aggregate.Definition.By, ","), - "aggregate": string(group.normalizedValues), - "total_value": fmt.Sprintf("%f", group.totalValue), - "recentRawValues": group.recentRawValues, - "total_count": fmt.Sprintf("%d", group.totalCount), - "recent_op_value": group.recentOpValue, - "recent_count": group.recentCount, + "name": aggregate.Definition.Name, + "operation": aggregate.Definition.Operation, + "record_key": aggregate.Definition.RecordKey, + "by": strings.Join(aggregate.Definition.By, ","), + "aggregate": string(group.normalizedValues), + "total_value": fmt.Sprintf("%f", group.totalValue), + "total_count": fmt.Sprintf("%d", group.totalCount), + "recent_raw_values": group.recentRawValues, + "recent_op_value": group.recentOpValue, + "recent_count": group.recentCount, strings.Join(aggregate.Definition.By, "_"): string(group.normalizedValues), }) // Once reported, we reset the recentXXX fields - group.recentRawValues = make([]float64, 0) + if aggregate.Definition.Operation == OperationRawValues { + group.recentRawValues = make([]float64, 0) + } group.recentCount = 0 - initVal := initValue(string(aggregate.Definition.Operation)) - group.recentOpValue = initVal + group.recentOpValue = getInitValue(string(aggregate.Definition.Operation)) } return metrics diff --git a/pkg/pipeline/extract/extract_aggregate_test.go b/pkg/pipeline/extract/extract_aggregate_test.go index 1648adf50..17c030b58 100644 --- a/pkg/pipeline/extract/extract_aggregate_test.go +++ b/pkg/pipeline/extract/extract_aggregate_test.go @@ -68,6 +68,12 @@ parameters: - service operation: avg recordkey: bytes + + - name: bandwidth_raw_values + by: + - service + operation: raw_values + recordkey: bytes ` var err error @@ -92,16 +98,18 @@ parameters: {"service": "tcp", "bytes": 2}, }, expectedAggs: []config.GenericMap{ - test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 2, 2, []float64{1.0, 1.0}, 2, 2), - test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 2, 2, []float64{1.0, 1.0}, 2, 2), - test.CreateMockAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 30, 2, []float64{10.0, 20.0}, 30, 2), - test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 3, 2, []float64{1.0, 2.0}, 3, 2), - test.CreateMockAgg("bandwidth_max", "bytes", "service", "http", aggregate.OperationMax, 20, 2, []float64{10.0, 20.0}, 20, 2), - test.CreateMockAgg("bandwidth_max", "bytes", "service", "tcp", aggregate.OperationMax, 2, 2, []float64{1.0, 2.0}, 2, 2), - test.CreateMockAgg("bandwidth_min", "bytes", "service", "http", aggregate.OperationMin, 10, 2, []float64{10.0, 20.0}, 10, 2), - test.CreateMockAgg("bandwidth_min", "bytes", "service", "tcp", aggregate.OperationMin, 1, 2, []float64{1.0, 2.0}, 1, 2), - test.CreateMockAgg("bandwidth_avg", "bytes", "service", "http", aggregate.OperationAvg, 15, 2, []float64{10.0, 20.0}, 15, 2), - test.CreateMockAgg("bandwidth_avg", "bytes", "service", "tcp", aggregate.OperationAvg, 1.5, 2, []float64{1.0, 2.0}, 1.5, 2), + test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 2, 2, nil, 2, 2), + test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 2, 2, nil, 2, 2), + test.CreateMockAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 30, 2, nil, 30, 2), + test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 3, 2, nil, 3, 2), + test.CreateMockAgg("bandwidth_max", "bytes", "service", "http", aggregate.OperationMax, 20, 2, nil, 20, 2), + test.CreateMockAgg("bandwidth_max", "bytes", "service", "tcp", aggregate.OperationMax, 2, 2, nil, 2, 2), + test.CreateMockAgg("bandwidth_min", "bytes", "service", "http", aggregate.OperationMin, 10, 2, nil, 10, 2), + test.CreateMockAgg("bandwidth_min", "bytes", "service", "tcp", aggregate.OperationMin, 1, 2, nil, 1, 2), + test.CreateMockAgg("bandwidth_avg", "bytes", "service", "http", aggregate.OperationAvg, 15, 2, nil, 15, 2), + test.CreateMockAgg("bandwidth_avg", "bytes", "service", "tcp", aggregate.OperationAvg, 1.5, 2, nil, 1.5, 2), + test.CreateMockAgg("bandwidth_raw_values", "bytes", "service", "http", aggregate.OperationRawValues, 0, 2, []float64{10, 20}, 0, 2), + test.CreateMockAgg("bandwidth_raw_values", "bytes", "service", "tcp", aggregate.OperationRawValues, 0, 2, []float64{1, 2}, 0, 2), }, }, { @@ -112,16 +120,18 @@ parameters: {"service": "tcp", "bytes": 5}, }, expectedAggs: []config.GenericMap{ - test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 3, 3, []float64{1.0}, 1, 1), - test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 4, 4, []float64{1.0, 1.0}, 2, 2), - test.CreateMockAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 60, 3, []float64{30.0}, 30, 1), - test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 12, 4, []float64{4.0, 5.0}, 9, 2), - test.CreateMockAgg("bandwidth_max", "bytes", "service", "http", aggregate.OperationMax, 30, 3, []float64{30.0}, 30, 1), - test.CreateMockAgg("bandwidth_max", "bytes", "service", "tcp", aggregate.OperationMax, 5, 4, []float64{4.0, 5.0}, 5, 2), - test.CreateMockAgg("bandwidth_min", "bytes", "service", "http", aggregate.OperationMin, 10, 3, []float64{30.0}, 30, 1), - test.CreateMockAgg("bandwidth_min", "bytes", "service", "tcp", aggregate.OperationMin, 1, 4, []float64{4.0, 5.0}, 4, 2), - test.CreateMockAgg("bandwidth_avg", "bytes", "service", "http", aggregate.OperationAvg, 20, 3, []float64{30.0}, 30, 1), - test.CreateMockAgg("bandwidth_avg", "bytes", "service", "tcp", aggregate.OperationAvg, 3, 4, []float64{4.0, 5.0}, 4.5, 2), + test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 3, 3, nil, 1, 1), + test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 4, 4, nil, 2, 2), + test.CreateMockAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 60, 3, nil, 30, 1), + test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 12, 4, nil, 9, 2), + test.CreateMockAgg("bandwidth_max", "bytes", "service", "http", aggregate.OperationMax, 30, 3, nil, 30, 1), + test.CreateMockAgg("bandwidth_max", "bytes", "service", "tcp", aggregate.OperationMax, 5, 4, nil, 5, 2), + test.CreateMockAgg("bandwidth_min", "bytes", "service", "http", aggregate.OperationMin, 10, 3, nil, 30, 1), + test.CreateMockAgg("bandwidth_min", "bytes", "service", "tcp", aggregate.OperationMin, 1, 4, nil, 4, 2), + test.CreateMockAgg("bandwidth_avg", "bytes", "service", "http", aggregate.OperationAvg, 20, 3, nil, 30, 1), + test.CreateMockAgg("bandwidth_avg", "bytes", "service", "tcp", aggregate.OperationAvg, 3, 4, nil, 4.5, 2), + test.CreateMockAgg("bandwidth_raw_values", "bytes", "service", "http", aggregate.OperationRawValues, 0, 3, []float64{30}, 0, 1), + test.CreateMockAgg("bandwidth_raw_values", "bytes", "service", "tcp", aggregate.OperationRawValues, 0, 4, []float64{4, 5}, 0, 2), }, }, } diff --git a/pkg/pipeline/utils/convert.go b/pkg/pipeline/utils/convert.go new file mode 100644 index 000000000..e5feeeadf --- /dev/null +++ b/pkg/pipeline/utils/convert.go @@ -0,0 +1,49 @@ +package utils + +import ( + "fmt" + "math" + "reflect" + "strconv" +) + +var floatType = reflect.TypeOf(float64(0)) +var stringType = reflect.TypeOf("") + +// ConvertToFloat64 converts an unknown type to float +// Based on https://stackoverflow.com/a/20767884/2749989 +func ConvertToFloat64(unk interface{}) (float64, error) { + switch i := unk.(type) { + case float64: + return i, nil + case float32: + return float64(i), nil + case int64: + return float64(i), nil + case int32: + return float64(i), nil + case int: + return float64(i), nil + case uint64: + return float64(i), nil + case uint32: + return float64(i), nil + case uint: + return float64(i), nil + case string: + return strconv.ParseFloat(i, 64) + default: + v := reflect.ValueOf(unk) + v = reflect.Indirect(v) + if v.Type().ConvertibleTo(floatType) { + fv := v.Convert(floatType) + return fv.Float(), nil + } else if v.Type().ConvertibleTo(stringType) { + sv := v.Convert(stringType) + s := sv.String() + return strconv.ParseFloat(s, 64) + } else { + return math.NaN(), fmt.Errorf("Can't convert %v to float64", v.Type()) + } + } +} diff --git a/pkg/test/utils.go b/pkg/test/utils.go index 8b8dc5878..c8cf35342 100644 --- a/pkg/test/utils.go +++ b/pkg/test/utils.go @@ -104,30 +104,29 @@ func InitConfig(t *testing.T, conf string) *viper.Viper { func GetExtractMockEntry() config.GenericMap { entry := config.GenericMap{ - "srcAddr": "10.1.2.3", - "dstAddr": "10.1.2.4", - "srcPort": "9001", - "dstPort": "39504", - "bytes": "1234", - "packets": "34", - "recentRawValues": []float64{1.1, 2.2}, + "srcAddr": "10.1.2.3", + "dstAddr": "10.1.2.4", + "srcPort": 9001, + "dstPort": 39504, + "bytes": 1234, + "packets": 34, } return entry } -func CreateMockAgg(name, recordKey, by, agg, op string, value float64, count int, rrv []float64, recentOpValue float64, recentCount int) config.GenericMap { - valueString := fmt.Sprintf("%f", value) +func CreateMockAgg(name, recordKey, by, agg, op string, totalValue float64, totalCount int, rrv []float64, recentOpValue float64, recentCount int) config.GenericMap { + valueString := fmt.Sprintf("%f", totalValue) return config.GenericMap{ - "name": name, - "record_key": recordKey, - "by": by, - "aggregate": agg, - by: agg, - "operation": api.AggregateOperation(op), - "total_value": valueString, - "recentRawValues": rrv, - "total_count": fmt.Sprintf("%v", count), - "recent_op_value": recentOpValue, - "recent_count": recentCount, + "name": name, + "record_key": recordKey, + "by": by, + "aggregate": agg, + by: agg, + "operation": api.AggregateOperation(op), + "total_value": valueString, + "recent_raw_values": rrv, + "total_count": fmt.Sprintf("%v", totalCount), + "recent_op_value": recentOpValue, + "recent_count": recentCount, } }