Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
Merge pull request #73 from mjs/listener-failed-nats-publish
Browse files Browse the repository at this point in the history
listener: Add `failed_nats_publish` metric
  • Loading branch information
oplehto authored May 1, 2018
2 parents 393a8c3 + 617a250 commit 2ba1aef
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 5 deletions.
17 changes: 12 additions & 5 deletions listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ import (

const (
// Listener stats counters
statReceived = "received"
statSent = "sent"
statReadErrors = "read_errors"
statReceived = "received"
statSent = "sent"
statReadErrors = "read_errors"
statFailedNATSPublish = "failed_nats_publish"

// The maximum possible UDP read size.
udpMaxDatagramSize = 65536
Expand Down Expand Up @@ -129,8 +130,13 @@ func newListener(c *config.Config) (*Listener, error) {
c: c,
ready: make(chan struct{}),
stop: make(chan struct{}),
stats: stats.New(statReceived, statSent, statReadErrors),
buf: make([]byte, c.ListenerBatchBytes),
stats: stats.New(
statReceived,
statSent,
statReadErrors,
statFailedNATSPublish,
),
buf: make([]byte, c.ListenerBatchBytes),

// If more than batchSizeThreshold bytes has been written to
// the current batch buffer, the batch will be sent. We allow
Expand Down Expand Up @@ -262,6 +268,7 @@ func (l *Listener) processRead(sz int) {
if statReceived%l.c.BatchMessages == 0 || l.batchSize > l.batchSizeThreshold {
l.stats.Inc(statSent)
if err := l.nc.Publish(l.c.NATSSubject[0], l.buf[:l.batchSize]); err != nil {
l.stats.Inc(statFailedNATSPublish)
l.handleNatsError(err)
}
l.batchSize = 0
Expand Down
1 change: 1 addition & 0 deletions listener/listener_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ func assertMonitor(t *testing.T, monitorCh chan string, received, sent int) {
fmt.Sprintf(`received{component="listener",name="testlistener"} %d`, received),
fmt.Sprintf(`sent{component="listener",name="testlistener"} %d`, sent),
`read_errors{component="listener",name="testlistener"} 0`,
`failed_nats_publish{component="listener",name="testlistener"} 0`,
}
spouttest.AssertMonitor(t, monitorCh, expected)
}
1 change: 1 addition & 0 deletions spouttest/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func TestEndToEnd(t *testing.T) {
// Check metrics published by monitor component.
expectedMetrics := regexp.MustCompile(`
failed_nats_publish{component="filter",name="filter"} 0
failed_nats_publish{component="listener",name="listener"} 0
failed_writes{component="writer",influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",name="writer"} 0
invalid_time{component="filter",name="filter"} 0
max_pending{component="writer",influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",name="writer"} \d+
Expand Down

0 comments on commit 2ba1aef

Please sign in to comment.