NETOBSERV-1772: allow prom label remapping (#698)
* NETOBSERV-1772: allow prom label remapping

- New "remap" field in MetricItem API
- New struct to distinguish source labels (from flow logs) and target
  labels (to use in metrics)

* Add/fix tests
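
As a rough illustration of the new field, here is a minimal Go sketch that builds a metric definition with remapped labels, mirroring the values used in Test_Remap below; the module import path is an assumption and may need adjusting:

```go
package main

import (
	"fmt"

	// Assumed module path for flowlogs-pipeline; adjust if it differs.
	"github.com/netobserv/flowlogs-pipeline/pkg/api"
)

func main() {
	// Hypothetical metric definition: count flow "bytes", keeping the
	// "namespace" label as-is and exposing the "IP" flow key as "ip".
	item := api.MetricsItem{
		Name:     "my_counter",
		Type:     "counter",
		ValueKey: "bytes",
		Labels:   []string{"namespace", "IP"},
		Remap:    map[string]string{"IP": "ip"},
	}
	fmt.Printf("%+v\n", item)
}
```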
jotak committed Sep 10, 2024
1 parent def93f6 commit f8f7471
Showing 9 changed files with 94 additions and 19 deletions.
2 changes: 2 additions & 0 deletions docs/api.md
@@ -29,6 +29,7 @@ Following is the supported API format for prometheus encode:
not_match_regex: the filter value must not match the provided regular expression
valueKey: entry key from which to resolve metric value
labels: labels to be associated with the metric
remap: optional remapping of labels
buckets: histogram buckets
valueScale: scale factor of the value (MetricVal := FlowVal / Scale)
prefix: prefix added to each metric name
@@ -438,6 +439,7 @@ Following is the supported API format for writing metrics to an OpenTelemetry collector:
not_match_regex: the filter value must not match the provided regular expression
valueKey: entry key from which to resolve metric value
labels: labels to be associated with the metric
remap: optional remapping of labels
buckets: histogram buckets
valueScale: scale factor of the value (MetricVal := FlowVal / Scale)
pushTimeInterval: how often should metrics be sent to collector:
1 change: 1 addition & 0 deletions pkg/api/encode_prom.go
@@ -52,6 +52,7 @@ type MetricsItem struct {
Filters []MetricsFilter `yaml:"filters" json:"filters" doc:"a list of criteria to filter entries by"`
ValueKey string `yaml:"valueKey" json:"valueKey" doc:"entry key from which to resolve metric value"`
Labels []string `yaml:"labels" json:"labels" doc:"labels to be associated with the metric"`
Remap map[string]string `yaml:"remap" json:"remap" doc:"optional remapping of labels"`
Buckets []float64 `yaml:"buckets" json:"buckets" doc:"histogram buckets"`
ValueScale float64 `yaml:"valueScale,omitempty" json:"valueScale,omitempty" doc:"scale factor of the value (MetricVal := FlowVal / Scale)"`
}
4 changes: 4 additions & 0 deletions pkg/confgen/confgen_test.go
@@ -151,6 +151,7 @@ func Test_RunShortConfGen(t *testing.T) {
Filters: []api.MetricsFilter{{Key: "K", Value: "V"}},
ValueKey: "test_aggregates_value",
Labels: []string{"groupByKeys", "aggregate"},
Remap: map[string]string{},
Buckets: []float64{},
}},
}, out.Parameters[3].Encode.Prom)
@@ -232,6 +233,7 @@ func Test_RunConfGenNoAgg(t *testing.T) {
Filters: []api.MetricsFilter{{Key: "K", Value: "V"}},
ValueKey: "Bytes",
Labels: []string{"service"},
Remap: map[string]string{},
Buckets: []float64{},
}},
}, out.Parameters[2].Encode.Prom)
@@ -336,13 +338,15 @@ func Test_RunLongConfGen(t *testing.T) {
Filters: []api.MetricsFilter{{Key: "K", Value: "V"}},
ValueKey: "test_aggregates_value",
Labels: []string{"groupByKeys", "aggregate"},
Remap: map[string]string{},
Buckets: []float64{},
}, {
Name: "test_histo",
Type: "agg_histogram",
Filters: []api.MetricsFilter{{Key: "K", Value: "V"}},
ValueKey: "test_aggregates_value",
Labels: []string{"groupByKeys", "aggregate"},
Remap: map[string]string{},
Buckets: []float64{},
}},
}, out.Parameters[4].Encode.Prom)
3 changes: 2 additions & 1 deletion pkg/config/pipeline_builder_test.go
@@ -137,6 +137,7 @@ func TestKafkaPromPipeline(t *testing.T) {
}},
ValueKey: "recent_count",
Labels: []string{"by", "aggregate"},
Remap: map[string]string{},
Buckets: []float64{},
}},
Prefix: "flp_",
@@ -170,7 +171,7 @@

b, err = json.Marshal(params[4])
require.NoError(t, err)
require.JSONEq(t, `{"name":"prom","encode":{"type":"prom","prom":{"expiryTime":"50s", "metrics":[{"name":"connections_per_source_as","type":"counter","filters":[{"key":"name","value":"src_as_connection_count"}],"valueKey":"recent_count","labels":["by","aggregate"],"buckets":[]}],"prefix":"flp_"}}}`, string(b))
require.JSONEq(t, `{"name":"prom","encode":{"type":"prom","prom":{"expiryTime":"50s", "metrics":[{"name":"connections_per_source_as","type":"counter","filters":[{"key":"name","value":"src_as_connection_count"}],"valueKey":"recent_count","labels":["by","aggregate"],"remap":{},"buckets":[]}],"prefix":"flp_"}}}`, string(b))
}

func TestForkPipeline(t *testing.T) {
13 changes: 6 additions & 7 deletions pkg/pipeline/encode/encode_prom.go
@@ -115,25 +115,25 @@ func (e *EncodeProm) Cleanup(cleanupFunc interface{}) {
}

func (e *EncodeProm) addCounter(fullMetricName string, mInfo *MetricInfo) prometheus.Collector {
counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: ""}, mInfo.Labels)
counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: ""}, mInfo.TargetLabels())
e.metricCommon.AddCounter(fullMetricName, counter, mInfo)
return counter
}

func (e *EncodeProm) addGauge(fullMetricName string, mInfo *MetricInfo) prometheus.Collector {
gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, mInfo.Labels)
gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, mInfo.TargetLabels())
e.metricCommon.AddGauge(fullMetricName, gauge, mInfo)
return gauge
}

func (e *EncodeProm) addHistogram(fullMetricName string, mInfo *MetricInfo) prometheus.Collector {
histogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: ""}, mInfo.Labels)
histogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: ""}, mInfo.TargetLabels())
e.metricCommon.AddHist(fullMetricName, histogram, mInfo)
return histogram
}

func (e *EncodeProm) addAgghistogram(fullMetricName string, mInfo *MetricInfo) prometheus.Collector {
agghistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: ""}, mInfo.Labels)
agghistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: ""}, mInfo.TargetLabels())
e.metricCommon.AddAggHist(fullMetricName, agghistogram, mInfo)
return agghistogram
}
@@ -181,7 +181,7 @@ func (e *EncodeProm) checkMetricUpdate(prefix string, apiItem *api.MetricsItem,
plog.Debugf("Checking metric: %s", fullMetricName)
mInfo := CreateMetricInfo(apiItem)
if oldMetric, ok := store[fullMetricName]; ok {
if !reflect.DeepEqual(mInfo.MetricsItem.Labels, oldMetric.info.MetricsItem.Labels) {
if !reflect.DeepEqual(mInfo.TargetLabels(), oldMetric.info.TargetLabels()) {
plog.Debug("Changes detected in labels")
return true
}
@@ -257,9 +257,8 @@ func (e *EncodeProm) resetRegistry() {
for i := range e.cfg.Metrics {
mCfg := &e.cfg.Metrics[i]
fullMetricName := e.cfg.Prefix + mCfg.Name
labels := mCfg.Labels
plog.Debugf("Create metric: %s, Labels: %v", fullMetricName, labels)
mInfo := CreateMetricInfo(mCfg)
plog.Debugf("Create metric: %s, Labels: %v", fullMetricName, mInfo.TargetLabels())
var m prometheus.Collector
switch mCfg.Type {
case api.MetricCounter:
21 changes: 21 additions & 0 deletions pkg/pipeline/encode/encode_prom_metric.go
@@ -16,6 +16,20 @@ var variableExtractor, _ = regexp.Compile(`\$\(([^\)]+)\)`)
type MetricInfo struct {
*api.MetricsItem
FilterPredicates []Predicate
MappedLabels []MappedLabel
}

type MappedLabel struct {
Source string
Target string
}

func (m *MetricInfo) TargetLabels() []string {
var targetLabels []string
for _, l := range m.MappedLabels {
targetLabels = append(targetLabels, l.Target)
}
return targetLabels
}

func Presence(filter api.MetricsFilter) Predicate {
@@ -122,6 +136,13 @@ func CreateMetricInfo(def *api.MetricsItem) *MetricInfo {
mi := MetricInfo{
MetricsItem: def,
}
for _, l := range def.Labels {
ml := MappedLabel{Source: l, Target: l}
if as := def.Remap[l]; as != "" {
ml.Target = as
}
mi.MappedLabels = append(mi.MappedLabels, ml)
}
for _, f := range def.Filters {
mi.FilterPredicates = append(mi.FilterPredicates, filterToPredicate(f))
}
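
The mapping logic added above is small enough to sketch standalone: each configured label keeps its name unless a remap entry overrides the target, and TargetLabels() is what the Prometheus vectors are created with. A self-contained sketch of that resolution (not the pipeline code itself; the types are local stand-ins):

```go
package main

import "fmt"

// Sketch of the remap resolution performed by CreateMetricInfo: each
// configured label becomes a (Source, Target) pair, where Target defaults
// to Source and is overridden when a remap entry exists.
type mappedLabel struct {
	Source string // key looked up in the flow log entry
	Target string // label name used on the Prometheus metric
}

func resolveLabels(labels []string, remap map[string]string) []mappedLabel {
	var out []mappedLabel
	for _, l := range labels {
		ml := mappedLabel{Source: l, Target: l}
		if as := remap[l]; as != "" {
			ml.Target = as
		}
		out = append(out, ml)
	}
	return out
}

func main() {
	resolved := resolveLabels(
		[]string{"namespace", "IP"},
		map[string]string{"IP": "ip"}, // illustrative remap, as in Test_Remap
	)
	for _, ml := range resolved {
		fmt.Printf("flow key %q -> metric label %q\n", ml.Source, ml.Target)
	}
	// Output:
	// flow key "namespace" -> metric label "namespace"
	// flow key "IP" -> metric label "ip"
}
```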
47 changes: 47 additions & 0 deletions pkg/pipeline/encode/encode_prom_test.go
@@ -569,6 +569,53 @@ func Test_MissingLabels(t *testing.T) {
require.Contains(t, exposed, `my_counter{namespace=""} 4`)
}

func Test_Remap(t *testing.T) {
metrics := []config.GenericMap{
{
"namespace": "A",
"IP": "10.0.0.1",
"bytes": 7,
},
{
"namespace": "A",
"IP": "10.0.0.2",
"bytes": 1,
},
{
"namespace": "B",
"IP": "10.0.0.3",
"bytes": 4,
},
}
params := api.PromEncode{
ExpiryTime: api.Duration{
Duration: time.Duration(60 * time.Second),
},
Metrics: []api.MetricsItem{
{
Name: "my_counter",
Type: "counter",
ValueKey: "bytes",
Labels: []string{"namespace", "IP"},
Remap: map[string]string{"IP": "ip"},
},
},
}

encodeProm, err := initProm(&params)
require.NoError(t, err)
for _, metric := range metrics {
encodeProm.Encode(metric)
}
time.Sleep(100 * time.Millisecond)

exposed := test.ReadExposedMetrics(t, encodeProm.server)

require.Contains(t, exposed, `my_counter{ip="10.0.0.1",namespace="A"} 7`)
require.Contains(t, exposed, `my_counter{ip="10.0.0.2",namespace="A"} 1`)
require.Contains(t, exposed, `my_counter{ip="10.0.0.3",namespace="B"} 4`)
}

func buildFlow() config.GenericMap {
return config.GenericMap{
"srcIP": "10.0.0." + strconv.Itoa(rand.Intn(20)),
19 changes: 10 additions & 9 deletions pkg/pipeline/encode/metrics_common.go
@@ -182,7 +182,7 @@ func (m *MetricsCommonStruct) prepareMetric(mci MetricsCommonInterface, flow con
floatVal = floatVal / info.ValueScale
}

entryLabels, key := extractLabelsAndKey(flow, info.MetricsItem)
entryLabels, key := extractLabelsAndKey(flow, info)
// Update entry for expiry mechanism (the entry itself is its own cleanup function)
cacheEntry := mci.GetChacheEntry(entryLabels, mv)
ok := m.mCache.UpdateCacheEntry(key, cacheEntry)
@@ -204,7 +204,7 @@ func (m *MetricsCommonStruct) prepareAggHisto(mci MetricsCommonInterface, flow c
return nil, nil
}

entryLabels, key := extractLabelsAndKey(flow, info.MetricsItem)
entryLabels, key := extractLabelsAndKey(flow, info)
// Update entry for expiry mechanism (the entry itself is its own cleanup function)
cacheEntry := mci.GetChacheEntry(entryLabels, mc)
ok = m.mCache.UpdateCacheEntry(key, cacheEntry)
@@ -233,17 +233,18 @@ func (m *MetricsCommonStruct) extractGenericValue(flow config.GenericMap, info *
return val
}

func extractLabelsAndKey(flow config.GenericMap, info *api.MetricsItem) (map[string]string, string) {
entryLabels := make(map[string]string, len(info.Labels))
func extractLabelsAndKey(flow config.GenericMap, info *MetricInfo) (map[string]string, string) {
entryLabels := make(map[string]string, len(info.MappedLabels))
key := strings.Builder{}
key.WriteString(info.Name)
key.WriteRune('|')
for _, t := range info.Labels {
entryLabels[t] = ""
if v, ok := flow[t]; ok {
entryLabels[t] = utils.ConvertToString(v)
for _, t := range info.MappedLabels {
value := ""
if v, ok := flow[t.Source]; ok {
value = utils.ConvertToString(v)
}
key.WriteString(entryLabels[t])
entryLabels[t.Target] = value
key.WriteString(value)
key.WriteRune('|')
}
return entryLabels, key.String()
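
To make the new behavior of extractLabelsAndKey concrete, here is a self-contained sketch using plain maps instead of the pipeline types (fmt.Sprintf stands in for utils.ConvertToString): values are looked up under the source name, exposed under the target name, and the cache key concatenates the metric name and label values with '|'.

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// One flow entry and the resolved label mapping, as in Test_Remap.
	flow := map[string]interface{}{"namespace": "A", "IP": "10.0.0.1", "bytes": 7}
	mapped := []struct{ Source, Target string }{
		{Source: "namespace", Target: "namespace"},
		{Source: "IP", Target: "ip"}, // remapped
	}

	// Mirror of the extraction loop: read by Source, store by Target,
	// and build the cache key from the metric name and label values.
	entryLabels := make(map[string]string, len(mapped))
	key := strings.Builder{}
	key.WriteString("my_counter")
	key.WriteRune('|')
	for _, t := range mapped {
		value := ""
		if v, ok := flow[t.Source]; ok {
			value = fmt.Sprintf("%v", v)
		}
		entryLabels[t.Target] = value
		key.WriteString(value)
		key.WriteRune('|')
	}

	fmt.Println(entryLabels)  // map[ip:10.0.0.1 namespace:A]
	fmt.Println(key.String()) // my_counter|A|10.0.0.1|
}
```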
3 changes: 1 addition & 2 deletions pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go
@@ -133,9 +133,8 @@ func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StagePar
for i := range cfg.Metrics {
mCfg := &cfg.Metrics[i]
fullMetricName := cfg.Prefix + mCfg.Name
labels := mCfg.Labels
log.Debugf("fullMetricName = %v", fullMetricName)
log.Debugf("Labels = %v", labels)
log.Debugf("Labels = %v", mCfg.Labels)
mInfo := encode.CreateMetricInfo(mCfg)
switch mCfg.Type {
case api.MetricCounter:
