Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NATS Monitoring Input Plugin #3674

Merged
merged 9 commits into from
Jan 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ github.com/miekg/dns 99f84ae56e75126dd77e5de4fae2ea034a468ca1
github.com/mitchellh/mapstructure d0303fe809921458f417bcf828397a65db30a7e4
github.com/multiplay/go-ts3 07477f49b8dfa3ada231afc7b7b17617d42afe8e
github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b
github.com/nats-io/gnatsd 393bbb7c031433e68707c8810fda0bfcfbe6ab9b
github.com/nats-io/go-nats ea9585611a4ab58a205b9b125ebd74c389a6b898
github.com/nats-io/nats ea9585611a4ab58a205b9b125ebd74c389a6b898
github.com/nats-io/nuid 289cccf02c178dc782430d534e3c1f5b72af807f
Expand Down
1 change: 1 addition & 0 deletions docs/LICENSE_OF_DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ following works:
- github.com/miekg/dns [BSD](https://github.com/miekg/dns/blob/master/LICENSE)
- github.com/naoina/go-stringutil [MIT](https://github.com/naoina/go-stringutil/blob/master/LICENSE)
- github.com/naoina/toml [MIT](https://github.com/naoina/toml/blob/master/LICENSE)
- github.com/nats-io/gnatsd [MIT](https://github.com/nats-io/gnatsd/blob/master/LICENSE)
- github.com/nats-io/go-nats [MIT](https://github.com/nats-io/go-nats/blob/master/LICENSE)
- github.com/nats-io/nats [MIT](https://github.com/nats-io/nats/blob/master/LICENSE)
- github.com/nats-io/nuid [MIT](https://github.com/nats-io/nuid/blob/master/LICENSE)
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/mongodb"
_ "github.com/influxdata/telegraf/plugins/inputs/mqtt_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/mysql"
_ "github.com/influxdata/telegraf/plugins/inputs/nats"
_ "github.com/influxdata/telegraf/plugins/inputs/nats_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/net_response"
_ "github.com/influxdata/telegraf/plugins/inputs/nginx"
Expand Down
12 changes: 12 additions & 0 deletions plugins/inputs/nats/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# NATS Monitoring Input Plugin

The [NATS](http://www.nats.io/about/) monitoring plugin reads from
specified NATS instance and submits metrics to InfluxDB.

## Configuration

```toml
[[inputs.nats]]
## The address of the monitoring end-point of the NATS server
server = "http://localhost:8222"
```
112 changes: 112 additions & 0 deletions plugins/inputs/nats/nats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package nats

import (
"io/ioutil"
"net/http"
"net/url"
"path"
"time"

"encoding/json"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"

gnatsd "github.com/nats-io/gnatsd/server"
)

type Nats struct {
Server string
ResponseTimeout internal.Duration

client *http.Client
}

var sampleConfig = `
## The address of the monitoring endpoint of the NATS server
server = "http://localhost:1337"

## Maximum time to receive response
# response_timeout = "5s"
`

func (n *Nats) SampleConfig() string {
return sampleConfig
}

func (n *Nats) Description() string {
return "Provides metrics about the state of a NATS server"
}

func (n *Nats) Gather(acc telegraf.Accumulator) error {
url, err := url.Parse(n.Server)
if err != nil {
return err
}
url.Path = path.Join(url.Path, "varz")

if n.client == nil {
n.client = n.createHTTPClient()
}
resp, err := n.client.Get(url.String())
if err != nil {
return err
}
defer resp.Body.Close()

bytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}

stats := new(gnatsd.Varz)
err = json.Unmarshal([]byte(bytes), &stats)
if err != nil {
return err
}

acc.AddFields("nats",
map[string]interface{}{
"in_msgs": stats.InMsgs,
"out_msgs": stats.OutMsgs,
"in_bytes": stats.InBytes,
"out_bytes": stats.OutBytes,
"uptime": stats.Now.Sub(stats.Start).Nanoseconds(),
"cores": stats.Cores,
"cpu": stats.CPU,
"mem": stats.Mem,
"connections": stats.Connections,
"total_connections": stats.TotalConnections,
"subscriptions": stats.Subscriptions,
"slow_consumers": stats.SlowConsumers,
"routes": stats.Routes,
"remotes": stats.Remotes,
},
map[string]string{"server": n.Server},
time.Now())

return nil
}

func (n *Nats) createHTTPClient() *http.Client {
transport := &http.Transport{
Proxy: http.ProxyFromEnvironment,
}
timeout := n.ResponseTimeout.Duration
if timeout == time.Duration(0) {
timeout = 5 * time.Second
}
return &http.Client{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Create a new transport to avoid using the default one, all it needs set is Proxy: http.ProxyFromEnvironment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Transport: transport,
Timeout: timeout,
}
}

func init() {
inputs.Add("nats", func() telegraf.Input {
return &Nats{
Server: "http://localhost:8222",
}
})
}
112 changes: 112 additions & 0 deletions plugins/inputs/nats/nats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package nats

import (
"fmt"
"net/http"
"net/http/httptest"
"testing"

"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)

var sampleVarz = `
{
"server_id": "n2afhLHLl64Gcaj7S7jaNa",
"version": "1.0.0",
"go": "go1.8",
"host": "0.0.0.0",
"auth_required": false,
"ssl_required": false,
"tls_required": false,
"tls_verify": false,
"addr": "0.0.0.0",
"max_connections": 65536,
"ping_interval": 120000000000,
"ping_max": 2,
"http_host": "0.0.0.0",
"http_port": 1337,
"https_port": 0,
"auth_timeout": 1,
"max_control_line": 1024,
"cluster": {
"addr": "0.0.0.0",
"cluster_port": 0,
"auth_timeout": 1
},
"tls_timeout": 0.5,
"port": 4222,
"max_payload": 1048576,
"start": "1861-04-12T10:15:26.841483489-05:00",
"now": "2011-10-05T15:24:23.722084098-07:00",
"uptime": "150y5md237h8m57s",
"mem": 15581184,
"cores": 48,
"cpu": 9,
"connections": 5,
"total_connections": 109,
"routes": 1,
"remotes": 2,
"in_msgs": 74148556,
"out_msgs": 68863261,
"in_bytes": 946267004717,
"out_bytes": 948110960598,
"slow_consumers": 2,
"subscriptions": 4,
"http_req_stats": {
"/": 1,
"/connz": 100847,
"/routez": 0,
"/subsz": 1,
"/varz": 205785
},
"config_load_time": "2017-07-24T10:15:26.841483489-05:00"
}
`

func TestMetricsCorrect(t *testing.T) {
var acc testutil.Accumulator

srv := newTestNatsServer()
defer srv.Close()

n := &Nats{Server: srv.URL}
err := n.Gather(&acc)
require.NoError(t, err)

fields := map[string]interface{}{
"in_msgs": int64(74148556),
"out_msgs": int64(68863261),
"in_bytes": int64(946267004717),
"out_bytes": int64(948110960598),
"uptime": int64(4748742536880600609),
"cores": 48,
"cpu": float64(9),
"mem": int64(15581184),
"connections": int(5),
"total_connections": uint64(109),
"subscriptions": uint32(4),
"slow_consumers": int64(2),
"routes": int(1),
"remotes": int(2),
}
tags := map[string]string{
"server": srv.URL,
}
acc.AssertContainsTaggedFields(t, "nats", fields, tags)
}

func newTestNatsServer() *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var rsp string

switch r.URL.Path {
case "/varz":
rsp = sampleVarz
default:
panic("Cannot handle request")
}

fmt.Fprintln(w, rsp)
}))
}