Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
spouttest: Include the monitor in the end-to-end test
Browse files Browse the repository at this point in the history
  • Loading branch information
mjs committed Apr 10, 2018
1 parent bfac4dd commit 8f7c80e
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 6 deletions.
16 changes: 16 additions & 0 deletions spouttest/asserts.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,23 @@ func AssertMonitor(t *testing.T, ch chan string, expected []string) {
}
}

// StripTimestamps takes a string containing one or more metrics
// lines, validates that each line appears to end with a timestamp and
// then strips the timestamp off. The returned string is the same as
// the input but without the timestamps (for easier test comparisons).
func StripTimestamps(t *testing.T, s string) string {
var out []string
for _, line := range strings.Split(s, "\n") {
out = append(out, stripTimestamp(t, line))
}
return strings.Join(out, "\n")
}

func stripTimestamp(t *testing.T, s string) string {
if len(s) < 1 {
return ""
}

i := strings.LastIndexByte(s, ' ')
require.True(t, i >= 0)

Expand Down
62 changes: 56 additions & 6 deletions spouttest/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package spouttest_test
import (
"bytes"
"fmt"
"io/ioutil"
"net"
"net/http"
"strconv"
Expand All @@ -40,6 +41,7 @@ const (
influxdPort = 44501
listenerPort = 44502
httpListenerPort = 44503
monitorPort = 44504
influxDBName = "test"
sendCount = 10
)
Expand Down Expand Up @@ -70,9 +72,13 @@ func TestEndToEnd(t *testing.T) {
writer := startWriter(t, fs)
defer writer.Stop()

// Make sure the listeners are actually listening.
assertListenerReady(t, listener)
assertListenerReady(t, httpListener)
monitor := startMonitor(t, fs)
defer monitor.Stop()

// Make sure the listeners & monitor are actually listening.
assertReady(t, listener)
assertReady(t, httpListener)
assertReady(t, monitor)

// Connect to the listener.
addr := net.JoinHostPort("localhost", strconv.Itoa(listenerPort))
Expand Down Expand Up @@ -120,17 +126,48 @@ func TestEndToEnd(t *testing.T) {
}
time.Sleep(250 * time.Millisecond)
}

// Check metrics published by monitor component.
expectedMetrics := `
failed_writes{influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",writer="writer"} 0
invalid_time{filter="filter"} 0
passed{filter="filter"} 10
processed{filter="filter"} 20
read_errors{listener="listener"} 0
received{influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",writer="writer"} 2
received{listener="listener"} 5
rejected{filter="filter"} 10
sent{listener="listener"} 1
triggered{filter="filter",rule="system"} 10
write_requests{influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",writer="writer"} 2
`[1:]
var lines string
for try := 0; try < 20; try++ {
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", monitorPort))
require.NoError(t, err)

raw, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)

lines = spouttest.StripTimestamps(t, string(raw))
if lines == expectedMetrics {
return
}
time.Sleep(500 * time.Millisecond)
}

t.Fatalf("Failed to see expected metrics. Last saw:\n%s", lines)
}

type HasReady interface {
Ready() <-chan struct{}
}

func assertListenerReady(t *testing.T, listener interface{}) {
func assertReady(t *testing.T, component interface{}) {
select {
case <-listener.(HasReady).Ready():
case <-component.(HasReady).Ready():
case <-time.After(spouttest.LongWait):
t.Fatal("timeout out waiting for listener to be ready")
t.Fatal("timeout out waiting for component to be ready")
}
}

Expand All @@ -156,6 +193,7 @@ port = %d
nats_address = "nats://localhost:%d"
batch = 5
debug = true
nats_subject_monitor = "monitor"
`, listenerPort, natsPort))
}

Expand All @@ -166,6 +204,7 @@ port = %d
nats_address = "nats://localhost:%d"
batch = 5
debug = true
nats_subject_monitor = "monitor"
`, httpListenerPort, natsPort))
}

Expand All @@ -174,6 +213,7 @@ func startFilter(t *testing.T, fs afero.Fs) cmd.Stoppable {
mode = "filter"
nats_address = "nats://localhost:%d"
debug = true
nats_subject_monitor = "monitor"
[[rule]]
type = "basic"
Expand All @@ -192,9 +232,19 @@ influxdb_dbname = "%s"
batch = 1
workers = 4
debug = true
nats_subject_monitor = "monitor"
`, natsPort, influxdPort, influxDBName))
}

func startMonitor(t *testing.T, fs afero.Fs) cmd.Stoppable {
return startComponent(t, fs, "monitor", fmt.Sprintf(`
mode = "monitor"
nats_address = "nats://localhost:%d"
nats_subject_monitor = "monitor"
port = %d
`, natsPort, monitorPort))
}

func startComponent(t *testing.T, fs afero.Fs, name, config string) cmd.Stoppable {
configFilename := name + ".toml"
err := afero.WriteFile(fs, configFilename, []byte(config), 0600)
Expand Down

0 comments on commit 8f7c80e

Please sign in to comment.