From f8f7471f1ea63f4f517ec8b63628a11d55bfcb61 Mon Sep 17 00:00:00 2001 From: Joel Takvorian Date: Tue, 10 Sep 2024 10:30:52 +0200 Subject: [PATCH] NETOBSERV-1772: allow prom label remapping (#698) * NETOBSERV-1772: allow prom label remapping - New "remap" field in MetricItem API - New struct to distinguish source labels (from flow logs) and target labels (to use in metrics) * Add/fix tests --- docs/api.md | 2 + pkg/api/encode_prom.go | 1 + pkg/confgen/confgen_test.go | 4 ++ pkg/config/pipeline_builder_test.go | 3 +- pkg/pipeline/encode/encode_prom.go | 13 +++-- pkg/pipeline/encode/encode_prom_metric.go | 21 +++++++++ pkg/pipeline/encode/encode_prom_test.go | 47 +++++++++++++++++++ pkg/pipeline/encode/metrics_common.go | 19 ++++---- .../opentelemetry/encode_otlpmetrics.go | 3 +- 9 files changed, 94 insertions(+), 19 deletions(-) diff --git a/docs/api.md b/docs/api.md index 624d1e910..13a192d43 100644 --- a/docs/api.md +++ b/docs/api.md @@ -29,6 +29,7 @@ Following is the supported API format for prometheus encode: not_match_regex: the filter value must not match the provided regular expression valueKey: entry key from which to resolve metric value labels: labels to be associated with the metric + remap: optional remapping of labels buckets: histogram buckets valueScale: scale factor of the value (MetricVal := FlowVal / Scale) prefix: prefix added to each metric name @@ -438,6 +439,7 @@ Following is the supported API format for writing metrics to an OpenTelemetry co not_match_regex: the filter value must not match the provided regular expression valueKey: entry key from which to resolve metric value labels: labels to be associated with the metric + remap: optional remapping of labels buckets: histogram buckets valueScale: scale factor of the value (MetricVal := FlowVal / Scale) pushTimeInterval: how often should metrics be sent to collector: diff --git a/pkg/api/encode_prom.go b/pkg/api/encode_prom.go index b17628cc7..97222b9f8 100644 --- a/pkg/api/encode_prom.go +++ b/pkg/api/encode_prom.go @@ -52,6 +52,7 @@ type MetricsItem struct { Filters []MetricsFilter `yaml:"filters" json:"filters" doc:"a list of criteria to filter entries by"` ValueKey string `yaml:"valueKey" json:"valueKey" doc:"entry key from which to resolve metric value"` Labels []string `yaml:"labels" json:"labels" doc:"labels to be associated with the metric"` + Remap map[string]string `yaml:"remap" json:"remap" doc:"optional remapping of labels"` Buckets []float64 `yaml:"buckets" json:"buckets" doc:"histogram buckets"` ValueScale float64 `yaml:"valueScale,omitempty" json:"valueScale,omitempty" doc:"scale factor of the value (MetricVal := FlowVal / Scale)"` } diff --git a/pkg/confgen/confgen_test.go b/pkg/confgen/confgen_test.go index 53198219e..0c1762cf5 100644 --- a/pkg/confgen/confgen_test.go +++ b/pkg/confgen/confgen_test.go @@ -151,6 +151,7 @@ func Test_RunShortConfGen(t *testing.T) { Filters: []api.MetricsFilter{{Key: "K", Value: "V"}}, ValueKey: "test_aggregates_value", Labels: []string{"groupByKeys", "aggregate"}, + Remap: map[string]string{}, Buckets: []float64{}, }}, }, out.Parameters[3].Encode.Prom) @@ -232,6 +233,7 @@ func Test_RunConfGenNoAgg(t *testing.T) { Filters: []api.MetricsFilter{{Key: "K", Value: "V"}}, ValueKey: "Bytes", Labels: []string{"service"}, + Remap: map[string]string{}, Buckets: []float64{}, }}, }, out.Parameters[2].Encode.Prom) @@ -336,6 +338,7 @@ func Test_RunLongConfGen(t *testing.T) { Filters: []api.MetricsFilter{{Key: "K", Value: "V"}}, ValueKey: "test_aggregates_value", Labels: []string{"groupByKeys", "aggregate"}, + Remap: map[string]string{}, Buckets: []float64{}, }, { Name: "test_histo", @@ -343,6 +346,7 @@ func Test_RunLongConfGen(t *testing.T) { Filters: []api.MetricsFilter{{Key: "K", Value: "V"}}, ValueKey: "test_aggregates_value", Labels: []string{"groupByKeys", "aggregate"}, + Remap: map[string]string{}, Buckets: []float64{}, }}, }, out.Parameters[4].Encode.Prom) diff --git a/pkg/config/pipeline_builder_test.go b/pkg/config/pipeline_builder_test.go index 88d32ea51..8b10142fb 100644 --- a/pkg/config/pipeline_builder_test.go +++ b/pkg/config/pipeline_builder_test.go @@ -137,6 +137,7 @@ func TestKafkaPromPipeline(t *testing.T) { }}, ValueKey: "recent_count", Labels: []string{"by", "aggregate"}, + Remap: map[string]string{}, Buckets: []float64{}, }}, Prefix: "flp_", @@ -170,7 +171,7 @@ func TestKafkaPromPipeline(t *testing.T) { b, err = json.Marshal(params[4]) require.NoError(t, err) - require.JSONEq(t, `{"name":"prom","encode":{"type":"prom","prom":{"expiryTime":"50s", "metrics":[{"name":"connections_per_source_as","type":"counter","filters":[{"key":"name","value":"src_as_connection_count"}],"valueKey":"recent_count","labels":["by","aggregate"],"buckets":[]}],"prefix":"flp_"}}}`, string(b)) + require.JSONEq(t, `{"name":"prom","encode":{"type":"prom","prom":{"expiryTime":"50s", "metrics":[{"name":"connections_per_source_as","type":"counter","filters":[{"key":"name","value":"src_as_connection_count"}],"valueKey":"recent_count","labels":["by","aggregate"],"remap":{},"buckets":[]}],"prefix":"flp_"}}}`, string(b)) } func TestForkPipeline(t *testing.T) { diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index 02996581d..f90d0be5e 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -115,25 +115,25 @@ func (e *EncodeProm) Cleanup(cleanupFunc interface{}) { } func (e *EncodeProm) addCounter(fullMetricName string, mInfo *MetricInfo) prometheus.Collector { - counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: ""}, mInfo.Labels) + counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: ""}, mInfo.TargetLabels()) e.metricCommon.AddCounter(fullMetricName, counter, mInfo) return counter } func (e *EncodeProm) addGauge(fullMetricName string, mInfo *MetricInfo) prometheus.Collector { - gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, mInfo.Labels) + gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, mInfo.TargetLabels()) e.metricCommon.AddGauge(fullMetricName, gauge, mInfo) return gauge } func (e *EncodeProm) addHistogram(fullMetricName string, mInfo *MetricInfo) prometheus.Collector { - histogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: ""}, mInfo.Labels) + histogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: ""}, mInfo.TargetLabels()) e.metricCommon.AddHist(fullMetricName, histogram, mInfo) return histogram } func (e *EncodeProm) addAgghistogram(fullMetricName string, mInfo *MetricInfo) prometheus.Collector { - agghistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: ""}, mInfo.Labels) + agghistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: ""}, mInfo.TargetLabels()) e.metricCommon.AddAggHist(fullMetricName, agghistogram, mInfo) return agghistogram } @@ -181,7 +181,7 @@ func (e *EncodeProm) checkMetricUpdate(prefix string, apiItem *api.MetricsItem, plog.Debugf("Checking metric: %s", fullMetricName) mInfo := CreateMetricInfo(apiItem) if oldMetric, ok := store[fullMetricName]; ok { - if !reflect.DeepEqual(mInfo.MetricsItem.Labels, oldMetric.info.MetricsItem.Labels) { + if !reflect.DeepEqual(mInfo.TargetLabels(), oldMetric.info.TargetLabels()) { plog.Debug("Changes detected in labels") return true } @@ -257,9 +257,8 @@ func (e *EncodeProm) resetRegistry() { for i := range e.cfg.Metrics { mCfg := &e.cfg.Metrics[i] fullMetricName := e.cfg.Prefix + mCfg.Name - labels := mCfg.Labels - plog.Debugf("Create metric: %s, Labels: %v", fullMetricName, labels) mInfo := CreateMetricInfo(mCfg) + plog.Debugf("Create metric: %s, Labels: %v", fullMetricName, mInfo.TargetLabels()) var m prometheus.Collector switch mCfg.Type { case api.MetricCounter: diff --git a/pkg/pipeline/encode/encode_prom_metric.go b/pkg/pipeline/encode/encode_prom_metric.go index 407e55155..d57112790 100644 --- a/pkg/pipeline/encode/encode_prom_metric.go +++ b/pkg/pipeline/encode/encode_prom_metric.go @@ -16,6 +16,20 @@ var variableExtractor, _ = regexp.Compile(`\$\(([^\)]+)\)`) type MetricInfo struct { *api.MetricsItem FilterPredicates []Predicate + MappedLabels []MappedLabel +} + +type MappedLabel struct { + Source string + Target string +} + +func (m *MetricInfo) TargetLabels() []string { + var targetLabels []string + for _, l := range m.MappedLabels { + targetLabels = append(targetLabels, l.Target) + } + return targetLabels } func Presence(filter api.MetricsFilter) Predicate { @@ -122,6 +136,13 @@ func CreateMetricInfo(def *api.MetricsItem) *MetricInfo { mi := MetricInfo{ MetricsItem: def, } + for _, l := range def.Labels { + ml := MappedLabel{Source: l, Target: l} + if as := def.Remap[l]; as != "" { + ml.Target = as + } + mi.MappedLabels = append(mi.MappedLabels, ml) + } for _, f := range def.Filters { mi.FilterPredicates = append(mi.FilterPredicates, filterToPredicate(f)) } diff --git a/pkg/pipeline/encode/encode_prom_test.go b/pkg/pipeline/encode/encode_prom_test.go index f7b37636b..91266d619 100644 --- a/pkg/pipeline/encode/encode_prom_test.go +++ b/pkg/pipeline/encode/encode_prom_test.go @@ -569,6 +569,53 @@ func Test_MissingLabels(t *testing.T) { require.Contains(t, exposed, `my_counter{namespace=""} 4`) } +func Test_Remap(t *testing.T) { + metrics := []config.GenericMap{ + { + "namespace": "A", + "IP": "10.0.0.1", + "bytes": 7, + }, + { + "namespace": "A", + "IP": "10.0.0.2", + "bytes": 1, + }, + { + "namespace": "B", + "IP": "10.0.0.3", + "bytes": 4, + }, + } + params := api.PromEncode{ + ExpiryTime: api.Duration{ + Duration: time.Duration(60 * time.Second), + }, + Metrics: []api.MetricsItem{ + { + Name: "my_counter", + Type: "counter", + ValueKey: "bytes", + Labels: []string{"namespace", "IP"}, + Remap: map[string]string{"IP": "ip"}, + }, + }, + } + + encodeProm, err := initProm(¶ms) + require.NoError(t, err) + for _, metric := range metrics { + encodeProm.Encode(metric) + } + time.Sleep(100 * time.Millisecond) + + exposed := test.ReadExposedMetrics(t, encodeProm.server) + + require.Contains(t, exposed, `my_counter{ip="10.0.0.1",namespace="A"} 7`) + require.Contains(t, exposed, `my_counter{ip="10.0.0.2",namespace="A"} 1`) + require.Contains(t, exposed, `my_counter{ip="10.0.0.3",namespace="B"} 4`) +} + func buildFlow() config.GenericMap { return config.GenericMap{ "srcIP": "10.0.0." + strconv.Itoa(rand.Intn(20)), diff --git a/pkg/pipeline/encode/metrics_common.go b/pkg/pipeline/encode/metrics_common.go index 4a14ba5e8..e45f09257 100644 --- a/pkg/pipeline/encode/metrics_common.go +++ b/pkg/pipeline/encode/metrics_common.go @@ -182,7 +182,7 @@ func (m *MetricsCommonStruct) prepareMetric(mci MetricsCommonInterface, flow con floatVal = floatVal / info.ValueScale } - entryLabels, key := extractLabelsAndKey(flow, info.MetricsItem) + entryLabels, key := extractLabelsAndKey(flow, info) // Update entry for expiry mechanism (the entry itself is its own cleanup function) cacheEntry := mci.GetChacheEntry(entryLabels, mv) ok := m.mCache.UpdateCacheEntry(key, cacheEntry) @@ -204,7 +204,7 @@ func (m *MetricsCommonStruct) prepareAggHisto(mci MetricsCommonInterface, flow c return nil, nil } - entryLabels, key := extractLabelsAndKey(flow, info.MetricsItem) + entryLabels, key := extractLabelsAndKey(flow, info) // Update entry for expiry mechanism (the entry itself is its own cleanup function) cacheEntry := mci.GetChacheEntry(entryLabels, mc) ok = m.mCache.UpdateCacheEntry(key, cacheEntry) @@ -233,17 +233,18 @@ func (m *MetricsCommonStruct) extractGenericValue(flow config.GenericMap, info * return val } -func extractLabelsAndKey(flow config.GenericMap, info *api.MetricsItem) (map[string]string, string) { - entryLabels := make(map[string]string, len(info.Labels)) +func extractLabelsAndKey(flow config.GenericMap, info *MetricInfo) (map[string]string, string) { + entryLabels := make(map[string]string, len(info.MappedLabels)) key := strings.Builder{} key.WriteString(info.Name) key.WriteRune('|') - for _, t := range info.Labels { - entryLabels[t] = "" - if v, ok := flow[t]; ok { - entryLabels[t] = utils.ConvertToString(v) + for _, t := range info.MappedLabels { + value := "" + if v, ok := flow[t.Source]; ok { + value = utils.ConvertToString(v) } - key.WriteString(entryLabels[t]) + entryLabels[t.Target] = value + key.WriteString(value) key.WriteRune('|') } return entryLabels, key.String() diff --git a/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go b/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go index 9840f45d0..12edd1d98 100644 --- a/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go +++ b/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go @@ -133,9 +133,8 @@ func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StagePar for i := range cfg.Metrics { mCfg := &cfg.Metrics[i] fullMetricName := cfg.Prefix + mCfg.Name - labels := mCfg.Labels log.Debugf("fullMetricName = %v", fullMetricName) - log.Debugf("Labels = %v", labels) + log.Debugf("Labels = %v", mCfg.Labels) mInfo := encode.CreateMetricInfo(mCfg) switch mCfg.Type { case api.MetricCounter: