Skip to content

Commit

Permalink
Made prom and health server url configurable (#351)
Browse files Browse the repository at this point in the history
Fixes #349

Signed-off-by: Kuldeep Singh Pal <tkuldeep.allen@gmail.com>

Signed-off-by: Kuldeep Singh Pal <tkuldeep.allen@gmail.com>
  • Loading branch information
tkuldeep committed Dec 19, 2022
1 parent cba2541 commit a88f7d6
Show file tree
Hide file tree
Showing 10 changed files with 17 additions and 11 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ Usage:

Flags:
--config string config file (default is $HOME/.flowlogs-pipeline)
--health.address string Health server address (default "0.0.0.0")
--health.port string Health server port (default "8080")
-h, --help help for flowlogs-pipeline
--log-level string Log level: debug, info, warning, error (default "error")
Expand Down Expand Up @@ -728,6 +729,7 @@ parameters:
encode:
type: prom
prom:
address: 0.0.0.0
port: 9103
prefix: test_
metrics:
Expand Down
1 change: 1 addition & 0 deletions cmd/flowlogs-pipeline/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func initFlags() {
cobra.OnInitialize(initConfig)
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", fmt.Sprintf("config file (default is $HOME/%s)", defaultLogFileName))
rootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "error", "Log level: debug, info, warning, error")
rootCmd.PersistentFlags().StringVar(&opts.Health.Address, "health.address", "0.0.0.0", "Health server address")
rootCmd.PersistentFlags().StringVar(&opts.Health.Port, "health.port", "8080", "Health server port")
rootCmd.PersistentFlags().IntVar(&opts.Profile.Port, "profile.port", 0, "Go pprof tool port (default: disabled)")
rootCmd.PersistentFlags().StringVar(&opts.PipeLine, "pipeline", "", "json of config file pipeline field")
Expand Down
2 changes: 1 addition & 1 deletion contrib/kubernetes/flowlogs-pipeline.conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ parameters:
- service
- _RecordType
buckets: []
address: 0.0.0.0
port: 9102
prefix: flp_
- name: write_loki
Expand All @@ -452,4 +453,3 @@ parameters:
url: http://loki.default.svc.cluster.local:3100
staticLabels:
job: flowlogs-pipeline

1 change: 1 addition & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Following is the supported API format for prometheus encode:
valueKey: entry key from which to resolve metric value
labels: labels to be associated with the metric
buckets: histogram buckets
address: address to expose "/metrics" endpoint
port: port number to expose "/metrics" endpoint
prefix: prefix added to each metric name
expiryTime: seconds of no-flow to wait before deleting prometheus data item
Expand Down
1 change: 1 addition & 0 deletions pkg/api/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type PromTLSConf struct {

type PromEncode struct {
Metrics PromMetricsItems `yaml:"metrics,omitempty" json:"metrics,omitempty" doc:"list of prometheus metric definitions, each includes:"`
Address string `yaml:"address,omitempty" json:"address,omitempty" doc:"address to expose \"/metrics\" endpoint"`
Port int `yaml:"port,omitempty" json:"port,omitempty" doc:"port number to expose \"/metrics\" endpoint"`
Prefix string `yaml:"prefix,omitempty" json:"prefix,omitempty" doc:"prefix added to each metric name"`
ExpiryTime int `yaml:"expiryTime,omitempty" json:"expiryTime,omitempty" doc:"seconds of no-flow to wait before deleting prometheus data item"`
Expand Down
1 change: 1 addition & 0 deletions pkg/confgen/flowlogs2metrics_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() *config.ConfigFileStruct {
}
if len(cg.promMetrics) > 0 {
metricsNode.EncodePrometheus("encode_prom", api.PromEncode{
Address: cg.config.Encode.Prom.Address,
Port: cg.config.Encode.Prom.Port,
Prefix: cg.config.Encode.Prom.Prefix,
Metrics: cg.promMetrics,
Expand Down
3 changes: 2 additions & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ type ConfigFileStruct struct {
}

type Health struct {
Port string
Address string
Port string
}

type Profile struct {
Expand Down
5 changes: 1 addition & 4 deletions pkg/operational/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import (
log "github.com/sirupsen/logrus"
)

const defaultServerHost = "0.0.0.0"

type Server struct {
handler healthcheck.Handler
Address string
Expand All @@ -45,8 +43,7 @@ func (hs *Server) Serve() {
func NewHealthServer(opts *config.Options, isAlive healthcheck.Check, isReady healthcheck.Check) *Server {

handler := healthcheck.NewHandler()
address := net.JoinHostPort(defaultServerHost, opts.Health.Port)

address := net.JoinHostPort(opts.Health.Address, opts.Health.Port)
handler.AddLivenessCheck("PipelineCheck", isAlive)
handler.AddReadinessCheck("PipelineCheck", isReady)

Expand Down
3 changes: 2 additions & 1 deletion pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,8 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
log.Debugf("histos = %v", histos)
log.Debugf("aggHistos = %v", aggHistos)

addr := fmt.Sprintf(":%v", cfg.Port)
// if value of address is empty, then by default it will take 0.0.0.0
addr := fmt.Sprintf("%s:%v", cfg.Address, cfg.Port)
log.Infof("startServer: addr = %s", addr)

w := &EncodeProm{
Expand Down
9 changes: 5 additions & 4 deletions pkg/pipeline/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestNewHealthServer(t *testing.T) {
type args struct {
pipeline Pipeline
port string
address string
}
type want struct {
statusCode int
Expand All @@ -46,15 +47,15 @@ func TestNewHealthServer(t *testing.T) {
args args
want want
}{
{name: "pipeline running", args: args{pipeline: Pipeline{IsRunning: true}, port: "7000"}, want: want{statusCode: 200}},
{name: "pipeline not running", args: args{pipeline: Pipeline{IsRunning: false}, port: "7001"}, want: want{statusCode: 503}},
{name: "pipeline running", args: args{pipeline: Pipeline{IsRunning: true}, port: "7000", address: "0.0.0.0"}, want: want{statusCode: 200}},
{name: "pipeline not running", args: args{pipeline: Pipeline{IsRunning: false}, port: "7001", address: "0.0.0.0"}, want: want{statusCode: 503}},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

opts := config.Options{Health: config.Health{Port: tt.args.port}}
expectedAddr := fmt.Sprintf("0.0.0.0:%s", opts.Health.Port)
opts := config.Options{Health: config.Health{Port: tt.args.port, Address: tt.args.address}}
expectedAddr := fmt.Sprintf("%s:%s", opts.Health.Address, opts.Health.Port)
server := operational.NewHealthServer(&opts, tt.args.pipeline.IsAlive, tt.args.pipeline.IsReady)
require.NotNil(t, server)
require.Equal(t, expectedAddr, server.Address)
Expand Down

0 comments on commit a88f7d6

Please sign in to comment.