Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
Test readiness probes in end-to-end test
Browse files Browse the repository at this point in the history
  • Loading branch information
mjs committed May 8, 2018
1 parent 55bf27b commit 69b3777
Showing 1 changed file with 33 additions and 29 deletions.
62 changes: 33 additions & 29 deletions spouttest/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,24 @@ import (
)

const (
natsPort = 44500
influxdPort = 44501
listenerPort = 44502
httpListenerPort = 44503
monitorPort = 44504
influxDBName = "test"
sendCount = 10
natsPort = 44500
influxdPort = 44501

listenerPort = 44502
listenerProbePort = 55502

httpListenerPort = 44503
httpListenerProbePort = 55503

filterProbePort = 55504

writerProbePort = 55505

monitorPort = 44506
monitorProbePort = 55506

influxDBName = "test"
sendCount = 10
)

func TestEndToEnd(t *testing.T) {
Expand All @@ -63,23 +74,23 @@ func TestEndToEnd(t *testing.T) {
// Start spout components.
listener := startListener(t, fs)
defer listener.Stop()
spouttest.AssertReadyProbe(t, listenerProbePort)

httpListener := startHTTPListener(t, fs)
defer httpListener.Stop()
spouttest.AssertReadyProbe(t, httpListenerProbePort)

filter := startFilter(t, fs)
defer filter.Stop()
spouttest.AssertReadyProbe(t, filterProbePort)

writer := startWriter(t, fs)
defer writer.Stop()
spouttest.AssertReadyProbe(t, writerProbePort)

monitor := startMonitor(t, fs)
defer monitor.Stop()

// Make sure the listeners & monitor are actually listening.
assertReady(t, listener)
assertReady(t, httpListener)
assertReady(t, monitor)
spouttest.AssertReadyProbe(t, monitorProbePort)

// Connect to the listener.
addr := net.JoinHostPort("localhost", strconv.Itoa(listenerPort))
Expand Down Expand Up @@ -165,18 +176,6 @@ $`[1:])
t.Fatalf("Failed to see expected metrics. Last saw:\n%s", lines)
}

type HasReady interface {
Ready() <-chan struct{}
}

func assertReady(t *testing.T, component interface{}) {
select {
case <-component.(HasReady).Ready():
case <-time.After(spouttest.LongWait):
t.Fatal("timeout out waiting for component to be ready")
}
}

const cpuLine = "cpu,env=prod,cls=server user=13.33,usage_system=0.16,usage_idle=86.53"

func makeTestLines() *bytes.Buffer {
Expand All @@ -200,7 +199,8 @@ nats_address = "nats://localhost:%d"
batch = 5
debug = true
nats_subject_monitor = "monitor"
`, listenerPort, natsPort))
probe_port = %d
`, listenerPort, natsPort, listenerProbePort))
}

func startHTTPListener(t *testing.T, fs afero.Fs) cmd.Stoppable {
Expand All @@ -211,7 +211,8 @@ nats_address = "nats://localhost:%d"
batch = 5
debug = true
nats_subject_monitor = "monitor"
`, httpListenerPort, natsPort))
probe_port = %d
`, httpListenerPort, natsPort, httpListenerProbePort))
}

func startFilter(t *testing.T, fs afero.Fs) cmd.Stoppable {
Expand All @@ -220,12 +221,13 @@ mode = "filter"
nats_address = "nats://localhost:%d"
debug = true
nats_subject_monitor = "monitor"
probe_port = %d
[[rule]]
type = "basic"
match = "cpu"
subject = "system"
`, natsPort))
`, natsPort, filterProbePort))
}

func startWriter(t *testing.T, fs afero.Fs) cmd.Stoppable {
Expand All @@ -239,7 +241,8 @@ batch = 1
workers = 4
debug = true
nats_subject_monitor = "monitor"
`, natsPort, influxdPort, influxDBName))
probe_port = %d
`, natsPort, influxdPort, influxDBName, writerProbePort))
}

func startMonitor(t *testing.T, fs afero.Fs) cmd.Stoppable {
Expand All @@ -248,7 +251,8 @@ mode = "monitor"
nats_address = "nats://localhost:%d"
nats_subject_monitor = "monitor"
port = %d
`, natsPort, monitorPort))
probe_port = %d
`, natsPort, monitorPort, monitorProbePort))
}

func startComponent(t *testing.T, fs afero.Fs, name, config string) cmd.Stoppable {
Expand Down

0 comments on commit 69b3777

Please sign in to comment.