diff --git a/README.md b/README.md index de8f3cda4..7161d09ab 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,7 @@ Usage: Flags: --config string config file (default is $HOME/.flowlogs-pipeline) + --health.address string Health server address (default "0.0.0.0") --health.port string Health server port (default "8080") -h, --help help for flowlogs-pipeline --log-level string Log level: debug, info, warning, error (default "error") @@ -728,6 +729,7 @@ parameters: encode: type: prom prom: + address: 0.0.0.0 port: 9103 prefix: test_ metrics: diff --git a/cmd/flowlogs-pipeline/main.go b/cmd/flowlogs-pipeline/main.go index d07e4c748..6a794b01b 100644 --- a/cmd/flowlogs-pipeline/main.go +++ b/cmd/flowlogs-pipeline/main.go @@ -137,6 +137,7 @@ func initFlags() { cobra.OnInitialize(initConfig) rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", fmt.Sprintf("config file (default is $HOME/%s)", defaultLogFileName)) rootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "error", "Log level: debug, info, warning, error") + rootCmd.PersistentFlags().StringVar(&opts.Health.Address, "health.address", "0.0.0.0", "Health server address") rootCmd.PersistentFlags().StringVar(&opts.Health.Port, "health.port", "8080", "Health server port") rootCmd.PersistentFlags().IntVar(&opts.Profile.Port, "profile.port", 0, "Go pprof tool port (default: disabled)") rootCmd.PersistentFlags().StringVar(&opts.PipeLine, "pipeline", "", "json of config file pipeline field") diff --git a/contrib/kubernetes/flowlogs-pipeline.conf.yaml b/contrib/kubernetes/flowlogs-pipeline.conf.yaml index 51ef4f378..9b88caebb 100644 --- a/contrib/kubernetes/flowlogs-pipeline.conf.yaml +++ b/contrib/kubernetes/flowlogs-pipeline.conf.yaml @@ -443,6 +443,7 @@ parameters: - service - _RecordType buckets: [] + address: 0.0.0.0 port: 9102 prefix: flp_ - name: write_loki @@ -452,4 +453,3 @@ parameters: url: http://loki.default.svc.cluster.local:3100 staticLabels: job: flowlogs-pipeline - diff --git a/docs/api.md b/docs/api.md index 1613993a0..97176db41 100644 --- a/docs/api.md +++ b/docs/api.md @@ -17,6 +17,7 @@ Following is the supported API format for prometheus encode: valueKey: entry key from which to resolve metric value labels: labels to be associated with the metric buckets: histogram buckets + address: address to expose "/metrics" endpoint port: port number to expose "/metrics" endpoint prefix: prefix added to each metric name expiryTime: seconds of no-flow to wait before deleting prometheus data item diff --git a/pkg/api/encode_prom.go b/pkg/api/encode_prom.go index 705e3e6be..6154813b3 100644 --- a/pkg/api/encode_prom.go +++ b/pkg/api/encode_prom.go @@ -24,6 +24,7 @@ type PromTLSConf struct { type PromEncode struct { Metrics PromMetricsItems `yaml:"metrics,omitempty" json:"metrics,omitempty" doc:"list of prometheus metric definitions, each includes:"` + Address string `yaml:"address,omitempty" json:"address,omitempty" doc:"address to expose \"/metrics\" endpoint"` Port int `yaml:"port,omitempty" json:"port,omitempty" doc:"port number to expose \"/metrics\" endpoint"` Prefix string `yaml:"prefix,omitempty" json:"prefix,omitempty" doc:"prefix added to each metric name"` ExpiryTime int `yaml:"expiryTime,omitempty" json:"expiryTime,omitempty" doc:"seconds of no-flow to wait before deleting prometheus data item"` diff --git a/pkg/confgen/flowlogs2metrics_config.go b/pkg/confgen/flowlogs2metrics_config.go index 0c62ec935..a75411d3c 100644 --- a/pkg/confgen/flowlogs2metrics_config.go +++ b/pkg/confgen/flowlogs2metrics_config.go @@ -56,6 +56,7 @@ func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() *config.ConfigFileStruct { } if len(cg.promMetrics) > 0 { metricsNode.EncodePrometheus("encode_prom", api.PromEncode{ + Address: cg.config.Encode.Prom.Address, Port: cg.config.Encode.Prom.Port, Prefix: cg.config.Encode.Prom.Prefix, Metrics: cg.promMetrics, diff --git a/pkg/config/config.go b/pkg/config/config.go index 7b7fcab04..39fc2ae7c 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -43,7 +43,8 @@ type ConfigFileStruct struct { } type Health struct { - Port string + Address string + Port string } type Profile struct { diff --git a/pkg/operational/health.go b/pkg/operational/health.go index 18895b7fc..a252ebc46 100644 --- a/pkg/operational/health.go +++ b/pkg/operational/health.go @@ -27,8 +27,6 @@ import ( log "github.com/sirupsen/logrus" ) -const defaultServerHost = "0.0.0.0" - type Server struct { handler healthcheck.Handler Address string @@ -45,8 +43,7 @@ func (hs *Server) Serve() { func NewHealthServer(opts *config.Options, isAlive healthcheck.Check, isReady healthcheck.Check) *Server { handler := healthcheck.NewHandler() - address := net.JoinHostPort(defaultServerHost, opts.Health.Port) - + address := net.JoinHostPort(opts.Health.Address, opts.Health.Port) handler.AddLivenessCheck("PipelineCheck", isAlive) handler.AddReadinessCheck("PipelineCheck", isReady) diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index a57f82d92..aea1e8a29 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -348,7 +348,8 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En log.Debugf("histos = %v", histos) log.Debugf("aggHistos = %v", aggHistos) - addr := fmt.Sprintf(":%v", cfg.Port) + // if value of address is empty, then by default it will take 0.0.0.0 + addr := fmt.Sprintf("%s:%v", cfg.Address, cfg.Port) log.Infof("startServer: addr = %s", addr) w := &EncodeProm{ diff --git a/pkg/pipeline/health_test.go b/pkg/pipeline/health_test.go index 69a1da6f5..54bd6a649 100644 --- a/pkg/pipeline/health_test.go +++ b/pkg/pipeline/health_test.go @@ -36,6 +36,7 @@ func TestNewHealthServer(t *testing.T) { type args struct { pipeline Pipeline port string + address string } type want struct { statusCode int @@ -46,15 +47,15 @@ func TestNewHealthServer(t *testing.T) { args args want want }{ - {name: "pipeline running", args: args{pipeline: Pipeline{IsRunning: true}, port: "7000"}, want: want{statusCode: 200}}, - {name: "pipeline not running", args: args{pipeline: Pipeline{IsRunning: false}, port: "7001"}, want: want{statusCode: 503}}, + {name: "pipeline running", args: args{pipeline: Pipeline{IsRunning: true}, port: "7000", address: "0.0.0.0"}, want: want{statusCode: 200}}, + {name: "pipeline not running", args: args{pipeline: Pipeline{IsRunning: false}, port: "7001", address: "0.0.0.0"}, want: want{statusCode: 503}}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - opts := config.Options{Health: config.Health{Port: tt.args.port}} - expectedAddr := fmt.Sprintf("0.0.0.0:%s", opts.Health.Port) + opts := config.Options{Health: config.Health{Port: tt.args.port, Address: tt.args.address}} + expectedAddr := fmt.Sprintf("%s:%s", opts.Health.Address, opts.Health.Port) server := operational.NewHealthServer(&opts, tt.args.pipeline.IsAlive, tt.args.pipeline.IsReady) require.NotNil(t, server) require.Equal(t, expectedAddr, server.Address)