Skip to content

Commit

Permalink
Add a NATS monitoring input plugin
Browse files Browse the repository at this point in the history
Signed-off-by: Levente Kurusa <levex@linux.com>
  • Loading branch information
levex committed Aug 31, 2017
1 parent a2d4453 commit 480211f
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 0 deletions.
1 change: 1 addition & 0 deletions Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ github.com/kballard/go-shellquote d8ec1a69a250a17bb0e419c386eac1f3711dc142
github.com/matttproud/golang_protobuf_extensions c12348ce28de40eed0136aa2b644d0ee0650e56c
github.com/miekg/dns 99f84ae56e75126dd77e5de4fae2ea034a468ca1
github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b
github.com/nats-io/gnatsd 393bbb7c031433e68707c8810fda0bfcfbe6ab9b
github.com/nats-io/go-nats ea9585611a4ab58a205b9b125ebd74c389a6b898
github.com/nats-io/nats ea9585611a4ab58a205b9b125ebd74c389a6b898
github.com/nats-io/nuid 289cccf02c178dc782430d534e3c1f5b72af807f
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/mongodb"
_ "github.com/influxdata/telegraf/plugins/inputs/mqtt_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/mysql"
_ "github.com/influxdata/telegraf/plugins/inputs/nats"
_ "github.com/influxdata/telegraf/plugins/inputs/nats_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/net_response"
_ "github.com/influxdata/telegraf/plugins/inputs/nginx"
Expand Down
12 changes: 12 additions & 0 deletions plugins/inputs/nats/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# NATS Monitoring Input Plugin

The [NATS](http://www.nats.io/about/) monitoring plugin reads from
specified NATS instance and submits metrics to InfluxDB.

## Configuration

```toml
[[inputs.nats]]
## The address of the monitoring end-point of the NATS server
server = "http://localhost:8222"
```
79 changes: 79 additions & 0 deletions plugins/inputs/nats/nats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package nats

import (
"fmt"
"io/ioutil"
"net/http"
"time"

"encoding/json"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"

gnatsd "github.com/nats-io/gnatsd/server"
)

type Nats struct {
Server string
}

var sampleConfig = `
## The address of the monitoring end-point of the NATS server
server = "http://localhost:1337"
`

func (n *Nats) SampleConfig() string {
return sampleConfig
}

func (n *Nats) Description() string {
return "Provides metrics about the state of a NATS server"
}

func (n *Nats) Gather(acc telegraf.Accumulator) error {
theServer := fmt.Sprintf("%s/varz", n.Server)

/* download the page we are intereted in */
resp, err := http.Get(theServer)
if err != nil {
return err
}
defer resp.Body.Close()

bytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}

var stats = new(gnatsd.Varz)

err = json.Unmarshal([]byte(bytes), &stats)
if err != nil {
return err
}

acc.AddFields("nats",
map[string]interface{}{
"in_msgs": stats.InMsgs,
"out_msgs": stats.OutMsgs,
"uptime": time.Since(stats.Start).Seconds(),
"connections": stats.Connections,
"total_connections": stats.TotalConnections,
"in_bytes": stats.InBytes,
"cpu_usage": stats.CPU,
"out_bytes": stats.OutBytes,
"mem": stats.Mem,
"subscriptions": stats.Subscriptions,
}, nil, time.Now())

return nil
}

func init() {
inputs.Add("nats", func() telegraf.Input {
return &Nats{
Server: "http://localhost:8222",
}
})
}
114 changes: 114 additions & 0 deletions plugins/inputs/nats/nats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package nats

import (
"fmt"
"net/http"
"net/http/httptest"
"testing"

"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var sampleVarz = `
{
"server_id": "n2afhLHLl64Gcaj7S7jaNa",
"version": "1.0.0",
"go": "go1.8",
"host": "0.0.0.0",
"auth_required": false,
"ssl_required": false,
"tls_required": false,
"tls_verify": false,
"addr": "0.0.0.0",
"max_connections": 65536,
"ping_interval": 120000000000,
"ping_max": 2,
"http_host": "0.0.0.0",
"http_port": 1337,
"https_port": 0,
"auth_timeout": 1,
"max_control_line": 1024,
"cluster": {
"addr": "0.0.0.0",
"cluster_port": 0,
"auth_timeout": 1
},
"tls_timeout": 0.5,
"port": 4222,
"max_payload": 1048576,
"start": "1861-04-12T10:15:26.841483489-05:00",
"now": "2011-10-05T15:24:23.722084098-07:00",
"uptime": "150y5md237h8m57s",
"mem": 15581184,
"cores": 48,
"cpu": 9,
"connections": 2,
"total_connections": 109,
"routes": 0,
"remotes": 0,
"in_msgs": 74148556,
"out_msgs": 68863261,
"in_bytes": 946267004717,
"out_bytes": 948110960598,
"slow_consumers": 0,
"subscriptions": 1,
"http_req_stats": {
"/": 1,
"/connz": 100847,
"/routez": 0,
"/subsz": 1,
"/varz": 205785
},
"config_load_time": "2017-07-24T10:15:26.841483489-05:00"
}
`

func TestMetricsCorrect(t *testing.T) {
var acc testutil.Accumulator

srv := newTestNatsServer()
defer srv.Close()

n := &Nats{Server: srv.URL}
err := n.Gather(&acc)
require.NoError(t, err)

/*
* we get the measurement, and override it, this is neccessary
* because we can't "equal" the uptime value reliably, as it is
* calculated via Time.Now() and the Start value in Varz
*/
s, f := acc.Get("nats")
assert.Equal(t, true, f, "nats measurement must be found")

fields := make(map[string]interface{})
fields["uptime"] = s.Fields["uptime"]
fields["in_msgs"] = int64(74148556)
fields["out_msgs"] = int64(68863261)
fields["connections"] = int(2)
fields["total_connections"] = uint64(109)
fields["in_bytes"] = int64(946267004717)
fields["out_bytes"] = int64(948110960598)
fields["cpu_usage"] = float64(9)
fields["mem"] = int64(15581184)
fields["subscriptions"] = uint32(1)

acc.AssertContainsFields(t, "nats", fields)
}

func newTestNatsServer() *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var rsp string

switch r.URL.Path {
case "/varz":
rsp = sampleVarz
default:
panic("Cannot handle request")
}

fmt.Fprintln(w, rsp)
}))
}

0 comments on commit 480211f

Please sign in to comment.