Skip to content

Commit

Permalink
Merge pull request #34 from libp2p/fix/transport
Browse files Browse the repository at this point in the history
fix: cleanup transport suite
  • Loading branch information
Stebalien authored Jul 23, 2021
2 parents e7dc022 + 79f1891 commit 8852a73
Showing 1 changed file with 107 additions and 99 deletions.
206 changes: 107 additions & 99 deletions suites/transport/stream_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"io"
"io/ioutil"
"os"
"runtime/debug"
"strconv"
"sync"
"testing"
Expand Down Expand Up @@ -82,23 +81,13 @@ func randBuf(size int) []byte {
return randomness[start : start+size]
}

func checkErr(t *testing.T, err error) {
t.Helper()
if err != nil {
debug.PrintStack()
// TODO: not safe to call in parallel
t.Fatal(err)
}
}

func debugLog(t *testing.T, s string, args ...interface{}) {
if VerboseDebugging {
t.Logf(s, args...)
}
}

func echoStream(t *testing.T, s mux.MuxedStream) {
defer s.Close()
// echo everything
var err error
if VerboseDebugging {
Expand All @@ -123,42 +112,45 @@ func (lw *logWriter) Write(buf []byte) (int, error) {
return lw.W.Write(buf)
}

func goServe(t *testing.T, l transport.Listener) (done func()) {
closed := make(chan struct{}, 1)
func echo(t *testing.T, c transport.CapableConn) {
var wg sync.WaitGroup
defer wg.Wait()
for {
str, err := c.AcceptStream()
if err != nil {
break
}
wg.Add(1)
go func() {
defer wg.Done()
defer str.Close()
echoStream(t, str)
}()
}
}

go func() {
for {
c, err := l.Accept()
if err != nil {
select {
case <-closed:
return // closed naturally.
default:
checkErr(t, err)
}
}
func serve(t *testing.T, l transport.Listener) {
var wg sync.WaitGroup
defer wg.Wait()

debugLog(t, "accepted connection")
go func() {
for {
str, err := c.AcceptStream()
if err != nil {
break
}
go echoStream(t, str)
}
}()
for {
c, err := l.Accept()
if err != nil {
return
}
}()

return func() {
closed <- struct{}{}
wg.Add(1)
debugLog(t, "accepted connection")
go func() {
defer wg.Done()
defer c.Close()
echo(t, c)
}()
}
}

func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID, opt Options) {
msgsize := 1 << 11
errs := make(chan error) // dont block anything.

rateLimitN := 5000 // max of 5k funcs, because -race has 8k max.
rateLimitChan := make(chan struct{}, rateLimitN)
Expand All @@ -180,7 +172,7 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr,
bufs <- buf
debugLog(t, "%p writing %d bytes (message %d/%d #%x)", s, len(buf), i, opt.MsgNum, buf[:3])
if _, err := s.Write(buf); err != nil {
errs <- fmt.Errorf("s.Write(buf): %s", err)
t.Errorf("s.Write(buf): %s", err)
continue
}
}
Expand All @@ -196,12 +188,12 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr,
debugLog(t, "%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.MsgNum, buf1[:3])

if _, err := io.ReadFull(s, buf2); err != nil {
errs <- fmt.Errorf("io.ReadFull(s, buf2): %s", err)
t.Errorf("io.ReadFull(s, buf2): %s", err)
debugLog(t, "%p failed to read %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.MsgNum, buf1[:3])
continue
}
if !bytes.Equal(buf1, buf2) {
errs <- fmt.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3])
t.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3])
}
}
}
Expand All @@ -211,7 +203,7 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr,

s, err := c.OpenStream(context.Background())
if err != nil {
errs <- fmt.Errorf("failed to create NewStream: %s", err)
t.Errorf("failed to create NewStream: %s", err)
return
}

