Add raw values operation #152

Merged · 16 commits · Apr 3, 2022
27 changes: 26 additions & 1 deletion README.md
@@ -463,7 +463,7 @@ It is possible to define aggregates per `srcIP` or per `dstIP` or per the tuple
to capture the `sum`, `min`, `avg` etc. of the values in the field `value`.

For example, a configuration record for aggregating field `value` as
average for `srcIP`x`dstIP` tuples will look like this::
average for `srcIP`x`dstIP` tuples will look like this:

```yaml
pipeline:
@@ -482,6 +482,31 @@ parameters:
RecordKey: "value"
```

The output fields of the aggregates stage are:
- `name`: the name of the aggregate, as defined in the configuration
- `operation`: the aggregate operation
- `record_key`: the field on which the operation was performed
- `by`: the list of fields by which the flowlogs were grouped
- `aggregate`: the concrete values of the `by` fields for this group
- `total_value`: the total aggregate value
- `total_count`: the total count
- `recent_raw_values`: a slice with the raw values of the recent batch
- `recent_op_value`: the aggregate value of the recent batch
- `recent_count`: the count of flowlogs in the recent batch
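
For illustration, a single output record of this stage for an `avg` aggregate over `srcIP`x`dstIP` might look as follows (a hypothetical record; all values are invented, and `recent_raw_values` stays empty because the operation is not `raw_values`):

```yaml
name: bandwidth_avg            # hypothetical aggregate name
operation: avg
record_key: value
by: srcIP,dstIP
aggregate: 10.0.0.1,20.0.0.2   # the concrete values of the `by` fields
total_value: 15                # average over all batches so far
total_count: 2                 # flowlogs seen so far
recent_raw_values: []          # filled only for the raw_values operation
recent_op_value: 15            # average over the recent batch only
recent_count: 2                # flowlogs in the recent batch
```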

These fields are consumed by the next stage (for example, the `prom` encoder).
The pipeline processes flowlogs in batches, and the output fields with the `recent_` prefix refer to the most recent batch only.
They are needed when exposing metrics in Prometheus using Counters and Histograms.
The Prometheus Counter API accepts the delta to be added to the counter, not the total value as with Gauges.
In this case, `recent_op_value` and `recent_count` should be used as the `valuekey`.
The Histogram API accepts individual sample values, which are added to the appropriate buckets.
In this case, we are interested in the raw values of the records in the aggregation group.
No aggregate computation is needed, so the operation should be set to `raw_values` and the `valuekey` should be set to `recent_raw_values`.

**Note**: `recent_raw_values` is filled only when the operation is `raw_values`.
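
For example, a `prom` encoder stage that exposes the raw values as a Prometheus histogram could be configured as below. This is a sketch mirroring the test configuration in this PR; `bandwidth_raw_values` is an aggregate defined with `operation: raw_values` and `recordkey: bytes`:

```yaml
- name: encode
  encode:
    type: prom
    prom:
      prefix: test_
      metrics:
        - name: bytes_histogram
          type: histogram
          filter: {key: name, value: bandwidth_raw_values}
          valuekey: recent_raw_values
          labels:
            - service
```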


### Prometheus encoder

The prometheus encoder specifies which metrics to export to prometheus and which labels should be associated with those metrics.
2 changes: 1 addition & 1 deletion docs/api.md
@@ -133,6 +133,6 @@ Following is the supported API format for specifying metrics aggregations:
aggregates:
Name: description of aggregation result
By: list of fields on which to aggregate
Operation: sum, min, max, or avg
Operation: sum, min, max, avg or raw_values
RecordKey: internal field on which to perform the operation
</pre>
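
Using this format, an aggregate that collects raw values could be defined as below (a sketch mirroring the test pipeline in this PR, using the lowercase keys accepted in pipeline configuration; the `service` field and `bytes` field are illustrative):

```yaml
aggregates:
  - name: bandwidth_raw_values
    by:
      - service
    operation: raw_values
    recordkey: bytes
```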
18 changes: 10 additions & 8 deletions docs/confGenerator.md
@@ -134,15 +134,17 @@ this actually moves the data from being log lines into being a metric named (8.2)
> For additional details on `extract aggregates`
> refer to [README.md](../README.md#aggregates).

(9) Next, the metrics from (8.2) are sent to prometheus (9.1).
(9) Next, the metrics from (8.2) are sent to prometheus (9.1). <br>
The metric name in prometheus will be named as the value of (9.2) with
the prefix from the `config.yaml` file.
The type of the prometheus metric will be (9.3) (e.g. gauge, counter or histogram).
The filter field (9.4) determines which aggregates will take into account.
The key should be `"name"` and the value should match the aggregate name (8.2)
The value to be used by prometheus is taken from the field defined in (9.5).
For `Gauges`, use `total_value` or `total_count`. For `Counters`, use `recent_op_value` or `recent_count`.
Prometheus will add labels to the metric based on the (9.6) fields.
the prefix from the `config.yaml` file. <br>
The type of the prometheus metric will be (9.3) (e.g. gauge, counter or histogram). <br>
The filter field (9.4) determines which aggregates will be taken into account. <br>
The key should be `"name"` and the value should match the aggregate name (8.2). <br>
The value to be used by prometheus is taken from the field defined in (9.5). <br>
For `Gauge`, use `total_value` or `total_count`. <br>
For `Counter`, use `recent_op_value` or `recent_count`. <br>
For `Histogram`, use `recent_raw_values`. <br>
Prometheus will add labels to the metric based on the (9.6) fields. <br>
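
As a concrete sketch (metric names, aggregate names, and labels here are illustrative), the mapping of (9.2)-(9.6) onto the three metric types looks like this:

```yaml
metrics:
  - name: bytes_sum                                  # (9.2)
    type: gauge                                      # (9.3)
    filter: {key: name, value: bandwidth_sum}        # (9.4)
    valuekey: total_value                            # (9.5) Gauge: total_value / total_count
    labels: [service]                                # (9.6)
  - name: flow_count
    type: counter
    filter: {key: name, value: bandwidth_count}
    valuekey: recent_count                           # Counter: recent_op_value / recent_count
    labels: [service]
  - name: bytes_histogram
    type: histogram
    filter: {key: name, value: bandwidth_raw_values}
    valuekey: recent_raw_values                      # Histogram: recent_raw_values
    labels: [service]
```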

(10) Next, use grafana to visualize the metric named in (9.2) (including the
prefix), using the prometheus expression from (10.1).
2 changes: 1 addition & 1 deletion pkg/api/extract_aggregate.go
@@ -6,6 +6,6 @@ type AggregateOperation string
type AggregateDefinition struct {
Name string `yaml:"Name" doc:"description of aggregation result"`
By AggregateBy `yaml:"By" doc:"list of fields on which to aggregate"`
Operation AggregateOperation `yaml:"Operation" doc:"sum, min, max, or avg"`
Operation AggregateOperation `yaml:"Operation" doc:"sum, min, max, avg or raw_values"`
RecordKey string `yaml:"RecordKey" doc:"internal field on which to perform the operation"`
}
56 changes: 33 additions & 23 deletions pkg/pipeline/aggregate_prom_test.go
@@ -27,7 +27,7 @@ import (
"github.com/stretchr/testify/require"
)

func createEncodeOutput(name string, labels map[string]string, value float64) config.GenericMap {
func createEncodeOutput(name string, labels map[string]string, value interface{}) config.GenericMap {
gm := config.GenericMap{
"Name": name,
"Labels": labels,
@@ -62,6 +62,12 @@ parameters:
- service
operation: count
recordkey:

- name: bandwidth_raw_values
by:
- service
operation: raw_values
recordkey: bytes
- name: encode
encode:
type: prom
@@ -84,11 +90,12 @@ parameters:
labels:
- service

# - name: bytes_histogram
# type: histogram
# valuekey: recentRawValues
# labels:
# - service
- name: bytes_histogram
type: histogram
filter: {key: name, value: bandwidth_raw_values}
valuekey: recent_raw_values
labels:
- service
`
var err error

@@ -117,19 +124,20 @@ parameters:
{"service": "tcp", "bytes": 2.0},
},
expectedAggs: []config.GenericMap{
test.CreateMockAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 30, 2, []float64{10, 20}, 30, 2),
test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 3, 2, []float64{1, 2}, 3, 2),
test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 2, 2, []float64{1, 1}, 2, 2),
test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 2, 2, []float64{1, 1}, 2, 2),
test.CreateMockAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 30, 2, nil, 30, 2),
test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 3, 2, nil, 3, 2),
test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 2, 2, nil, 2, 2),
test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 2, 2, nil, 2, 2),
test.CreateMockAgg("bandwidth_raw_values", "bytes", "service", "http", aggregate.OperationRawValues, 0, 2, []float64{10, 20}, 0, 2),
test.CreateMockAgg("bandwidth_raw_values", "bytes", "service", "tcp", aggregate.OperationRawValues, 0, 2, []float64{1, 2}, 0, 2),
},
expectedEncode: []config.GenericMap{
createEncodeOutput("test_flow_count", map[string]string{"service": "http"}, 2),
createEncodeOutput("test_flow_count", map[string]string{"service": "tcp"}, 2),
createEncodeOutput("test_bytes_sum", map[string]string{"service": "http"}, 30),
createEncodeOutput("test_bytes_sum", map[string]string{"service": "tcp"}, 3),
// TODO: add the following test once raw_values operation and filters are implemented
//createEncodeOutput("test_bytes_histogram", map[string]string{"service": "http"}, []float64{10, 20}),
//createEncodeOutput("test_bytes_histogram", map[string]string{"service": "tcp"}, []float64{1, 2}),
createEncodeOutput("test_bytes_sum", map[string]string{"service": "http"}, 30.0),
createEncodeOutput("test_bytes_sum", map[string]string{"service": "tcp"}, 3.0),
createEncodeOutput("test_bytes_histogram", map[string]string{"service": "http"}, []float64{10, 20}),
createEncodeOutput("test_bytes_histogram", map[string]string{"service": "tcp"}, []float64{1, 2}),
},
},
{
Expand All @@ -140,18 +148,20 @@ parameters:
{"service": "tcp", "bytes": 5},
},
expectedAggs: []config.GenericMap{
test.CreateMockAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 60, 3, []float64{30}, 30, 1),
test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 12, 4, []float64{4, 5}, 9, 2),
test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 3, 3, []float64{1}, 1, 1),
test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 4, 4, []float64{1, 1}, 2, 2),
test.CreateMockAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 60, 3, nil, 30, 1),
test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 12, 4, nil, 9, 2),
test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 3, 3, nil, 1, 1),
test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 4, 4, nil, 2, 2),
test.CreateMockAgg("bandwidth_raw_values", "bytes", "service", "http", aggregate.OperationRawValues, 0, 3, []float64{30}, 0, 1),
test.CreateMockAgg("bandwidth_raw_values", "bytes", "service", "tcp", aggregate.OperationRawValues, 0, 4, []float64{4, 5}, 0, 2),
},
expectedEncode: []config.GenericMap{
createEncodeOutput("test_flow_count", map[string]string{"service": "http"}, 1),
createEncodeOutput("test_flow_count", map[string]string{"service": "tcp"}, 2),
createEncodeOutput("test_bytes_sum", map[string]string{"service": "http"}, 30),
createEncodeOutput("test_bytes_sum", map[string]string{"service": "tcp"}, 9),
//createEncodeOutput("test_bytes_histogram", map[string]string{"service": "http"}, []float64{30}),
//createEncodeOutput("test_bytes_histogram", map[string]string{"service": "tcp"}, []float64{4, 5}),
createEncodeOutput("test_bytes_sum", map[string]string{"service": "http"}, 30.0),
createEncodeOutput("test_bytes_sum", map[string]string{"service": "tcp"}, 9.0),
createEncodeOutput("test_bytes_histogram", map[string]string{"service": "http"}, []float64{30}),
createEncodeOutput("test_bytes_histogram", map[string]string{"service": "tcp"}, []float64{4, 5}),
},
},
}
47 changes: 26 additions & 21 deletions pkg/pipeline/encode/encode_prom.go
@@ -22,7 +22,6 @@ import (
"fmt"
"net/http"
"os"
"strconv"
"sync"
"time"

@@ -62,7 +61,6 @@ type entrySignature struct {

type entryInfo struct {
eInfo entrySignature
value float64
}

type metricCacheEntry struct {
@@ -105,7 +103,6 @@ func (e *encodeProm) Encode(metrics []config.GenericMap) []config.GenericMap {

func (e *encodeProm) EncodeMetric(metricRecord config.GenericMap) []config.GenericMap {
log.Debugf("entering EncodeMetric. metricRecord = %v", metricRecord)
// TODO: We may need different handling for histograms
out := make([]config.GenericMap, 0)
for metricName, mInfo := range e.metrics {
val, keyFound := metricRecord[mInfo.filter.key]
@@ -119,13 +116,6 @@ func (e *encodeProm) EncodeMetric(metricRecord config.GenericMap) []config.GenericMap {
log.Errorf("field %v is missing", mInfo.input)
continue
}
metricValueString := fmt.Sprintf("%v", metricValue)
valueFloat, err := strconv.ParseFloat(metricValueString, 64)
if err != nil {
log.Debugf("field cannot be converted to float: %v, %s", metricValue, metricValueString)
continue
}
log.Debugf("metricName = %v, metricValue = %v, valueFloat = %v", metricName, metricValue, valueFloat)
entryLabels := make(map[string]string, len(mInfo.labelNames))
for _, t := range mInfo.labelNames {
entryLabels[t] = fmt.Sprintf("%v", metricRecord[t])
@@ -135,32 +125,47 @@ func (e *encodeProm) EncodeMetric(metricRecord config.GenericMap) []config.GenericMap {
Name: e.prefix + metricName,
Labels: entryLabels,
},
value: valueFloat,
}
entryMap := map[string]interface{}{
// TODO: change to lower case
"Name": e.prefix + metricName,
"Labels": entryLabels,
"value": valueFloat,
}
out = append(out, entryMap)

cEntry := e.saveEntryInCache(entry, entryLabels)
cEntry.PromMetric.metricType = mInfo.PromMetric.metricType
// push the metric record to prometheus
switch mInfo.PromMetric.metricType {
case api.PromEncodeOperationName("Gauge"):
mInfo.promGauge.With(entryLabels).Set(valueFloat)
metricValueFloat, err := utils.ConvertToFloat64(metricValue)
if err != nil {
log.Errorf("value cannot be converted to float64. err: %v, metric: %v, key: %v, value: %v", err, metricName, mInfo.input, metricValue)
continue
}
mInfo.promGauge.With(entryLabels).Set(metricValueFloat)
cEntry.PromMetric.promGauge = mInfo.promGauge
case api.PromEncodeOperationName("Counter"):
mInfo.promCounter.With(entryLabels).Add(valueFloat)
metricValueFloat, err := utils.ConvertToFloat64(metricValue)
if err != nil {
log.Errorf("value cannot be converted to float64. err: %v, metric: %v, key: %v, value: %v", err, metricName, mInfo.input, metricValue)
continue
}
mInfo.promCounter.With(entryLabels).Add(metricValueFloat)
cEntry.PromMetric.promCounter = mInfo.promCounter
case api.PromEncodeOperationName("Histogram"):
for _, v := range metricRecord["recentRawValues"].([]float64) {
metricValueSlice, ok := metricValue.([]float64)
if !ok {
log.Errorf("value is not []float64. metric: %v, key: %v, value: %v", metricName, mInfo.input, metricValue)
continue
}
for _, v := range metricValueSlice {
mInfo.promHist.With(entryLabels).Observe(v)
}
cEntry.PromMetric.promHist = mInfo.promHist
}

entryMap := map[string]interface{}{
// TODO: change to lower case
"Name": e.prefix + metricName,
"Labels": entryLabels,
"value": metricValue,
}
out = append(out, entryMap)
}
return out
}
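
The added conversion goes through `utils.ConvertToFloat64`, a helper not shown in this diff. A minimal sketch of what such a helper might look like, assuming it accepts common numeric types and numeric strings:

```go
package utils

import (
	"fmt"
	"strconv"
)

// ConvertToFloat64 converts common numeric types (and numeric strings)
// to float64, returning an error for values it cannot convert.
func ConvertToFloat64(value interface{}) (float64, error) {
	switch v := value.(type) {
	case float64:
		return v, nil
	case float32:
		return float64(v), nil
	case int:
		return float64(v), nil
	case int64:
		return float64(v), nil
	case uint64:
		return float64(v), nil
	case string:
		return strconv.ParseFloat(v, 64)
	default:
		return 0, fmt.Errorf("cannot convert %v (%T) to float64", value, value)
	}
}
```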
23 changes: 11 additions & 12 deletions pkg/pipeline/encode/encode_prom_test.go
@@ -108,19 +108,19 @@ func Test_NewEncodeProm(t *testing.T) {
gEntryInfo1 := config.GenericMap{
"Name": "test_Bytes",
"Labels": entryLabels1,
"value": float64(1234),
"value": 1234,
}
gEntryInfo2 := config.GenericMap{
"Name": "test_Packets",
"Labels": entryLabels2,
"value": float64(34),
"value": 34,
}
require.Contains(t, output, gEntryInfo1)
require.Contains(t, output, gEntryInfo2)
gaugeA, err := gInfo.promGauge.GetMetricWith(entryLabels1)
require.Equal(t, nil, err)
bytesA := testutil.ToFloat64(gaugeA)
require.Equal(t, gEntryInfo1["value"], bytesA)
require.Equal(t, gEntryInfo1["value"], int(bytesA))

// verify entries are in cache; one for the gauge and one for the counter
entriesMap := encodeProm.mCache
@@ -148,22 +148,21 @@

func Test_EncodeAggregate(t *testing.T) {
metrics := []config.GenericMap{{
"name": "test_aggregate",
"operation": "sum",
"record_key": "IP",
"by": "[dstIP srcIP]",
"aggregate": "20.0.0.2,10.0.0.1",
"value": "7",
"test_aggregate" + "_value": "7",
"count": "1",
"name": "test_aggregate",
"operation": "sum",
"record_key": "IP",
"by": "[dstIP srcIP]",
"aggregate": "20.0.0.2,10.0.0.1",
"value": 7.0,
"count": 1,
}}

newEncode := &encodeProm{
port: ":0000",
prefix: "test_",
metrics: map[string]metricInfo{
"gauge": {
input: "test_aggregate_value",
input: "value",
filter: keyValuePair{
key: "name",
value: "test_aggregate",