diff --git a/filter/filter.go b/filter/filter.go index 1beeaa9..e5d4072 100644 --- a/filter/filter.go +++ b/filter/filter.go @@ -18,22 +18,23 @@ package filter import ( "fmt" "log" - "strconv" + "sort" + "strings" "sync" "time" "github.com/jumptrading/influx-spout/config" - "github.com/jumptrading/influx-spout/lineformatter" + "github.com/jumptrading/influx-spout/prometheus" "github.com/jumptrading/influx-spout/stats" "github.com/nats-io/go-nats" ) // Name for supported stats const ( - linesPassed = "passed" - linesProcessed = "processed" - linesRejected = "rejected" - linesInvalidTime = "invalid-time" + statPassed = "passed" + statProcessed = "processed" + statRejected = "rejected" + statInvalidTime = "invalid_time" ) // StartFilter creates a Filter instance, sets up its rules based on @@ -109,10 +110,10 @@ func (f *Filter) natsConnect() (natsConn, error) { func initStats(rules *RuleSet) *stats.Stats { // Initialise statNames := []string{ - linesPassed, - linesProcessed, - linesRejected, - linesInvalidTime, + statPassed, + statProcessed, + statRejected, + statInvalidTime, } for i := 0; i < rules.Count(); i++ { statNames = append(statNames, ruleToStatsName(i)) @@ -155,40 +156,36 @@ func (f *Filter) Stop() { // startStatistician defines a goroutine that is responsible for // regularly sending the filter's statistics to the monitoring // backend. 
-func (f *Filter) startStatistician(stats *stats.Stats, rules *RuleSet) { +func (f *Filter) startStatistician(st *stats.Stats, rules *RuleSet) { defer f.wg.Done() - totalLine := lineformatter.New( - "spout_stat_filter", - []string{"filter"}, - linesPassed, linesProcessed, linesRejected, linesInvalidTime, - ) - ruleLine := lineformatter.New( - "spout_stat_filter_rule", - []string{"filter", "rule"}, - "triggered", - ) + generalLabels := map[string]string{ + "filter": f.c.Name, + } for { - st := stats.Clone() + now := time.Now() + snap, ruleCounts := splitSnapshot(st.Snapshot()) - // publish the grand stats - f.nc.Publish(f.c.NATSSubjectMonitor, totalLine.Format( - []string{f.c.Name}, - st.Get(linesPassed), - st.Get(linesProcessed), - st.Get(linesRejected), - st.Get(linesInvalidTime), - )) + // publish the general stats + lines := stats.SnapshotToPrometheus(snap, now, generalLabels) + f.nc.Publish(f.c.NATSSubjectMonitor, lines) // publish the per rule stats + // XXX merge with SnapshotToPrometheus + millis := now.UnixNano() / int64(time.Millisecond) for i, subject := range rules.Subjects() { - f.nc.Publish(f.c.NATSSubjectMonitor, - ruleLine.Format( - []string{f.c.Name, subject}, - st.Get(ruleToStatsName(i)), - ), - ) + metric := &prometheus.Metric{ + Name: []byte("triggered"), + Labels: prometheus.LabelPairs{ + {[]byte("filter"), []byte(f.c.Name)}, + {[]byte("rule"), []byte(subject)}, + }, + Value: int64(ruleCounts[i]), + Milliseconds: millis, + } + + f.nc.Publish(f.c.NATSSubjectMonitor, metric.ToBytes()) } select { @@ -199,8 +196,38 @@ func (f *Filter) startStatistician(stats *stats.Stats, rules *RuleSet) { } } +const rulePrefix = "rule-" + // ruleToStatsName converts a rule index to a name to a key for use // with a stats.Stats instance. func ruleToStatsName(i int) string { - return "rule" + strconv.Itoa(i) + return fmt.Sprintf("%s%06d", rulePrefix, i) +} + +// splitSnapshot takes a Snapshot and splits out the rule counters +// from the others. 
The rule counters are returned in an ordered slice +// while the other counters are returned as a new (smaller) Snapshot. +func splitSnapshot(snap stats.Snapshot) (stats.Snapshot, []int) { + var genSnap stats.Snapshot + var ruleSnap stats.Snapshot + + // Split up rule counters from the others. + for _, counter := range snap { + if strings.HasPrefix(counter.Name, rulePrefix) { + ruleSnap = append(ruleSnap, counter) + } else { + genSnap = append(genSnap, counter) + } + } + + // Sort the rule counters by name and extract just the counts. + sort.Slice(ruleSnap, func(i, j int) bool { + return ruleSnap[i].Name < ruleSnap[j].Name + }) + ruleCounts := make([]int, len(ruleSnap)) + for i, counter := range ruleSnap { + ruleCounts[i] = counter.Value + } + + return genSnap, ruleCounts } diff --git a/filter/filter_medium_test.go b/filter/filter_medium_test.go index a01c1f3..c4487db 100644 --- a/filter/filter_medium_test.go +++ b/filter/filter_medium_test.go @@ -76,10 +76,10 @@ func TestFilterWorker(t *testing.T) { }) require.NoError(t, err) - // Subscribe to stats output - statsCh := make(chan string, 10) + // Subscribe to monitor output + monitorCh := make(chan string, 10) _, err = nc.Subscribe(conf.NATSSubjectMonitor, func(msg *nats.Msg) { - statsCh <- string(msg.Data) + monitorCh <- string(msg.Data) }) require.NoError(t, err) @@ -103,15 +103,14 @@ hello,host=gopher01 goodbye,host=gopher01 `) - // Receive total stats - spouttest.AssertRecvMulti(t, statsCh, "stats", ` -spout_stat_filter,filter=particle passed=2,processed=3,rejected=1,invalid-time=0 -`) - - // Receive rule specific stats - spouttest.AssertRecvMulti(t, statsCh, "rule stats", ` -spout_stat_filter_rule,filter=particle,rule=hello-subject triggered=2 -`) + // Receive monitor metrics + spouttest.AssertMonitor(t, monitorCh, []string{ + `passed{filter="particle"} 2`, + `processed{filter="particle"} 3`, + `rejected{filter="particle"} 1`, + `invalid_time{filter="particle"} 0`, + 
`triggered{filter="particle",rule="hello-subject"} 2`, + }) } func TestInvalidTimeStamps(t *testing.T) { @@ -136,10 +135,10 @@ func TestInvalidTimeStamps(t *testing.T) { }) require.NoError(t, err) - // Subscribe to stats output - statsCh := make(chan string, 10) + // Subscribe to monitor output + monitorCh := make(chan string, 10) _, err = nc.Subscribe(conf.NATSSubjectMonitor, func(msg *nats.Msg) { - statsCh <- string(msg.Data) + monitorCh <- string(msg.Data) }) require.NoError(t, err) @@ -161,13 +160,12 @@ func TestInvalidTimeStamps(t *testing.T) { // Expect to see the 3rd & 4th lines. spouttest.AssertRecv(t, helloCh, "helloCh", strings.Join(lines[2:], "\n")) - // Receive total stats. - spouttest.AssertRecvMulti(t, statsCh, "stats", ` -spout_stat_filter,filter=particle passed=2,processed=4,rejected=0,invalid-time=2 -`) - - // Receive rule specific stats - spouttest.AssertRecvMulti(t, statsCh, "rule stats", ` -spout_stat_filter_rule,filter=particle,rule=hello-subject triggered=2 -`) + // Receive monitor metrics. 
+ spouttest.AssertMonitor(t, monitorCh, []string{ + `passed{filter="particle"} 2`, + `processed{filter="particle"} 4`, + `rejected{filter="particle"} 0`, + `invalid_time{filter="particle"} 2`, + `triggered{filter="particle",rule="hello-subject"} 2`, + }) } diff --git a/filter/worker.go b/filter/worker.go index 9dd836a..15926ae 100644 --- a/filter/worker.go +++ b/filter/worker.go @@ -91,13 +91,13 @@ func (w *worker) processBatch(batch []byte) { if len(line) == 0 { continue } - w.stats.Inc(linesProcessed) + w.stats.Inc(statProcessed) ts := extractTimestamp(line, now) if minTs < ts && ts < maxTs { w.processLine(line) } else { - w.stats.Inc(linesInvalidTime) + w.stats.Inc(statInvalidTime) if w.debug { log.Printf("invalid line timestamp: %q", string(line)) } @@ -112,7 +112,7 @@ func (w *worker) processLine(line []byte) { idx := w.rules.Lookup(line) if idx == -1 { // no rule for this => junkyard - w.stats.Inc(linesRejected) + w.stats.Inc(statRejected) w.junkBatch.Write(line) return } @@ -120,7 +120,7 @@ func (w *worker) processLine(line []byte) { // write to the corresponding batch buffer w.batches[idx].Write(line) - w.stats.Inc(linesPassed) + w.stats.Inc(statPassed) w.stats.Inc(ruleToStatsName(idx)) } diff --git a/listener/listener_medium_test.go b/listener/listener_medium_test.go index 9fc29a7..db22412 100644 --- a/listener/listener_medium_test.go +++ b/listener/listener_medium_test.go @@ -22,7 +22,6 @@ import ( "net" "net/http" "os" - "strconv" "strings" "testing" "time" @@ -296,42 +295,10 @@ func assertNoMore(t *testing.T, ch chan string) { } func assertMonitor(t *testing.T, monitorCh chan string, received, sent int) { - remaining := map[string]bool{ - fmt.Sprintf(`received{listener="testlistener"} %d`, received): true, - fmt.Sprintf(`sent{listener="testlistener"} %d`, sent): true, - `read_errors{listener="testlistener"} 0`: true, + expected := []string{ + fmt.Sprintf(`received{listener="testlistener"} %d`, received), + fmt.Sprintf(`sent{listener="testlistener"} %d`, 
sent), + `read_errors{listener="testlistener"} 0`, } - - var allLines string - timeout := time.After(spouttest.LongWait) - for { - select { - case lines := <-monitorCh: - for _, line := range strings.Split(lines, "\n") { - if len(line) == 0 { - continue - } - line = stripTimestamp(t, line) - allLines += fmt.Sprintf("%q\n", line) - delete(remaining, line) - } - if len(remaining) < 1 { - return - } - case <-timeout: - t.Fatalf("timed out waiting for expected stats. received: %s", allLines) - } - } -} - -func stripTimestamp(t *testing.T, s string) string { - i := strings.LastIndexByte(s, ' ') - require.True(t, i >= 0) - - // Check that end looks like a timestamp - _, err := strconv.Atoi(s[i+1:]) - require.NoError(t, err) - - // Strip off the timestamp - return s[:i] + spouttest.AssertMonitor(t, monitorCh, expected) } diff --git a/spouttest/asserts.go b/spouttest/asserts.go index 9aa6aea..f9a1471 100644 --- a/spouttest/asserts.go +++ b/spouttest/asserts.go @@ -1,11 +1,14 @@ package spouttest import ( + "fmt" + "strconv" "strings" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) // AssertRecv checks that a specific string has been received from a @@ -49,3 +52,47 @@ func stripLeadingNL(s string) string { } return s } + +// AssertMonitor ensures that a number of lines have been received from a +// component's statistician goroutine. The target lines may arrive in +// any order and non-matching lines are ignored. Timestamps on the +// received lines are checked for and then stripped. 
+func AssertMonitor(t *testing.T, ch chan string, expected []string) {
+	// Report failures at the calling test's line, not inside this helper.
+	t.Helper()
+
+	// Expected lines that have not been observed yet.
+	remaining := make(map[string]bool, len(expected))
+	for _, line := range expected {
+		remaining[line] = true
+	}
+
+	// Everything received so far, kept only for the timeout message.
+	var seenLines string
+	timeout := time.After(LongWait)
+	for {
+		select {
+		case lines := <-ch:
+			// One message may carry several newline separated lines.
+			for _, line := range strings.Split(lines, "\n") {
+				if len(line) == 0 {
+					continue
+				}
+				line = stripTimestamp(t, line)
+				seenLines += fmt.Sprintf("%q\n", line)
+				delete(remaining, line)
+			}
+			if len(remaining) < 1 {
+				return
+			}
+		case <-timeout:
+			t.Fatalf("timed out waiting for expected lines. received:\n%s", seenLines)
+		}
+	}
+}
+
+// stripTimestamp checks that s ends with a space followed by an
+// integer timestamp and returns s with that trailing field (and the
+// space before it) removed. The test fails immediately if the field
+// is missing or is not an integer.
+func stripTimestamp(t *testing.T, s string) string {
+	t.Helper()
+
+	i := strings.LastIndexByte(s, ' ')
+	require.True(t, i >= 0, "no timestamp field in line %q", s)
+
+	// Parse as 64 bit: epoch based timestamps overflow the 32 bit int
+	// that strconv.Atoi parses into on 32 bit platforms.
+	_, err := strconv.ParseInt(s[i+1:], 10, 64)
+	require.NoError(t, err, "trailing field of line %q is not an integer timestamp", s)
+
+	// Strip off the timestamp
+	return s[:i]
+}