Expand All @@ -228,68 +220,67 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr,
openConnAndRW := func() {
debugLog(t, "openConnAndRW")

var wg sync.WaitGroup
defer wg.Wait()

l, err := ta.Listen(maddr)
checkErr(t, err)
if err != nil {
t.Error(err)
return
}
defer l.Close()

done := goServe(t, l)
defer done()
wg.Add(1)
go func() {
defer wg.Done()
serve(t, l)
}()

c, err := tb.Dial(context.Background(), l.Multiaddr(), peerA)
checkErr(t, err)
if err != nil {
t.Error(err)
return
}

// serve the outgoing conn, because some muxers assume
// that we _always_ call serve. (this is an error?)
wg.Add(1)
go func() {
defer wg.Done()
defer c.Close()
debugLog(t, "serving connection")
for {
str, err := c.AcceptStream()
if err != nil {
break
}
go echoStream(t, str)
}
echo(t, c)
}()

var wg sync.WaitGroup
var openWg sync.WaitGroup
for i := 0; i < opt.StreamNum; i++ {
wg.Add(1)
openWg.Add(1)
go rateLimit(func() {
defer wg.Done()
defer openWg.Done()
openStreamAndRW(c)
})
}
wg.Wait()
c.Close()
}

openConnsAndRW := func() {
debugLog(t, "openConnsAndRW, %d conns", opt.ConnNum)

var wg sync.WaitGroup
for i := 0; i < opt.ConnNum; i++ {
wg.Add(1)
go rateLimit(func() {
defer wg.Done()
openConnAndRW()
})
}
wg.Wait()
openWg.Wait()
}

go func() {
openConnsAndRW()
close(errs) // done
}()
debugLog(t, "openConnsAndRW, %d conns", opt.ConnNum)

for err := range errs {
t.Error(err)
var wg sync.WaitGroup
defer wg.Wait()
for i := 0; i < opt.ConnNum; i++ {
wg.Add(1)
go rateLimit(func() {
defer wg.Done()
openConnAndRW()
})
}

}

func SubtestStreamOpenStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
l, err := ta.Listen(maddr)
checkErr(t, err)
if err != nil {
t.Fatal(err)
}
defer l.Close()

count := 10000
Expand All @@ -311,8 +302,13 @@ func SubtestStreamOpenStress(t *testing.T, ta, tb transport.Transport, maddr ma.
accepted <- err
}()
connB, err = tb.Dial(context.Background(), l.Multiaddr(), peerA)
checkErr(t, err)
checkErr(t, <-accepted)
if err != nil {
t.Fatal(err)
}
err = <-accepted
if err != nil {
t.Fatal(err)
}

defer func() {
if connA != nil {
Expand Down Expand Up @@ -379,22 +375,36 @@ func SubtestStreamOpenStress(t *testing.T, ta, tb transport.Transport, maddr ma.
}

func SubtestStreamReset(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
var wg sync.WaitGroup
defer wg.Wait()

l, err := ta.Listen(maddr)
checkErr(t, err)
if err != nil {
t.Fatal(err)
}
defer l.Close()

done := make(chan struct{}, 2)
wg.Add(1)
go func() {
defer wg.Done()

muxa, err := l.Accept()
checkErr(t, err)
if err != nil {
t.Error(err)
return
}
defer muxa.Close()

s, err := muxa.OpenStream(context.Background())
if err != nil {
panic(err)
t.Error(err)
return
}
defer s.Close()

// Some transports won't open the stream until we write. That's
// fine.
s.Write([]byte("foo"))
_, _ = s.Write([]byte("foo"))

time.Sleep(time.Millisecond * 50)

Expand All @@ -403,22 +413,20 @@ func SubtestStreamReset(t *testing.T, ta, tb transport.Transport, maddr ma.Multi
t.Error("should have failed to write")
}

s.Close()
done <- struct{}{}
}()

muxb, err := tb.Dial(context.Background(), l.Multiaddr(), peerA)
checkErr(t, err)

go func() {
str, err := muxb.AcceptStream()
checkErr(t, err)
str.Reset()
done <- struct{}{}
}()
if err != nil {
t.Fatal(err)
}
defer muxb.Close()

<-done
<-done
str, err := muxb.AcceptStream()
if err != nil {
t.Error(err)
return
}
str.Reset()
}

func SubtestStress1Conn1Stream1Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
Expand Down

0 comments on commit 8852a73

Please sign in to comment.