Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
filter: Expose NATS dropped messages metric
Browse files Browse the repository at this point in the history
  • Loading branch information
mjs committed Apr 24, 2018
1 parent 840e8da commit 3582e39
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 0 deletions.
13 changes: 13 additions & 0 deletions filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
statProcessed = "processed"
statRejected = "rejected"
statInvalidTime = "invalid_time"
statNATSDropped = "nats_dropped"
)

// StartFilter creates a Filter instance, sets up its rules based on
Expand Down Expand Up @@ -113,6 +114,7 @@ func initStats(rules *RuleSet) *stats.Stats {
statProcessed,
statRejected,
statInvalidTime,
statNATSDropped,
}
for i := 0; i < rules.Count(); i++ {
statNames = append(statNames, ruleToStatsName(i))
Expand Down Expand Up @@ -164,6 +166,8 @@ func (f *Filter) startStatistician(st *stats.Stats, rules *RuleSet) {
}

for {
f.updateNATSDropped(st)

now := time.Now()
snap, ruleCounts := splitSnapshot(st.Snapshot())

Expand Down Expand Up @@ -193,6 +197,15 @@ func (f *Filter) startStatistician(st *stats.Stats, rules *RuleSet) {
}
}

func (f *Filter) updateNATSDropped(st *stats.Stats) {
dropped, err := f.sub.Dropped()
if err != nil {
log.Printf("NATS: failed to read subscription drops: %v", err)
return
}
st.Max(statNATSDropped, dropped)
}

const rulePrefix = "rule-"

// ruleToStatsName converts a rule index to a name to a key for use
Expand Down
2 changes: 2 additions & 0 deletions filter/filter_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ goodbye,host=gopher01
`processed{component="filter",name="particle"} 3`,
`rejected{component="filter",name="particle"} 1`,
`invalid_time{component="filter",name="particle"} 0`,
`nats_dropped{component="filter",name="particle"} 0`,
`triggered{component="filter",name="particle",rule="hello-subject"} 2`,
})
}
Expand Down Expand Up @@ -166,6 +167,7 @@ func TestInvalidTimeStamps(t *testing.T) {
`processed{component="filter",name="particle"} 4`,
`rejected{component="filter",name="particle"} 0`,
`invalid_time{component="filter",name="particle"} 2`,
`nats_dropped{component="filter",name="particle"} 0`,
`triggered{component="filter",name="particle",rule="hello-subject"} 2`,
})
}
1 change: 1 addition & 0 deletions spouttest/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func TestEndToEnd(t *testing.T) {
failed_writes{component="writer",influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",name="writer"} 0
invalid_time{component="filter",name="filter"} 0
max_pending{component="writer",influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",name="writer"} \d+
nats_dropped{component="filter",name="filter"} 0
passed{component="filter",name="filter"} 10
processed{component="filter",name="filter"} 20
read_errors{component="listener",name="listener"} 0
Expand Down

0 comments on commit 3582e39

Please sign in to comment.