diff --git a/CHANGELOG.md b/CHANGELOG.md index 18a68ebf7b95e..0be1ec417db41 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - [#475](https://github.com/influxdata/telegraf/pull/475): Add response time to httpjson plugin. Thanks @titilambert! - [#519](https://github.com/influxdata/telegraf/pull/519): Added a sensors input based on lm-sensors. Thanks @md14454! - [#467](https://github.com/influxdata/telegraf/issues/467): Add option to disable statsd measurement name conversion. +- [#534](https://github.com/influxdata/telegraf/pull/534): NSQ input plugin. Thanks @allingeek! ### Bugfixes - [#506](https://github.com/influxdb/telegraf/pull/506): Ping input doesn't return response time metric when timeout. Thanks @titilambert! diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index b6b1e74daa979..b4c8553c3fde8 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -19,6 +19,7 @@ import ( _ "github.com/influxdb/telegraf/plugins/inputs/mongodb" _ "github.com/influxdb/telegraf/plugins/inputs/mysql" _ "github.com/influxdb/telegraf/plugins/inputs/nginx" + _ "github.com/influxdb/telegraf/plugins/inputs/nsq" _ "github.com/influxdb/telegraf/plugins/inputs/phpfpm" _ "github.com/influxdb/telegraf/plugins/inputs/ping" _ "github.com/influxdb/telegraf/plugins/inputs/postgresql" diff --git a/plugins/inputs/nsq/nsq.go b/plugins/inputs/nsq/nsq.go new file mode 100644 index 0000000000000..48a709a377b55 --- /dev/null +++ b/plugins/inputs/nsq/nsq.go @@ -0,0 +1,271 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Jeff Nickoloff (jeff@allingeek.com) +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package nsq + +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" + "strconv" + "sync" + "time" + + "github.com/influxdb/telegraf/plugins/inputs" +) + +// Might add Lookupd endpoints for cluster discovery +type NSQ struct { + Endpoints []string +} + +var sampleConfig = ` + # An array of NSQD HTTP API endpoints + endpoints = ["http://localhost:4151"] +` + +const ( + requestPattern = `%s/stats?format=json` +) + +func init() { + inputs.Add("nsq", func() inputs.Input { + return &NSQ{} + }) +} + +func (n *NSQ) SampleConfig() string { + return sampleConfig +} + +func (n *NSQ) Description() string { + return "Read NSQ topic and channel statistics." +} + +func (n *NSQ) Gather(acc inputs.Accumulator) error { + var wg sync.WaitGroup + var outerr error + + for _, e := range n.Endpoints { + wg.Add(1) + go func(e string) { + defer wg.Done() + outerr = n.gatherEndpoint(e, acc) + }(e) + } + + wg.Wait() + + return outerr +} + +var tr = &http.Transport{ + ResponseHeaderTimeout: time.Duration(3 * time.Second), +} + +var client = &http.Client{Transport: tr} + +func (n *NSQ) gatherEndpoint(e string, acc inputs.Accumulator) error { + u, err := buildURL(e) + if err != nil { + return err + } + r, err := client.Get(u.String()) + if err != nil { + return fmt.Errorf("Error while polling %s: %s", u.String(), err) + } + defer r.Body.Close() + + if r.StatusCode != http.StatusOK { + return fmt.Errorf("%s returned HTTP status %s", u.String(), r.Status) + } + + s := &NSQStats{} + err = json.NewDecoder(r.Body).Decode(s) + if err != nil { + return fmt.Errorf(`Error parsing response: %s`, err) + } + + tags := map[string]string{ + `server_host`: u.Host, + `server_version`: s.Data.Version, + } + + fields := make(map[string]interface{}) + if s.Data.Health == `OK` { + fields["server_count"] = int64(1) + } else { + fields["server_count"] = int64(0) + } + fields["topic_count"] = int64(len(s.Data.Topics)) + + acc.AddFields("nsq_server", fields, tags) + for _, t := range s.Data.Topics { + topicStats(t, acc, u.Host, s.Data.Version) + } + + return nil +} + +func buildURL(e string) (*url.URL, error) { + u := fmt.Sprintf(requestPattern, e) + addr, err := url.Parse(u) + if err != nil { + return nil, fmt.Errorf("Unable to parse address '%s': %s", u, err) + } + return addr, nil +} + +func topicStats(t TopicStats, acc inputs.Accumulator, host, version string) { + // per topic overall (tag: name, paused, channel count) + tags := map[string]string{ + "server_host": host, + "server_version": version, + "topic": t.Name, + } + + fields := map[string]interface{}{ + "depth": t.Depth, + "backend_depth": t.BackendDepth, + "message_count": t.MessageCount, + "channel_count": int64(len(t.Channels)), + } + acc.AddFields("nsq_topic", fields, tags) + + for _, c := range t.Channels { + channelStats(c, acc, host, version, t.Name) + } +} + +func channelStats(c ChannelStats, acc inputs.Accumulator, host, version, topic string) { + tags := map[string]string{ + "server_host": host, + "server_version": version, + "topic": topic, + "channel": c.Name, + } + + fields := map[string]interface{}{ + "depth": c.Depth, + "backend_depth": c.BackendDepth, + "inflight_count": c.InFlightCount, + "deferred_count": c.DeferredCount, + "message_count": c.MessageCount, + "requeue_count": c.RequeueCount, + "timeout_count": c.TimeoutCount, + "client_count": int64(len(c.Clients)), + } + + acc.AddFields("nsq_channel", fields, tags) + for _, cl := range c.Clients { + clientStats(cl, acc, host, version, topic, c.Name) + } +} + +func clientStats(c ClientStats, acc inputs.Accumulator, host, version, topic, channel string) { + tags := map[string]string{ + "server_host": host, + "server_version": version, + "topic": topic, + "channel": channel, + "client_name": c.Name, + "client_id": c.ID, + "client_hostname": c.Hostname, + "client_version": c.Version, + "client_address": c.RemoteAddress, + "client_user_agent": c.UserAgent, + "client_tls": strconv.FormatBool(c.TLS), + "client_snappy": strconv.FormatBool(c.Snappy), + "client_deflate": strconv.FormatBool(c.Deflate), + } + + fields := map[string]interface{}{ + "ready_count": c.ReadyCount, + "inflight_count": c.InFlightCount, + "message_count": c.MessageCount, + "finish_count": c.FinishCount, + "requeue_count": c.RequeueCount, + } + acc.AddFields("nsq_client", fields, tags) +} + +type NSQStats struct { + Code int64 `json:"status_code"` + Txt string `json:"status_txt"` + Data NSQStatsData `json:"data"` +} + +type NSQStatsData struct { + Version string `json:"version"` + Health string `json:"health"` + StartTime int64 `json:"start_time"` + Topics []TopicStats `json:"topics"` +} + +// e2e_processing_latency is not modeled +type TopicStats struct { + Name string `json:"topic_name"` + Depth int64 `json:"depth"` + BackendDepth int64 `json:"backend_depth"` + MessageCount int64 `json:"message_count"` + Paused bool `json:"paused"` + Channels []ChannelStats `json:"channels"` +} + +// e2e_processing_latency is not modeled +type ChannelStats struct { + Name string `json:"channel_name"` + Depth int64 `json:"depth"` + BackendDepth int64 `json:"backend_depth"` + InFlightCount int64 `json:"in_flight_count"` + DeferredCount int64 `json:"deferred_count"` + MessageCount int64 `json:"message_count"` + RequeueCount int64 `json:"requeue_count"` + TimeoutCount int64 `json:"timeout_count"` + Paused bool `json:"paused"` + Clients []ClientStats `json:"clients"` +} + +type ClientStats struct { + Name string `json:"name"` + ID string `json:"client_id"` + Hostname string `json:"hostname"` + Version string `json:"version"` + RemoteAddress string `json:"remote_address"` + State int64 `json:"state"` + ReadyCount int64 `json:"ready_count"` + InFlightCount int64 `json:"in_flight_count"` + MessageCount int64 `json:"message_count"` + FinishCount int64 `json:"finish_count"` + RequeueCount int64 `json:"requeue_count"` + ConnectTime int64 `json:"connect_ts"` + SampleRate int64 `json:"sample_rate"` + Deflate bool `json:"deflate"` + Snappy bool `json:"snappy"` + UserAgent string `json:"user_agent"` + TLS bool `json:"tls"` + TLSCipherSuite string `json:"tls_cipher_suite"` + TLSVersion string `json:"tls_version"` + TLSNegotiatedProtocol string `json:"tls_negotiated_protocol"` + TLSNegotiatedProtocolIsMutual bool `json:"tls_negotiated_protocol_is_mutual"` +} diff --git a/plugins/inputs/nsq/nsq_test.go b/plugins/inputs/nsq/nsq_test.go new file mode 100644 index 0000000000000..fc34a710b017d --- /dev/null +++ b/plugins/inputs/nsq/nsq_test.go @@ -0,0 +1,273 @@ +package nsq + +import ( + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/influxdb/telegraf/testutil" + + "github.com/stretchr/testify/require" +) + +func TestNSQStats(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + fmt.Fprintln(w, response) + })) + defer ts.Close() + + n := &NSQ{ + Endpoints: []string{ts.URL}, + } + + var acc testutil.Accumulator + err := n.Gather(&acc) + require.NoError(t, err) + + u, err := url.Parse(ts.URL) + require.NoError(t, err) + host := u.Host + + // actually validate the tests + tests := []struct { + m string + f map[string]interface{} + g map[string]string + }{ + { + "nsq_server", + map[string]interface{}{ + "server_count": int64(1), + "topic_count": int64(2), + }, + map[string]string{ + "server_host": host, + "server_version": "0.3.6", + }, + }, + { + "nsq_topic", + map[string]interface{}{ + "depth": int64(12), + "backend_depth": int64(13), + "message_count": int64(14), + "channel_count": int64(1), + }, + map[string]string{ + "server_host": host, + "server_version": "0.3.6", + "topic": "t1"}, + }, + { + "nsq_channel", + map[string]interface{}{ + "depth": int64(0), + "backend_depth": int64(1), + "inflight_count": int64(2), + "deferred_count": int64(3), + "message_count": int64(4), + "requeue_count": int64(5), + "timeout_count": int64(6), + "client_count": int64(1), + }, + map[string]string{ + "server_host": host, + "server_version": "0.3.6", + "topic": "t1", + "channel": "c1", + }, + }, + { + "nsq_client", + map[string]interface{}{ + "ready_count": int64(200), + "inflight_count": int64(7), + "message_count": int64(8), + "finish_count": int64(9), + "requeue_count": int64(10), + }, + map[string]string{"server_host": host, "server_version": "0.3.6", + "topic": "t1", "channel": "c1", "client_name": "373a715cd990", + "client_id": "373a715cd990", "client_hostname": "373a715cd990", + "client_version": "V2", "client_address": "172.17.0.11:35560", + "client_tls": "false", "client_snappy": "false", + "client_deflate": "false", + "client_user_agent": "nsq_to_nsq/0.3.6 go-nsq/1.0.5"}, + }, + { + "nsq_topic", + map[string]interface{}{ + "depth": int64(28), + "backend_depth": int64(29), + "message_count": int64(30), + "channel_count": int64(1), + }, + map[string]string{ + "server_host": host, + "server_version": "0.3.6", + "topic": "t2"}, + }, + { + "nsq_channel", + map[string]interface{}{ + "depth": int64(15), + "backend_depth": int64(16), + "inflight_count": int64(17), + "deferred_count": int64(18), + "message_count": int64(19), + "requeue_count": int64(20), + "timeout_count": int64(21), + "client_count": int64(1), + }, + map[string]string{ + "server_host": host, + "server_version": "0.3.6", + "topic": "t2", + "channel": "c2", + }, + }, + { + "nsq_client", + map[string]interface{}{ + "ready_count": int64(22), + "inflight_count": int64(23), + "message_count": int64(24), + "finish_count": int64(25), + "requeue_count": int64(26), + }, + map[string]string{"server_host": host, "server_version": "0.3.6", + "topic": "t2", "channel": "c2", "client_name": "377569bd462b", + "client_id": "377569bd462b", "client_hostname": "377569bd462b", + "client_version": "V2", "client_address": "172.17.0.8:48145", + "client_user_agent": "go-nsq/1.0.5", "client_tls": "true", + "client_snappy": "true", "client_deflate": "true"}, + }, + } + + for _, test := range tests { + acc.AssertContainsTaggedFields(t, test.m, test.f, test.g) + } +} + +var response = ` +{ + "status_code": 200, + "status_txt": "OK", + "data": { + "version": "0.3.6", + "health": "OK", + "start_time": 1452021674, + "topics": [ + { + "topic_name": "t1", + "channels": [ + { + "channel_name": "c1", + "depth": 0, + "backend_depth": 1, + "in_flight_count": 2, + "deferred_count": 3, + "message_count": 4, + "requeue_count": 5, + "timeout_count": 6, + "clients": [ + { + "name": "373a715cd990", + "client_id": "373a715cd990", + "hostname": "373a715cd990", + "version": "V2", + "remote_address": "172.17.0.11:35560", + "state": 3, + "ready_count": 200, + "in_flight_count": 7, + "message_count": 8, + "finish_count": 9, + "requeue_count": 10, + "connect_ts": 1452021675, + "sample_rate": 11, + "deflate": false, + "snappy": false, + "user_agent": "nsq_to_nsq\/0.3.6 go-nsq\/1.0.5", + "tls": false, + "tls_cipher_suite": "", + "tls_version": "", + "tls_negotiated_protocol": "", + "tls_negotiated_protocol_is_mutual": false + } + ], + "paused": false, + "e2e_processing_latency": { + "count": 0, + "percentiles": null + } + } + ], + "depth": 12, + "backend_depth": 13, + "message_count": 14, + "paused": false, + "e2e_processing_latency": { + "count": 0, + "percentiles": null + } + }, + { + "topic_name": "t2", + "channels": [ + { + "channel_name": "c2", + "depth": 15, + "backend_depth": 16, + "in_flight_count": 17, + "deferred_count": 18, + "message_count": 19, + "requeue_count": 20, + "timeout_count": 21, + "clients": [ + { + "name": "377569bd462b", + "client_id": "377569bd462b", + "hostname": "377569bd462b", + "version": "V2", + "remote_address": "172.17.0.8:48145", + "state": 3, + "ready_count": 22, + "in_flight_count": 23, + "message_count": 24, + "finish_count": 25, + "requeue_count": 26, + "connect_ts": 1452021678, + "sample_rate": 27, + "deflate": true, + "snappy": true, + "user_agent": "go-nsq\/1.0.5", + "tls": true, + "tls_cipher_suite": "", + "tls_version": "", + "tls_negotiated_protocol": "", + "tls_negotiated_protocol_is_mutual": false + } + ], + "paused": false, + "e2e_processing_latency": { + "count": 0, + "percentiles": null + } + } + ], + "depth": 28, + "backend_depth": 29, + "message_count": 30, + "paused": false, + "e2e_processing_latency": { + "count": 0, + "percentiles": null + } + } + ] + } +} +`