From 69b3777ac62eeea2b41d3632c192d22a7f260ab0 Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 8 May 2018 12:01:09 +1200 Subject: [PATCH] Test readiness probes in end-to-end test --- spouttest/e2e_test.go | 62 +++++++++++++++++++++++-------------------- 1 file changed, 33 insertions(+), 29 deletions(-) diff --git a/spouttest/e2e_test.go b/spouttest/e2e_test.go index 0e76610..8bb66a1 100644 --- a/spouttest/e2e_test.go +++ b/spouttest/e2e_test.go @@ -38,13 +38,24 @@ import ( ) const ( - natsPort = 44500 - influxdPort = 44501 - listenerPort = 44502 - httpListenerPort = 44503 - monitorPort = 44504 - influxDBName = "test" - sendCount = 10 + natsPort = 44500 + influxdPort = 44501 + + listenerPort = 44502 + listenerProbePort = 55502 + + httpListenerPort = 44503 + httpListenerProbePort = 55503 + + filterProbePort = 55504 + + writerProbePort = 55505 + + monitorPort = 44506 + monitorProbePort = 55506 + + influxDBName = "test" + sendCount = 10 ) func TestEndToEnd(t *testing.T) { @@ -63,23 +74,23 @@ func TestEndToEnd(t *testing.T) { // Start spout components. listener := startListener(t, fs) defer listener.Stop() + spouttest.AssertReadyProbe(t, listenerProbePort) httpListener := startHTTPListener(t, fs) defer httpListener.Stop() + spouttest.AssertReadyProbe(t, httpListenerProbePort) filter := startFilter(t, fs) defer filter.Stop() + spouttest.AssertReadyProbe(t, filterProbePort) writer := startWriter(t, fs) defer writer.Stop() + spouttest.AssertReadyProbe(t, writerProbePort) monitor := startMonitor(t, fs) defer monitor.Stop() - - // Make sure the listeners & monitor are actually listening. - assertReady(t, listener) - assertReady(t, httpListener) - assertReady(t, monitor) + spouttest.AssertReadyProbe(t, monitorProbePort) // Connect to the listener. addr := net.JoinHostPort("localhost", strconv.Itoa(listenerPort)) @@ -165,18 +176,6 @@ $`[1:]) t.Fatalf("Failed to see expected metrics. Last saw:\n%s", lines) } -type HasReady interface { - Ready() <-chan struct{} -} - -func assertReady(t *testing.T, component interface{}) { - select { - case <-component.(HasReady).Ready(): - case <-time.After(spouttest.LongWait): - t.Fatal("timeout out waiting for component to be ready") - } -} - const cpuLine = "cpu,env=prod,cls=server user=13.33,usage_system=0.16,usage_idle=86.53" func makeTestLines() *bytes.Buffer { @@ -200,7 +199,8 @@ nats_address = "nats://localhost:%d" batch = 5 debug = true nats_subject_monitor = "monitor" -`, listenerPort, natsPort)) +probe_port = %d +`, listenerPort, natsPort, listenerProbePort)) } func startHTTPListener(t *testing.T, fs afero.Fs) cmd.Stoppable { @@ -211,7 +211,8 @@ nats_address = "nats://localhost:%d" batch = 5 debug = true nats_subject_monitor = "monitor" -`, httpListenerPort, natsPort)) +probe_port = %d +`, httpListenerPort, natsPort, httpListenerProbePort)) } func startFilter(t *testing.T, fs afero.Fs) cmd.Stoppable { @@ -220,12 +221,13 @@ mode = "filter" nats_address = "nats://localhost:%d" debug = true nats_subject_monitor = "monitor" +probe_port = %d [[rule]] type = "basic" match = "cpu" subject = "system" -`, natsPort)) +`, natsPort, filterProbePort)) } func startWriter(t *testing.T, fs afero.Fs) cmd.Stoppable { @@ -239,7 +241,8 @@ batch = 1 workers = 4 debug = true nats_subject_monitor = "monitor" -`, natsPort, influxdPort, influxDBName)) +probe_port = %d +`, natsPort, influxdPort, influxDBName, writerProbePort)) } func startMonitor(t *testing.T, fs afero.Fs) cmd.Stoppable { @@ -248,7 +251,8 @@ mode = "monitor" nats_address = "nats://localhost:%d" nats_subject_monitor = "monitor" port = %d -`, natsPort, monitorPort)) +probe_port = %d +`, natsPort, monitorPort, monitorProbePort)) } func startComponent(t *testing.T, fs afero.Fs, name, config string) cmd.Stoppable {