filter: Emit monitor metrics in Prometheus format
mjs committed Apr 10, 2018
1 parent 7a80da7 commit 8f96283
Showing 5 changed files with 142 additions and 103 deletions.
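
In summary, the filter's statistician no longer formats its monitoring output with lineformatter (InfluxDB line protocol); it now builds prometheus.Metric values and publishes their serialized bytes to the monitor subject. The sketch below uses only the types and calls visible in the filter.go diff that follows; the rendered line shown in the comment is inferred from the test expectations (name{labels} value, plus a trailing millisecond timestamp that the tests strip), not from the prometheus package's documentation, and nc / monitorSubject stand in for the filter's NATS connection and configured monitor subject.

// Sketch only: field names and ToBytes come from the diff below; the
// exact rendered output is an assumption based on the test expectations.
metric := &prometheus.Metric{
	Name: []byte("triggered"),
	Labels: prometheus.LabelPairs{
		{[]byte("filter"), []byte("particle")},
		{[]byte("rule"), []byte("hello-subject")},
	},
	Value:        2,
	Milliseconds: 1523370000000,
}
// metric.ToBytes() should yield something like:
//   triggered{filter="particle",rule="hello-subject"} 2 1523370000000
nc.Publish(monitorSubject, metric.ToBytes())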
101 changes: 64 additions & 37 deletions filter/filter.go
@@ -18,22 +18,23 @@ package filter
import (
"fmt"
"log"
"strconv"
"sort"
"strings"
"sync"
"time"

"github.com/jumptrading/influx-spout/config"
"github.com/jumptrading/influx-spout/lineformatter"
"github.com/jumptrading/influx-spout/prometheus"
"github.com/jumptrading/influx-spout/stats"
"github.com/nats-io/go-nats"
)

// Names for supported stats
const (
linesPassed = "passed"
linesProcessed = "processed"
linesRejected = "rejected"
linesInvalidTime = "invalid-time"
statPassed = "passed"
statProcessed = "processed"
statRejected = "rejected"
statInvalidTime = "invalid_time"
)

// StartFilter creates a Filter instance, sets up its rules based on
@@ -109,10 +110,10 @@ func (f *Filter) natsConnect() (natsConn, error) {
func initStats(rules *RuleSet) *stats.Stats {
// Initialise
statNames := []string{
linesPassed,
linesProcessed,
linesRejected,
linesInvalidTime,
statPassed,
statProcessed,
statRejected,
statInvalidTime,
}
for i := 0; i < rules.Count(); i++ {
statNames = append(statNames, ruleToStatsName(i))
@@ -155,40 +156,36 @@ func (f *Filter) Stop() {
// startStatistician defines a goroutine that is responsible for
// regularly sending the filter's statistics to the monitoring
// backend.
func (f *Filter) startStatistician(stats *stats.Stats, rules *RuleSet) {
func (f *Filter) startStatistician(st *stats.Stats, rules *RuleSet) {
defer f.wg.Done()

totalLine := lineformatter.New(
"spout_stat_filter",
[]string{"filter"},
linesPassed, linesProcessed, linesRejected, linesInvalidTime,
)
ruleLine := lineformatter.New(
"spout_stat_filter_rule",
[]string{"filter", "rule"},
"triggered",
)
generalLabels := map[string]string{
"filter": f.c.Name,
}

for {
st := stats.Clone()
now := time.Now()
snap, ruleCounts := splitSnapshot(st.Snapshot())

// publish the grand stats
f.nc.Publish(f.c.NATSSubjectMonitor, totalLine.Format(
[]string{f.c.Name},
st.Get(linesPassed),
st.Get(linesProcessed),
st.Get(linesRejected),
st.Get(linesInvalidTime),
))
// publish the general stats
lines := stats.SnapshotToPrometheus(snap, now, generalLabels)
f.nc.Publish(f.c.NATSSubjectMonitor, lines)

// publish the per rule stats
// XXX merge with SnapshotToPrometheus
millis := now.UnixNano() / int64(time.Millisecond)
for i, subject := range rules.Subjects() {
f.nc.Publish(f.c.NATSSubjectMonitor,
ruleLine.Format(
[]string{f.c.Name, subject},
st.Get(ruleToStatsName(i)),
),
)
metric := &prometheus.Metric{
Name: []byte("triggered"),
Labels: prometheus.LabelPairs{
{[]byte("filter"), []byte(f.c.Name)},
{[]byte("rule"), []byte(subject)},
},
Value: int64(ruleCounts[i]),
Milliseconds: millis,
}

f.nc.Publish(f.c.NATSSubjectMonitor, metric.ToBytes())
}

select {
@@ -199,8 +196,38 @@ func (f *Filter) startStatistician(stats *stats.Stats, rules *RuleSet) {
}
}

const rulePrefix = "rule-"

// ruleToStatsName converts a rule index to a key for use with a
// stats.Stats instance.
func ruleToStatsName(i int) string {
return "rule" + strconv.Itoa(i)
return fmt.Sprintf("%s%06d", rulePrefix, i)
}

// splitSnapshot takes a Snapshot and splits out the rule counters
// from the others. The rule counters are returned in an ordered slice
// while the other counters are returned as a new (smaller) Snapshot.
func splitSnapshot(snap stats.Snapshot) (stats.Snapshot, []int) {
var genSnap stats.Snapshot
var ruleSnap stats.Snapshot

// Split up rule counters from the others.
for _, counter := range snap {
if strings.HasPrefix(counter.Name, rulePrefix) {
ruleSnap = append(ruleSnap, counter)
} else {
genSnap = append(genSnap, counter)
}
}

// Sort the rule counters by name and extract just the counts.
sort.Slice(ruleSnap, func(i, j int) bool {
return ruleSnap[i].Name < ruleSnap[j].Name
})
ruleCounts := make([]int, len(ruleSnap))
for i, counter := range ruleSnap {
ruleCounts[i] = counter.Value
}

return genSnap, ruleCounts
}
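
To make the helpers above concrete: ruleToStatsName zero-pads the rule index (rule 3 becomes "rule-000003"), so the lexical sort in splitSnapshot keeps the rule counters in numeric order. The sketch below is illustrative only; it assumes stats.Snapshot is a slice of counter structs with exported Name and Value fields, as the range loop above implies, and the literal syntax is not taken from the stats package's documented API.

// Hypothetical snapshot: one general counter and two rule counters.
snap := stats.Snapshot{
	{Name: "passed", Value: 7},
	{Name: "rule-000001", Value: 5},
	{Name: "rule-000000", Value: 3},
}
general, ruleCounts := splitSnapshot(snap)
// general retains only the "passed" counter; ruleCounts is ordered by
// rule name, so ruleCounts == []int{3, 5}.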
46 changes: 22 additions & 24 deletions filter/filter_medium_test.go
@@ -76,10 +76,10 @@ func TestFilterWorker(t *testing.T) {
})
require.NoError(t, err)

// Subscribe to stats output
statsCh := make(chan string, 10)
// Subscribe to monitor output
monitorCh := make(chan string, 10)
_, err = nc.Subscribe(conf.NATSSubjectMonitor, func(msg *nats.Msg) {
statsCh <- string(msg.Data)
monitorCh <- string(msg.Data)
})
require.NoError(t, err)

@@ -103,15 +103,14 @@ hello,host=gopher01
goodbye,host=gopher01
`)

// Receive total stats
spouttest.AssertRecvMulti(t, statsCh, "stats", `
spout_stat_filter,filter=particle passed=2,processed=3,rejected=1,invalid-time=0
`)

// Receive rule specific stats
spouttest.AssertRecvMulti(t, statsCh, "rule stats", `
spout_stat_filter_rule,filter=particle,rule=hello-subject triggered=2
`)
// Receive monitor metrics
spouttest.AssertMonitor(t, monitorCh, []string{
`passed{filter="particle"} 2`,
`processed{filter="particle"} 3`,
`rejected{filter="particle"} 1`,
`invalid_time{filter="particle"} 0`,
`triggered{filter="particle",rule="hello-subject"} 2`,
})
}

func TestInvalidTimeStamps(t *testing.T) {
@@ -136,10 +135,10 @@ func TestInvalidTimeStamps(t *testing.T) {
})
require.NoError(t, err)

// Subscribe to stats output
statsCh := make(chan string, 10)
// Subscribe to monitor output
monitorCh := make(chan string, 10)
_, err = nc.Subscribe(conf.NATSSubjectMonitor, func(msg *nats.Msg) {
statsCh <- string(msg.Data)
monitorCh <- string(msg.Data)
})
require.NoError(t, err)

@@ -161,13 +160,12 @@ func TestInvalidTimeStamps(t *testing.T) {
// Expect to see the 3rd & 4th lines.
spouttest.AssertRecv(t, helloCh, "helloCh", strings.Join(lines[2:], "\n"))

// Receive total stats.
spouttest.AssertRecvMulti(t, statsCh, "stats", `
spout_stat_filter,filter=particle passed=2,processed=4,rejected=0,invalid-time=2
`)

// Receive rule specific stats
spouttest.AssertRecvMulti(t, statsCh, "rule stats", `
spout_stat_filter_rule,filter=particle,rule=hello-subject triggered=2
`)
// Receive monitor metrics.
spouttest.AssertMonitor(t, monitorCh, []string{
`passed{filter="particle"} 2`,
`processed{filter="particle"} 4`,
`rejected{filter="particle"} 0`,
`invalid_time{filter="particle"} 2`,
`triggered{filter="particle",rule="hello-subject"} 2`,
})
}
8 changes: 4 additions & 4 deletions filter/worker.go
@@ -91,13 +91,13 @@ func (w *worker) processBatch(batch []byte) {
if len(line) == 0 {
continue
}
w.stats.Inc(linesProcessed)
w.stats.Inc(statProcessed)

ts := extractTimestamp(line, now)
if minTs < ts && ts < maxTs {
w.processLine(line)
} else {
w.stats.Inc(linesInvalidTime)
w.stats.Inc(statInvalidTime)
if w.debug {
log.Printf("invalid line timestamp: %q", string(line))
}
@@ -112,15 +112,15 @@ func (w *worker) processLine(line []byte) {
idx := w.rules.Lookup(line)
if idx == -1 {
// no rule for this => junkyard
w.stats.Inc(linesRejected)
w.stats.Inc(statRejected)
w.junkBatch.Write(line)
return
}

// write to the corresponding batch buffer
w.batches[idx].Write(line)

w.stats.Inc(linesPassed)
w.stats.Inc(statPassed)
w.stats.Inc(ruleToStatsName(idx))
}

43 changes: 5 additions & 38 deletions listener/listener_medium_test.go
@@ -22,7 +22,6 @@ import (
"net"
"net/http"
"os"
"strconv"
"strings"
"testing"
"time"
@@ -296,42 +295,10 @@ func assertNoMore(t *testing.T, ch chan string) {
}

func assertMonitor(t *testing.T, monitorCh chan string, received, sent int) {
remaining := map[string]bool{
fmt.Sprintf(`received{listener="testlistener"} %d`, received): true,
fmt.Sprintf(`sent{listener="testlistener"} %d`, sent): true,
`read_errors{listener="testlistener"} 0`: true,
expected := []string{
fmt.Sprintf(`received{listener="testlistener"} %d`, received),
fmt.Sprintf(`sent{listener="testlistener"} %d`, sent),
`read_errors{listener="testlistener"} 0`,
}

var allLines string
timeout := time.After(spouttest.LongWait)
for {
select {
case lines := <-monitorCh:
for _, line := range strings.Split(lines, "\n") {
if len(line) == 0 {
continue
}
line = stripTimestamp(t, line)
allLines += fmt.Sprintf("%q\n", line)
delete(remaining, line)
}
if len(remaining) < 1 {
return
}
case <-timeout:
t.Fatalf("timed out waiting for expected stats. received: %s", allLines)
}
}
}

func stripTimestamp(t *testing.T, s string) string {
i := strings.LastIndexByte(s, ' ')
require.True(t, i >= 0)

// Check that end looks like a timestamp
_, err := strconv.Atoi(s[i+1:])
require.NoError(t, err)

// Strip off the timestamp
return s[:i]
spouttest.AssertMonitor(t, monitorCh, expected)
}
47 changes: 47 additions & 0 deletions spouttest/asserts.go
@@ -1,11 +1,14 @@
package spouttest

import (
"fmt"
"strconv"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// AssertRecv checks that a specific string has been received from a
@@ -49,3 +52,47 @@ func stripLeadingNL(s string) string {
}
return s
}

// AssertMonitor ensures that a number of expected lines have been
// received from a component's statistician goroutine. The target lines
// may arrive in any order and non-matching lines are ignored. Timestamps
// on the received lines are checked for and then stripped.
func AssertMonitor(t *testing.T, ch chan string, expected []string) {
remaining := make(map[string]bool)
for _, line := range expected {
remaining[line] = true
}

var seenLines string
timeout := time.After(LongWait)
for {
select {
case lines := <-ch:
for _, line := range strings.Split(lines, "\n") {
if len(line) == 0 {
continue
}
line = stripTimestamp(t, line)
seenLines += fmt.Sprintf("%q\n", line)
delete(remaining, line)
}
if len(remaining) < 1 {
return
}
case <-timeout:
t.Fatalf("timed out waiting for expected lines. received:\n%s", seenLines)
}
}
}

func stripTimestamp(t *testing.T, s string) string {
i := strings.LastIndexByte(s, ' ')
require.True(t, i >= 0)

// Check that end looks like a timestamp
_, err := strconv.Atoi(s[i+1:])
require.NoError(t, err)

// Strip off the timestamp
return s[:i]
}
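
As a closing illustration, here is what the timestamp handling amounts to for a single monitor line; the timestamp value is hypothetical, and stripTimestamp is unexported, so this only applies within the spouttest package.

line := stripTimestamp(t, `sent{listener="testlistener"} 5 1523370000000`)
// line == `sent{listener="testlistener"} 5`; the trailing field must
// parse as an integer or the assertion fails.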
