From 480211f5e0959e7bfa9ba2d553228fec0a06b2b8 Mon Sep 17 00:00:00 2001 From: Levente Kurusa Date: Thu, 31 Aug 2017 14:47:37 -0500 Subject: [PATCH] Add a NATS monitoring input plugin Signed-off-by: Levente Kurusa --- Godeps | 1 + plugins/inputs/all/all.go | 1 + plugins/inputs/nats/README.md | 12 ++++ plugins/inputs/nats/nats.go | 79 +++++++++++++++++++++ plugins/inputs/nats/nats_test.go | 114 +++++++++++++++++++++++++++++++ 5 files changed, 207 insertions(+) create mode 100644 plugins/inputs/nats/README.md create mode 100644 plugins/inputs/nats/nats.go create mode 100644 plugins/inputs/nats/nats_test.go diff --git a/Godeps b/Godeps index b538624adb93a..fe208650fa88f 100644 --- a/Godeps +++ b/Godeps @@ -33,6 +33,7 @@ github.com/kballard/go-shellquote d8ec1a69a250a17bb0e419c386eac1f3711dc142 github.com/matttproud/golang_protobuf_extensions c12348ce28de40eed0136aa2b644d0ee0650e56c github.com/miekg/dns 99f84ae56e75126dd77e5de4fae2ea034a468ca1 github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b +github.com/nats-io/gnatsd 393bbb7c031433e68707c8810fda0bfcfbe6ab9b github.com/nats-io/go-nats ea9585611a4ab58a205b9b125ebd74c389a6b898 github.com/nats-io/nats ea9585611a4ab58a205b9b125ebd74c389a6b898 github.com/nats-io/nuid 289cccf02c178dc782430d534e3c1f5b72af807f diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 9bc7afaff1337..5a6945da5c326 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -50,6 +50,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/mongodb" _ "github.com/influxdata/telegraf/plugins/inputs/mqtt_consumer" _ "github.com/influxdata/telegraf/plugins/inputs/mysql" + _ "github.com/influxdata/telegraf/plugins/inputs/nats" _ "github.com/influxdata/telegraf/plugins/inputs/nats_consumer" _ "github.com/influxdata/telegraf/plugins/inputs/net_response" _ "github.com/influxdata/telegraf/plugins/inputs/nginx" diff --git a/plugins/inputs/nats/README.md b/plugins/inputs/nats/README.md new file mode 100644 index 0000000000000..3cd9ee7ac3fbb --- /dev/null +++ b/plugins/inputs/nats/README.md @@ -0,0 +1,12 @@ +# NATS Monitoring Input Plugin + +The [NATS](http://www.nats.io/about/) monitoring plugin reads from +specified NATS instance and submits metrics to InfluxDB. + +## Configuration + +```toml +[[inputs.nats]] + ## The address of the monitoring end-point of the NATS server + server = "http://localhost:8222" +``` diff --git a/plugins/inputs/nats/nats.go b/plugins/inputs/nats/nats.go new file mode 100644 index 0000000000000..60263b2cf33c0 --- /dev/null +++ b/plugins/inputs/nats/nats.go @@ -0,0 +1,79 @@ +package nats + +import ( + "fmt" + "io/ioutil" + "net/http" + "time" + + "encoding/json" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + + gnatsd "github.com/nats-io/gnatsd/server" +) + +type Nats struct { + Server string +} + +var sampleConfig = ` + ## The address of the monitoring end-point of the NATS server + server = "http://localhost:1337" +` + +func (n *Nats) SampleConfig() string { + return sampleConfig +} + +func (n *Nats) Description() string { + return "Provides metrics about the state of a NATS server" +} + +func (n *Nats) Gather(acc telegraf.Accumulator) error { + theServer := fmt.Sprintf("%s/varz", n.Server) + + /* download the page we are intereted in */ + resp, err := http.Get(theServer) + if err != nil { + return err + } + defer resp.Body.Close() + + bytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + + var stats = new(gnatsd.Varz) + + err = json.Unmarshal([]byte(bytes), &stats) + if err != nil { + return err + } + + acc.AddFields("nats", + map[string]interface{}{ + "in_msgs": stats.InMsgs, + "out_msgs": stats.OutMsgs, + "uptime": time.Since(stats.Start).Seconds(), + "connections": stats.Connections, + "total_connections": stats.TotalConnections, + "in_bytes": stats.InBytes, + "cpu_usage": stats.CPU, + "out_bytes": stats.OutBytes, + "mem": stats.Mem, + "subscriptions": stats.Subscriptions, + }, nil, time.Now()) + + return nil +} + +func init() { + inputs.Add("nats", func() telegraf.Input { + return &Nats{ + Server: "http://localhost:8222", + } + }) +} diff --git a/plugins/inputs/nats/nats_test.go b/plugins/inputs/nats/nats_test.go new file mode 100644 index 0000000000000..8ffa7822d4d81 --- /dev/null +++ b/plugins/inputs/nats/nats_test.go @@ -0,0 +1,114 @@ +package nats + +import ( + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var sampleVarz = ` +{ + "server_id": "n2afhLHLl64Gcaj7S7jaNa", + "version": "1.0.0", + "go": "go1.8", + "host": "0.0.0.0", + "auth_required": false, + "ssl_required": false, + "tls_required": false, + "tls_verify": false, + "addr": "0.0.0.0", + "max_connections": 65536, + "ping_interval": 120000000000, + "ping_max": 2, + "http_host": "0.0.0.0", + "http_port": 1337, + "https_port": 0, + "auth_timeout": 1, + "max_control_line": 1024, + "cluster": { + "addr": "0.0.0.0", + "cluster_port": 0, + "auth_timeout": 1 + }, + "tls_timeout": 0.5, + "port": 4222, + "max_payload": 1048576, + "start": "1861-04-12T10:15:26.841483489-05:00", + "now": "2011-10-05T15:24:23.722084098-07:00", + "uptime": "150y5md237h8m57s", + "mem": 15581184, + "cores": 48, + "cpu": 9, + "connections": 2, + "total_connections": 109, + "routes": 0, + "remotes": 0, + "in_msgs": 74148556, + "out_msgs": 68863261, + "in_bytes": 946267004717, + "out_bytes": 948110960598, + "slow_consumers": 0, + "subscriptions": 1, + "http_req_stats": { + "/": 1, + "/connz": 100847, + "/routez": 0, + "/subsz": 1, + "/varz": 205785 + }, + "config_load_time": "2017-07-24T10:15:26.841483489-05:00" +} +` + +func TestMetricsCorrect(t *testing.T) { + var acc testutil.Accumulator + + srv := newTestNatsServer() + defer srv.Close() + + n := &Nats{Server: srv.URL} + err := n.Gather(&acc) + require.NoError(t, err) + + /* + * we get the measurement, and override it, this is neccessary + * because we can't "equal" the uptime value reliably, as it is + * calculated via Time.Now() and the Start value in Varz + */ + s, f := acc.Get("nats") + assert.Equal(t, true, f, "nats measurement must be found") + + fields := make(map[string]interface{}) + fields["uptime"] = s.Fields["uptime"] + fields["in_msgs"] = int64(74148556) + fields["out_msgs"] = int64(68863261) + fields["connections"] = int(2) + fields["total_connections"] = uint64(109) + fields["in_bytes"] = int64(946267004717) + fields["out_bytes"] = int64(948110960598) + fields["cpu_usage"] = float64(9) + fields["mem"] = int64(15581184) + fields["subscriptions"] = uint32(1) + + acc.AssertContainsFields(t, "nats", fields) +} + +func newTestNatsServer() *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var rsp string + + switch r.URL.Path { + case "/varz": + rsp = sampleVarz + default: + panic("Cannot handle request") + } + + fmt.Fprintln(w, rsp) + })) +}