diff --git a/README.md b/README.md index c65408f..2a41abc 100644 --- a/README.md +++ b/README.md @@ -139,6 +139,10 @@ nats_subject_monitor = "influx-spout-monitor" # The listener will serve Kubernetes liveness and readiness probes on this port # at /healthz and /readyz. Set to 0 (the default) to disable probes support. probe_port = 0 + +# The listener will serve Go pprof requests at this port. Set to 0 (the +default) to disable pprof support. +pprof_port = 0 ``` ### HTTP Listener @@ -187,6 +191,10 @@ nats_subject_monitor = "influx-spout-monitor" # The HTTP listener will serve Kubernetes liveness and readiness probes on this # port at /healthz and /readyz. Set to 0 (the default) to disable probes support. probe_port = 0 + +# The HTTP listener will serve Go pprof requests at this port. Set to 0 (the +default) to disable pprof support. +pprof_port = 0 ``` ### Filter @@ -228,6 +236,10 @@ workers = 8 # /healthz and /readyz. Set to 0 (the default) to disable probes support. probe_port = 0 +# The filter will serve Go pprof requests at this port. Set to 0 (the default) +# to disable pprof support. +pprof_port = 0 + # Incoming metrics with timestamps ± this value from the current time will be # rejected. Metrics with timestamps that are significantly different from previously # written timestamps negatively impact InfluxDB performance. @@ -340,6 +352,10 @@ nats_subject_monitor = "influx-spout-monitor" # The writer will serve Kubernetes liveness and readiness probes on this port at # /healthz and /readyz. Set to 0 (the default) to disable probes support. probe_port = 0 + +# The writer will serve Go pprof requests at this port. Set to 0 (the default) +# to disable pprof support. +pprof_port = 0 ``` A writer will batch up messages until one of the limits defined by the @@ -376,6 +392,10 @@ port = 9331 # The monitor will serve Kubernetes liveness and readiness probes on this port # at /healthz and /readyz. Set to 0 (the default) to disable probes support. probe_port = 0 + +# The monitor will serve Go pprof requests at this port. Set to 0 (the default) +# to disable pprof support. +pprof_port = 0 ``` ## Running tests diff --git a/cmd/run.go b/cmd/run.go index 9e3efbd..9e9fc85 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -16,8 +16,13 @@ package cmd import ( "fmt" + "log" + "net/http" "runtime" + // Profiling support + _ "net/http/pprof" + "github.com/jumptrading/influx-spout/config" "github.com/jumptrading/influx-spout/filter" "github.com/jumptrading/influx-spout/listener" @@ -38,6 +43,16 @@ func Run(configFile string) (out Stoppable, err error) { return nil, fmt.Errorf("Error while loading config file: %v", err) } + if c.PprofPort > 0 { + go func() { + log.Printf("starting pprof listener on port %d", c.PprofPort) + err := http.ListenAndServe(fmt.Sprintf(":%d", c.PprofPort), nil) + if err != nil { + log.Printf("pprof listener exited: %v", err) + } + }() + } + switch c.Mode { case "listener": out, err = listener.StartListener(c) diff --git a/config/config.go b/config/config.go index 2ea3131..c6d0293 100644 --- a/config/config.go +++ b/config/config.go @@ -52,6 +52,7 @@ type Config struct { Rule []Rule `toml:"rule"` MaxTimeDeltaSecs int `toml:"max_time_delta_secs"` ProbePort int `toml:"probe_port"` + PprofPort int `toml:"pprof_port"` Debug bool `toml:"debug"` } @@ -81,6 +82,7 @@ func newDefaultConfig() *Config { ListenerBatchBytes: 1024 * 1024, MaxTimeDeltaSecs: 600, ProbePort: 0, + PprofPort: 0, } } diff --git a/config/config_small_test.go b/config/config_small_test.go index 6301cf0..32f674a 100644 --- a/config/config_small_test.go +++ b/config/config_small_test.go @@ -57,6 +57,7 @@ listener_batch_bytes = 4096 max_time_delta_secs = 789 probe_port = 6789 +pprof_port = 5432 ` conf, err := parseConfig(validConfigSample) require.NoError(t, err, "Couldn't parse a valid config: %v\n", err) @@ -83,6 +84,7 @@ probe_port = 6789 assert.Equal(t, "nats://localhost:4222", conf.NATSAddress, "Address must match") assert.Equal(t, 6789, conf.ProbePort) + assert.Equal(t, 5432, conf.PprofPort) } func TestAllDefaults(t *testing.T) { @@ -109,6 +111,7 @@ func TestAllDefaults(t *testing.T) { assert.Equal(t, 1048576, conf.ListenerBatchBytes) assert.Equal(t, 600, conf.MaxTimeDeltaSecs) assert.Equal(t, 0, conf.ProbePort) + assert.Equal(t, 0, conf.PprofPort) assert.Equal(t, false, conf.Debug) assert.Len(t, conf.Rule, 0) } diff --git a/writer/writer.go b/writer/writer.go index e345211..bf05ae0 100644 --- a/writer/writer.go +++ b/writer/writer.go @@ -27,7 +27,6 @@ import ( "time" "net/http" - _ "net/http/pprof" // for profiling a nasty memleak "github.com/nats-io/go-nats" @@ -77,8 +76,6 @@ func StartWriter(c *config.Config) (_ *Writer, err error) { } }() - go http.ListenAndServe(":8080", nil) // for pprof profiling - w.rules, err = filter.RuleSetFromConfig(c) if err != nil { return nil, err