Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make expiryTime in encode_prom into a Duration #411

Merged
merged 3 commits into from
Mar 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Following is the supported API format for prometheus encode:
labels: labels to be associated with the metric
buckets: histogram buckets
prefix: prefix added to each metric name
expiryTime: seconds of no-flow to wait before deleting prometheus data item
expiryTime: time duration of no-flow to wait before deleting prometheus data item
maxMetrics: maximum number of metrics to report (default: unlimited)
</pre>
## Kafka encode API
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type PromTLSConf struct {
type PromEncode struct {
Metrics PromMetricsItems `yaml:"metrics,omitempty" json:"metrics,omitempty" doc:"list of prometheus metric definitions, each includes:"`
Prefix string `yaml:"prefix,omitempty" json:"prefix,omitempty" doc:"prefix added to each metric name"`
ExpiryTime int `yaml:"expiryTime,omitempty" json:"expiryTime,omitempty" doc:"seconds of no-flow to wait before deleting prometheus data item"`
ExpiryTime Duration `yaml:"expiryTime,omitempty" json:"expiryTime,omitempty" doc:"time duration of no-flow to wait before deleting prometheus data item"`
MaxMetrics int `yaml:"maxMetrics,omitempty" json:"maxMetrics,omitempty" doc:"maximum number of metrics to report (default: unlimited)"`
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/config/pipeline_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package config
import (
"encoding/json"
"testing"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -117,6 +118,8 @@ func TestKafkaPromPipeline(t *testing.T) {
GroupByKeys: api.AggregateBy{"srcAS"},
OperationType: "count",
}})
var expiryTimeDuration api.Duration
expiryTimeDuration.Duration = time.Duration(50 * time.Second)
pl = pl.EncodePrometheus("prom", api.PromEncode{
Metrics: api.PromMetricsItems{{
Name: "connections_per_source_as",
Expand All @@ -129,7 +132,8 @@ func TestKafkaPromPipeline(t *testing.T) {
Labels: []string{"by", "aggregate"},
Buckets: []float64{},
}},
Prefix: "flp_",
Prefix: "flp_",
ExpiryTime: expiryTimeDuration,
})
stages := pl.GetStages()
require.Len(t, stages, 5)
Expand Down Expand Up @@ -159,7 +163,7 @@ func TestKafkaPromPipeline(t *testing.T) {

b, err = json.Marshal(params[4])
require.NoError(t, err)
require.JSONEq(t, `{"name":"prom","encode":{"type":"prom","prom":{"metrics":[{"name":"connections_per_source_as","type":"counter","filter":{"key":"name","value":"src_as_connection_count"},"valueKey":"recent_count","labels":["by","aggregate"],"buckets":[]}],"prefix":"flp_"}}}`, string(b))
require.JSONEq(t, `{"name":"prom","encode":{"type":"prom","prom":{"expiryTime":"50s", "metrics":[{"name":"connections_per_source_as","type":"counter","filter":{"key":"name","value":"src_as_connection_count"},"valueKey":"recent_count","labels":["by","aggregate"],"buckets":[]}],"prefix":"flp_"}}}`, string(b))
}

func TestForkPipeline(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/aggregate_prom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ parameters:
type: prom
prom:
prefix: test_
expiryTime: 1
expiryTime: 1s
metrics:
- name: flow_count
type: counter
Expand Down
10 changes: 5 additions & 5 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
log "github.com/sirupsen/logrus"
)

const defaultExpiryTime = 2 * time.Minute
const defaultExpiryTime = time.Duration(2 * time.Minute)

type gaugeInfo struct {
gauge *prometheus.GaugeVec
Expand Down Expand Up @@ -267,9 +267,9 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
cfg = *params.Encode.Prom
}

expiryTime := time.Duration(cfg.ExpiryTime) * time.Second
if expiryTime == 0 {
expiryTime = defaultExpiryTime
expiryTime := cfg.ExpiryTime
if expiryTime.Duration == 0 {
expiryTime.Duration = defaultExpiryTime
}
log.Debugf("expiryTime = %v", expiryTime)

Expand Down Expand Up @@ -348,7 +348,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
gauges: gauges,
histos: histos,
aggHistos: aggHistos,
expiryTime: expiryTime,
expiryTime: expiryTime.Duration,
mCache: putils.NewTimedCache(cfg.MaxMetrics, mChacheLenMetric),
mChacheLenMetric: mChacheLenMetric,
exitChan: putils.ExitChannel(),
Expand Down
15 changes: 10 additions & 5 deletions pkg/pipeline/encode/encode_prom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ parameters:
type: prom
prom:
prefix: test_
expiryTime: 1
expiryTime: 1s
metrics:
- name: Bytes
type: gauge
Expand Down Expand Up @@ -146,10 +146,11 @@ func Test_CustomMetric(t *testing.T) {
"packets": 2,
"latency": 0.2,
}}

var expiryTimeDuration api.Duration
expiryTimeDuration.Duration = time.Duration(60 * time.Second)
params := api.PromEncode{
Prefix: "test_",
ExpiryTime: 60,
ExpiryTime: expiryTimeDuration,
Metrics: []api.PromMetricsItem{{
Name: "bytes_total",
Type: "counter",
Expand Down Expand Up @@ -229,9 +230,11 @@ func Test_MetricTTL(t *testing.T) {
"bytes": 12,
}}

var expiryTimeDuration api.Duration
expiryTimeDuration.Duration = time.Duration(1 * time.Second)
params := api.PromEncode{
Prefix: "test_",
ExpiryTime: 1,
ExpiryTime: expiryTimeDuration,
Metrics: []api.PromMetricsItem{{
Name: "bytes_total",
Type: "counter",
Expand Down Expand Up @@ -281,9 +284,11 @@ func hundredFlows() []config.GenericMap {
}

func BenchmarkPromEncode(b *testing.B) {
var expiryTimeDuration api.Duration
expiryTimeDuration.Duration = time.Duration(60 * time.Second)
params := api.PromEncode{
Prefix: "test_",
ExpiryTime: 60,
ExpiryTime: expiryTimeDuration,
Metrics: []api.PromMetricsItem{{
Name: "bytes_total",
Type: "counter",
Expand Down