diff --git a/listener/listener.go b/listener/listener.go index 3e8efda..fc354e0 100644 --- a/listener/listener.go +++ b/listener/listener.go @@ -107,6 +107,7 @@ type Listener struct { wg sync.WaitGroup stop chan struct{} + mu sync.Mutex // only used for HTTP listener } // Stop shuts down a running listener. It should be called exactly @@ -205,7 +206,9 @@ func (l *Listener) listenUDP(sc *net.UDPConn) { func (l *Listener) setupHTTP() *http.Server { mux := http.NewServeMux() mux.HandleFunc("/write", func(w http.ResponseWriter, r *http.Request) { + l.mu.Lock() bytesRead, err := l.batch.readFrom(r.Body) + l.mu.Unlock() if err != nil { l.stats.Inc(statReadErrors) } @@ -213,7 +216,9 @@ func (l *Listener) setupHTTP() *http.Server { if l.c.Debug { log.Printf("HTTP listener read %d bytes", bytesRead) } + l.mu.Lock() l.processRead() + l.mu.Unlock() } }) return &http.Server{ diff --git a/listener/listener_medium_test.go b/listener/listener_medium_test.go index d4f0626..363f020 100644 --- a/listener/listener_medium_test.go +++ b/listener/listener_medium_test.go @@ -243,6 +243,62 @@ func TestHTTPListenerBigPOST(t *testing.T) { assertMonitor(t, monitorCh, 1, 1) } +func TestHTTPListenerConcurrency(t *testing.T) { + conf := testConfig() + listener, err := StartHTTPListener(conf) + require.NoError(t, err) + spouttest.AssertReadyProbe(t, conf.ProbePort) + defer listener.Stop() + + listenerCh, unsubListener := subListener(t) + defer unsubListener() + + // Send the same line many times from multiple goroutines. + const senders = 10 + const sendCount = 100 + const totalLines = senders * sendCount + sendLine := fmt.Sprintf("cpu load=0.69 foo=bar %d\n", time.Now().UnixNano()) + + url := fmt.Sprintf("http://localhost:%d/write", listenPort) + errs := make(chan error, senders) + for sender := 0; sender < senders; sender++ { + go func() { + client := new(http.Client) + for i := 0; i < sendCount; i++ { + _, err := client.Post(url, "text/plain", bytes.NewBufferString(sendLine)) + if err != nil { + errs <- err + } + } + errs <- nil + }() + } + + // Wait for the senders to be done sending, and all the lines to + // be returned. + sendersDone := 0 + received := 0 + timeout := time.After(spouttest.LongWait) + for received < totalLines || sendersDone < senders { + select { + case lines := <-listenerCh: + for _, line := range strings.SplitAfter(lines, "\n") { + if len(line) > 0 { + require.Equal(t, sendLine, line) + received++ + } + } + case err := <-errs: + require.NoError(t, err) + sendersDone++ + case <-timeout: + t.Fatal("timed out waiting for lines") + } + } + + assertNoMore(t, listenerCh) +} + func BenchmarkListenerLatency(b *testing.B) { listener := startListener(b, testConfig()) defer listener.Stop()