From 79f1891792b3f259a63848c5b494daa5b19b671f Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Thu, 22 Jul 2021 17:59:03 -0700 Subject: [PATCH] fix: cleanup transport suite --- suites/transport/stream_suite.go | 206 ++++++++++++++++--------------- 1 file changed, 107 insertions(+), 99 deletions(-) diff --git a/suites/transport/stream_suite.go b/suites/transport/stream_suite.go index 00bca4f..e543999 100644 --- a/suites/transport/stream_suite.go +++ b/suites/transport/stream_suite.go @@ -7,7 +7,6 @@ import ( "io" "io/ioutil" "os" - "runtime/debug" "strconv" "sync" "testing" @@ -82,15 +81,6 @@ func randBuf(size int) []byte { return randomness[start : start+size] } -func checkErr(t *testing.T, err error) { - t.Helper() - if err != nil { - debug.PrintStack() - // TODO: not safe to call in parallel - t.Fatal(err) - } -} - func debugLog(t *testing.T, s string, args ...interface{}) { if VerboseDebugging { t.Logf(s, args...) @@ -98,7 +88,6 @@ func debugLog(t *testing.T, s string, args ...interface{}) { } func echoStream(t *testing.T, s mux.MuxedStream) { - defer s.Close() // echo everything var err error if VerboseDebugging { @@ -123,42 +112,45 @@ func (lw *logWriter) Write(buf []byte) (int, error) { return lw.W.Write(buf) } -func goServe(t *testing.T, l transport.Listener) (done func()) { - closed := make(chan struct{}, 1) +func echo(t *testing.T, c transport.CapableConn) { + var wg sync.WaitGroup + defer wg.Wait() + for { + str, err := c.AcceptStream() + if err != nil { + break + } + wg.Add(1) + go func() { + defer wg.Done() + defer str.Close() + echoStream(t, str) + }() + } +} - go func() { - for { - c, err := l.Accept() - if err != nil { - select { - case <-closed: - return // closed naturally. - default: - checkErr(t, err) - } - } +func serve(t *testing.T, l transport.Listener) { + var wg sync.WaitGroup + defer wg.Wait() - debugLog(t, "accepted connection") - go func() { - for { - str, err := c.AcceptStream() - if err != nil { - break - } - go echoStream(t, str) - } - }() + for { + c, err := l.Accept() + if err != nil { + return } - }() - return func() { - closed <- struct{}{} + wg.Add(1) + debugLog(t, "accepted connection") + go func() { + defer wg.Done() + defer c.Close() + echo(t, c) + }() } } func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID, opt Options) { msgsize := 1 << 11 - errs := make(chan error) // dont block anything. rateLimitN := 5000 // max of 5k funcs, because -race has 8k max. rateLimitChan := make(chan struct{}, rateLimitN) @@ -180,7 +172,7 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, bufs <- buf debugLog(t, "%p writing %d bytes (message %d/%d #%x)", s, len(buf), i, opt.MsgNum, buf[:3]) if _, err := s.Write(buf); err != nil { - errs <- fmt.Errorf("s.Write(buf): %s", err) + t.Errorf("s.Write(buf): %s", err) continue } } @@ -196,12 +188,12 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, debugLog(t, "%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.MsgNum, buf1[:3]) if _, err := io.ReadFull(s, buf2); err != nil { - errs <- fmt.Errorf("io.ReadFull(s, buf2): %s", err) + t.Errorf("io.ReadFull(s, buf2): %s", err) debugLog(t, "%p failed to read %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.MsgNum, buf1[:3]) continue } if !bytes.Equal(buf1, buf2) { - errs <- fmt.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3]) + t.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3]) } } } @@ -211,7 +203,7 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, s, err := c.OpenStream(context.Background()) if err != nil { - errs <- fmt.Errorf("failed to create NewStream: %s", err) + t.Errorf("failed to create NewStream: %s", err) return } @@ -228,68 +220,67 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, openConnAndRW := func() { debugLog(t, "openConnAndRW") + var wg sync.WaitGroup + defer wg.Wait() + l, err := ta.Listen(maddr) - checkErr(t, err) + if err != nil { + t.Error(err) + return + } + defer l.Close() - done := goServe(t, l) - defer done() + wg.Add(1) + go func() { + defer wg.Done() + serve(t, l) + }() c, err := tb.Dial(context.Background(), l.Multiaddr(), peerA) - checkErr(t, err) + if err != nil { + t.Error(err) + return + } // serve the outgoing conn, because some muxers assume // that we _always_ call serve. (this is an error?) + wg.Add(1) go func() { + defer wg.Done() + defer c.Close() debugLog(t, "serving connection") - for { - str, err := c.AcceptStream() - if err != nil { - break - } - go echoStream(t, str) - } + echo(t, c) }() - var wg sync.WaitGroup + var openWg sync.WaitGroup for i := 0; i < opt.StreamNum; i++ { - wg.Add(1) + openWg.Add(1) go rateLimit(func() { - defer wg.Done() + defer openWg.Done() openStreamAndRW(c) }) } - wg.Wait() - c.Close() - } - - openConnsAndRW := func() { - debugLog(t, "openConnsAndRW, %d conns", opt.ConnNum) - - var wg sync.WaitGroup - for i := 0; i < opt.ConnNum; i++ { - wg.Add(1) - go rateLimit(func() { - defer wg.Done() - openConnAndRW() - }) - } - wg.Wait() + openWg.Wait() } - go func() { - openConnsAndRW() - close(errs) // done - }() + debugLog(t, "openConnsAndRW, %d conns", opt.ConnNum) - for err := range errs { - t.Error(err) + var wg sync.WaitGroup + defer wg.Wait() + for i := 0; i < opt.ConnNum; i++ { + wg.Add(1) + go rateLimit(func() { + defer wg.Done() + openConnAndRW() + }) } - } func SubtestStreamOpenStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { l, err := ta.Listen(maddr) - checkErr(t, err) + if err != nil { + t.Fatal(err) + } defer l.Close() count := 10000 @@ -311,8 +302,13 @@ func SubtestStreamOpenStress(t *testing.T, ta, tb transport.Transport, maddr ma. accepted <- err }() connB, err = tb.Dial(context.Background(), l.Multiaddr(), peerA) - checkErr(t, err) - checkErr(t, <-accepted) + if err != nil { + t.Fatal(err) + } + err = <-accepted + if err != nil { + t.Fatal(err) + } defer func() { if connA != nil { @@ -379,22 +375,36 @@ func SubtestStreamOpenStress(t *testing.T, ta, tb transport.Transport, maddr ma. } func SubtestStreamReset(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) { + var wg sync.WaitGroup + defer wg.Wait() + l, err := ta.Listen(maddr) - checkErr(t, err) + if err != nil { + t.Fatal(err) + } + defer l.Close() - done := make(chan struct{}, 2) + wg.Add(1) go func() { + defer wg.Done() + muxa, err := l.Accept() - checkErr(t, err) + if err != nil { + t.Error(err) + return + } + defer muxa.Close() s, err := muxa.OpenStream(context.Background()) if err != nil { - panic(err) + t.Error(err) + return } + defer s.Close() // Some transports won't open the stream until we write. That's // fine. - s.Write([]byte("foo")) + _, _ = s.Write([]byte("foo")) time.Sleep(time.Millisecond * 50) @@ -403,22 +413,20 @@ func SubtestStreamReset(t *testing.T, ta, tb transport.Transport, maddr ma.Multi t.Error("should have failed to write") } - s.Close() - done <- struct{}{} }() muxb, err := tb.Dial(context.Background(), l.Multiaddr(), peerA) - checkErr(t, err) - - go func() { - str, err := muxb.AcceptStream() - checkErr(t, err) - str.Reset() - done <- struct{}{} - }() + if err != nil { + t.Fatal(err) + } + defer muxb.Close() - <-done - <-done + str, err := muxb.AcceptStream() + if err != nil { + t.Error(err) + return + } + str.Reset() } func SubtestStress1Conn1Stream1Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {