Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
Implement pprof support
Browse files Browse the repository at this point in the history
Instead of having pprof support hardcoded into just the writer, allow
the port to be specified via the configuration.
  • Loading branch information
mjs committed Jun 4, 2018
1 parent 3616d83 commit 3fd57d9
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 3 deletions.
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ nats_subject_monitor = "influx-spout-monitor"
# The listener will serve Kubernetes liveness and readiness probes on this port
# at /healthz and /readyz. Set to 0 (the default) to disable probes support.
probe_port = 0

# The listener will serve Go pprof requests at this port. Set to 0 (the
default) to disable pprof support.
pprof_port = 0
```

### HTTP Listener
Expand Down Expand Up @@ -187,6 +191,10 @@ nats_subject_monitor = "influx-spout-monitor"
# The HTTP listener will serve Kubernetes liveness and readiness probes on this
# port at /healthz and /readyz. Set to 0 (the default) to disable probes support.
probe_port = 0

# The HTTP listener will serve Go pprof requests at this port. Set to 0 (the
default) to disable pprof support.
pprof_port = 0
```

### Filter
Expand Down Expand Up @@ -228,6 +236,10 @@ workers = 8
# /healthz and /readyz. Set to 0 (the default) to disable probes support.
probe_port = 0

# The filter will serve Go pprof requests at this port. Set to 0 (the default)
# to disable pprof support.
pprof_port = 0

# Incoming metrics with timestamps ± this value from the current time will be
# rejected. Metrics with timestamps that are significantly different from previously
# written timestamps negatively impact InfluxDB performance.
Expand Down Expand Up @@ -340,6 +352,10 @@ nats_subject_monitor = "influx-spout-monitor"
# The writer will serve Kubernetes liveness and readiness probes on this port at
# /healthz and /readyz. Set to 0 (the default) to disable probes support.
probe_port = 0

# The writer will serve Go pprof requests at this port. Set to 0 (the default)
# to disable pprof support.
pprof_port = 0
```

A writer will batch up messages until one of the limits defined by the
Expand Down Expand Up @@ -376,6 +392,10 @@ port = 9331
# The monitor will serve Kubernetes liveness and readiness probes on this port
# at /healthz and /readyz. Set to 0 (the default) to disable probes support.
probe_port = 0

# The monitor will serve Go pprof requests at this port. Set to 0 (the default)
# to disable pprof support.
pprof_port = 0
```

## Running tests
Expand Down
15 changes: 15 additions & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ package cmd

import (
"fmt"
"log"
"net/http"
"runtime"

// Profiling support
_ "net/http/pprof"

"github.com/jumptrading/influx-spout/config"
"github.com/jumptrading/influx-spout/filter"
"github.com/jumptrading/influx-spout/listener"
Expand All @@ -38,6 +43,16 @@ func Run(configFile string) (out Stoppable, err error) {
return nil, fmt.Errorf("Error while loading config file: %v", err)
}

if c.PprofPort > 0 {
go func() {
log.Printf("starting pprof listener on port %d", c.PprofPort)
err := http.ListenAndServe(fmt.Sprintf(":%d", c.PprofPort), nil)
if err != nil {
log.Printf("pprof listener exited: %v", err)
}
}()
}

switch c.Mode {
case "listener":
out, err = listener.StartListener(c)
Expand Down
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Config struct {
Rule []Rule `toml:"rule"`
MaxTimeDeltaSecs int `toml:"max_time_delta_secs"`
ProbePort int `toml:"probe_port"`
PprofPort int `toml:"pprof_port"`
Debug bool `toml:"debug"`
}

Expand Down Expand Up @@ -81,6 +82,7 @@ func newDefaultConfig() *Config {
ListenerBatchBytes: 1024 * 1024,
MaxTimeDeltaSecs: 600,
ProbePort: 0,
PprofPort: 0,
}
}

Expand Down
3 changes: 3 additions & 0 deletions config/config_small_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ listener_batch_bytes = 4096
max_time_delta_secs = 789
probe_port = 6789
pprof_port = 5432
`
conf, err := parseConfig(validConfigSample)
require.NoError(t, err, "Couldn't parse a valid config: %v\n", err)
Expand All @@ -83,6 +84,7 @@ probe_port = 6789
assert.Equal(t, "nats://localhost:4222", conf.NATSAddress, "Address must match")

assert.Equal(t, 6789, conf.ProbePort)
assert.Equal(t, 5432, conf.PprofPort)
}

func TestAllDefaults(t *testing.T) {
Expand All @@ -109,6 +111,7 @@ func TestAllDefaults(t *testing.T) {
assert.Equal(t, 1048576, conf.ListenerBatchBytes)
assert.Equal(t, 600, conf.MaxTimeDeltaSecs)
assert.Equal(t, 0, conf.ProbePort)
assert.Equal(t, 0, conf.PprofPort)
assert.Equal(t, false, conf.Debug)
assert.Len(t, conf.Rule, 0)
}
Expand Down
3 changes: 0 additions & 3 deletions writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"time"

"net/http"
_ "net/http/pprof" // for profiling a nasty memleak

"github.com/nats-io/go-nats"

Expand Down Expand Up @@ -77,8 +76,6 @@ func StartWriter(c *config.Config) (_ *Writer, err error) {
}
}()

go http.ListenAndServe(":8080", nil) // for pprof profiling

w.rules, err = filter.RuleSetFromConfig(c)
if err != nil {
return nil, err
Expand Down

0 comments on commit 3fd57d9

Please sign in to comment.