From 617a25052b9284101b73f8e34efee003b8ba4d24 Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 1 May 2018 10:45:23 +1200 Subject: [PATCH] listener: Add `failed_nats_publish` metric This increments if the listener fails to publish lines to NATS (for consumption by the filter). --- listener/listener.go | 17 ++++++++++++----- listener/listener_medium_test.go | 1 + spouttest/e2e_test.go | 1 + 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/listener/listener.go b/listener/listener.go index 74d7de2..05c5044 100644 --- a/listener/listener.go +++ b/listener/listener.go @@ -34,9 +34,10 @@ import ( const ( // Listener stats counters - statReceived = "received" - statSent = "sent" - statReadErrors = "read_errors" + statReceived = "received" + statSent = "sent" + statReadErrors = "read_errors" + statFailedNATSPublish = "failed_nats_publish" // The maximum possible UDP read size. udpMaxDatagramSize = 65536 @@ -129,8 +130,13 @@ func newListener(c *config.Config) (*Listener, error) { c: c, ready: make(chan struct{}), stop: make(chan struct{}), - stats: stats.New(statReceived, statSent, statReadErrors), - buf: make([]byte, c.ListenerBatchBytes), + stats: stats.New( + statReceived, + statSent, + statReadErrors, + statFailedNATSPublish, + ), + buf: make([]byte, c.ListenerBatchBytes), // If more than batchSizeThreshold bytes has been written to // the current batch buffer, the batch will be sent. We allow @@ -262,6 +268,7 @@ func (l *Listener) processRead(sz int) { if statReceived%l.c.BatchMessages == 0 || l.batchSize > l.batchSizeThreshold { l.stats.Inc(statSent) if err := l.nc.Publish(l.c.NATSSubject[0], l.buf[:l.batchSize]); err != nil { + l.stats.Inc(statFailedNATSPublish) l.handleNatsError(err) } l.batchSize = 0 diff --git a/listener/listener_medium_test.go b/listener/listener_medium_test.go index b7f0817..3950410 100644 --- a/listener/listener_medium_test.go +++ b/listener/listener_medium_test.go @@ -299,6 +299,7 @@ func assertMonitor(t *testing.T, monitorCh chan string, received, sent int) { fmt.Sprintf(`received{component="listener",name="testlistener"} %d`, received), fmt.Sprintf(`sent{component="listener",name="testlistener"} %d`, sent), `read_errors{component="listener",name="testlistener"} 0`, + `failed_nats_publish{component="listener",name="testlistener"} 0`, } spouttest.AssertMonitor(t, monitorCh, expected) } diff --git a/spouttest/e2e_test.go b/spouttest/e2e_test.go index 70de700..0e76610 100644 --- a/spouttest/e2e_test.go +++ b/spouttest/e2e_test.go @@ -131,6 +131,7 @@ func TestEndToEnd(t *testing.T) { // Check metrics published by monitor component. expectedMetrics := regexp.MustCompile(` failed_nats_publish{component="filter",name="filter"} 0 +failed_nats_publish{component="listener",name="listener"} 0 failed_writes{component="writer",influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",name="writer"} 0 invalid_time{component="filter",name="filter"} 0 max_pending{component="writer",influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",name="writer"} \d+