Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
listener: Protect batch buffer from concurrent access
Browse files Browse the repository at this point in the history
The HTTP listener uses concurrent HTTP handlers which were modifying
the batch buffer without synchronisation. This resulted in data races
and corrupt output from the listener. The test added in this change
would reliably result in corrupt output before the mutex was added.

The mutex is not required or used for the UDP listener as there is
only one goroutine in that case.
  • Loading branch information
mjs committed May 15, 2018
1 parent 4a53ca9 commit cd7972a
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 0 deletions.
5 changes: 5 additions & 0 deletions listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ type Listener struct {

wg sync.WaitGroup
stop chan struct{}
mu sync.Mutex // only used for HTTP listener
}

// Stop shuts down a running listener. It should be called exactly
Expand Down Expand Up @@ -205,15 +206,19 @@ func (l *Listener) listenUDP(sc *net.UDPConn) {
func (l *Listener) setupHTTP() *http.Server {
mux := http.NewServeMux()
mux.HandleFunc("/write", func(w http.ResponseWriter, r *http.Request) {
l.mu.Lock()
bytesRead, err := l.batch.readFrom(r.Body)
l.mu.Unlock()
if err != nil {
l.stats.Inc(statReadErrors)
}
if bytesRead > 0 {
if l.c.Debug {
log.Printf("HTTP listener read %d bytes", bytesRead)
}
l.mu.Lock()
l.processRead()
l.mu.Unlock()
}
})
return &http.Server{
Expand Down
56 changes: 56 additions & 0 deletions listener/listener_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,62 @@ func TestHTTPListenerBigPOST(t *testing.T) {
assertMonitor(t, monitorCh, 1, 1)
}

func TestHTTPListenerConcurrency(t *testing.T) {
conf := testConfig()
listener, err := StartHTTPListener(conf)
require.NoError(t, err)
spouttest.AssertReadyProbe(t, conf.ProbePort)
defer listener.Stop()

listenerCh, unsubListener := subListener(t)
defer unsubListener()

// Send the same line many times from multiple goroutines.
const senders = 10
const sendCount = 100
const totalLines = senders * sendCount
sendLine := fmt.Sprintf("cpu load=0.69 foo=bar %d\n", time.Now().UnixNano())

url := fmt.Sprintf("http://localhost:%d/write", listenPort)
errs := make(chan error, senders)
for sender := 0; sender < senders; sender++ {
go func() {
client := new(http.Client)
for i := 0; i < sendCount; i++ {
_, err := client.Post(url, "text/plain", bytes.NewBufferString(sendLine))
if err != nil {
errs <- err
}
}
errs <- nil
}()
}

// Wait for the senders to be done sending, and all the lines to
// be returned.
sendersDone := 0
received := 0
timeout := time.After(spouttest.LongWait)
for received < totalLines || sendersDone < senders {
select {
case lines := <-listenerCh:
for _, line := range strings.SplitAfter(lines, "\n") {
if len(line) > 0 {
require.Equal(t, sendLine, line)
received++
}
}
case err := <-errs:
require.NoError(t, err)
sendersDone++
case <-timeout:
t.Fatal("timed out waiting for lines")
}
}

assertNoMore(t, listenerCh)
}

func BenchmarkListenerLatency(b *testing.B) {
listener := startListener(b, testConfig())
defer listener.Stop()
Expand Down

0 comments on commit cd7972a

Please sign in to comment.