From 4fa48655bb894f03cd0374bb314dbd228574c049 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 4 May 2021 21:43:48 -0700 Subject: [PATCH 1/2] chore: update yamux --- go.mod | 2 +- go.sum | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index 0d904fd466..ed72c21e19 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/libp2p/go-libp2p-testing v0.4.0 github.com/libp2p/go-libp2p-tls v0.1.3 github.com/libp2p/go-libp2p-transport-upgrader v0.4.2 - github.com/libp2p/go-libp2p-yamux v0.5.1 + github.com/libp2p/go-libp2p-yamux v0.5.2 github.com/libp2p/go-msgio v0.0.6 github.com/libp2p/go-netroute v0.1.6 github.com/libp2p/go-stream-muxer-multistream v0.3.0 diff --git a/go.sum b/go.sum index 3c382e4f40..6fdd5ee7d7 100644 --- a/go.sum +++ b/go.sum @@ -349,8 +349,8 @@ github.com/libp2p/go-libp2p-yamux v0.2.7/go.mod h1:X28ENrBMU/nm4I3Nx4sZ4dgjZ6VhL github.com/libp2p/go-libp2p-yamux v0.2.8/go.mod h1:/t6tDqeuZf0INZMTgd0WxIRbtK2EzI2h7HbFm9eAKI4= github.com/libp2p/go-libp2p-yamux v0.4.0/go.mod h1:+DWDjtFMzoAwYLVkNZftoucn7PelNoy5nm3tZ3/Zw30= github.com/libp2p/go-libp2p-yamux v0.5.0/go.mod h1:AyR8k5EzyM2QN9Bbdg6X1SkVVuqLwTGf0L4DFq9g6po= -github.com/libp2p/go-libp2p-yamux v0.5.1 h1:sX4WQPHMhRxJE5UZTfjEuBvlQWXB5Bo3A2JK9ZJ9EM0= -github.com/libp2p/go-libp2p-yamux v0.5.1/go.mod h1:dowuvDu8CRWmr0iqySMiSxK+W0iL5cMVO9S94Y6gkv4= +github.com/libp2p/go-libp2p-yamux v0.5.2 h1:nblcw3QVWlPFRh39sbChDYpDBfAD/I6+Lbb4gFEILuY= +github.com/libp2p/go-libp2p-yamux v0.5.2/go.mod h1:e+aG0ZjvUbj8MEHIWV1x1IakYAXD+qQHCnYBam5uzP0= github.com/libp2p/go-maddr-filter v0.0.4/go.mod h1:6eT12kSQMA9x2pvFQa+xesMKUBlj9VImZbj3B9FBH/Q= github.com/libp2p/go-maddr-filter v0.0.5/go.mod h1:Jk+36PMfIqCJhAnaASRH83bdAvfDRp/w6ENFaC9bG+M= github.com/libp2p/go-maddr-filter v0.1.0 h1:4ACqZKw8AqiuJfwFGq1CYDFugfXTOos+qQ3DETkhtCE= @@ -410,8 +410,8 @@ github.com/libp2p/go-yamux v1.3.7/go.mod h1:fr7aVgmdNGJK+N1g+b6DW6VxzbRCjCOejR/h github.com/libp2p/go-yamux v1.4.0/go.mod h1:fr7aVgmdNGJK+N1g+b6DW6VxzbRCjCOejR/hkmpooHE= github.com/libp2p/go-yamux v1.4.1 h1:P1Fe9vF4th5JOxxgQvfbOHkrGqIZniTLf+ddhZp8YTI= github.com/libp2p/go-yamux v1.4.1/go.mod h1:fr7aVgmdNGJK+N1g+b6DW6VxzbRCjCOejR/hkmpooHE= -github.com/libp2p/go-yamux/v2 v2.0.0 h1:vSGhAy5u6iHBq11ZDcyHH4Blcf9xlBhT4WQDoOE90LU= -github.com/libp2p/go-yamux/v2 v2.0.0/go.mod h1:NVWira5+sVUIU6tu1JWvaRn1dRnG+cawOJiflsAM+7U= +github.com/libp2p/go-yamux/v2 v2.1.0 h1:s+sg3egTuOnaHwmUJDLF0Msim7+ffQYdv3ohb+Vdvgo= +github.com/libp2p/go-yamux/v2 v2.1.0/go.mod h1:NVWira5+sVUIU6tu1JWvaRn1dRnG+cawOJiflsAM+7U= github.com/lucas-clemente/quic-go v0.19.3 h1:eCDQqvGBB+kCTkA0XrAFtNe81FMa0/fn4QSoeAbmiF4= github.com/lucas-clemente/quic-go v0.19.3/go.mod h1:ADXpNbTQjq1hIzCpB+y/k5iz4n4z4IwqoLb94Kh5Hu8= github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI= From b7e02820377f706926f92963ae10d3dbddc6685a Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 4 May 2021 22:06:59 -0700 Subject: [PATCH 2/2] test: rewrite backpressure test We just want to make sure we don't keep writing forever if the other side isn't reading. 
--- p2p/test/backpressure/backpressure_test.go | 369 ++------------------- 1 file changed, 19 insertions(+), 350 deletions(-) diff --git a/p2p/test/backpressure/backpressure_test.go b/p2p/test/backpressure/backpressure_test.go index 99a7d2e271..218f3ff15c 100644 --- a/p2p/test/backpressure/backpressure_test.go +++ b/p2p/test/backpressure/backpressure_test.go @@ -2,289 +2,35 @@ package backpressure_tests import ( "context" - "io" - "math/rand" + "os" "testing" "time" bhost "github.com/libp2p/go-libp2p/p2p/host/basic" + "github.com/stretchr/testify/require" - u "github.com/ipfs/go-ipfs-util" logging "github.com/ipfs/go-log/v2" - "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" - "github.com/libp2p/go-libp2p-core/peer" protocol "github.com/libp2p/go-libp2p-core/protocol" swarmt "github.com/libp2p/go-libp2p-swarm/testing" ) var log = logging.Logger("backpressure") -// TestBackpressureStreamHandler tests whether mux handler -// ratelimiting works. Meaning, since the handler is sequential -// it should block senders. -// -// Important note: spdystream (which peerstream uses) has a set -// of n workers (n=spdsystream.FRAME_WORKERS) which handle new -// frames, including those starting new streams. So all of them -// can be in the handler at one time. Also, the sending side -// does not rate limit unless we call stream.Wait() -// -// -// Note: right now, this happens muxer-wide. the muxer should -// learn to flow control, so handlers cant block each other. -func TestBackpressureStreamHandler(t *testing.T) { - t.Skip(`Sadly, as cool as this test is, it doesn't work -Because spdystream doesnt handle stream open backpressure -well IMO. I'll see about rewriting that part when it becomes -a problem. -`) - - // a number of concurrent request handlers - limit := 10 - - // our way to signal that we're done with 1 request - requestHandled := make(chan struct{}) - - // handler rate limiting - receiverRatelimit := make(chan struct{}, limit) - for i := 0; i < limit; i++ { - receiverRatelimit <- struct{}{} - } - - // sender counter of successfully opened streams - senderOpened := make(chan struct{}, limit*100) - - // sender signals it's done (errored out) - senderDone := make(chan struct{}) - - // the receiver handles requests with some rate limiting - receiver := func(s network.Stream) { - log.Debug("receiver received a stream") - - <-receiverRatelimit // acquire - go func() { - // our request handler. can do stuff here. we - // simulate something taking time by waiting - // on requestHandled - log.Debug("request worker handling...") - <-requestHandled - log.Debug("request worker done!") - receiverRatelimit <- struct{}{} // release - }() - } - - // the sender opens streams as fast as possible - sender := func(host host.Host, remote peer.ID) { - var s network.Stream - var err error - defer func() { - t.Error(err) - log.Debug("sender error. 
exiting.") - senderDone <- struct{}{} - }() - - for { - s, err = host.NewStream(context.Background(), remote, protocol.TestingID) - if err != nil { - return - } - - _ = s - // if err = s.SwarmStream().Stream().Wait(); err != nil { - // return - // } - - // "count" another successfully opened stream - // (large buffer so shouldn't block in normal operation) - log.Debug("sender opened another stream!") - senderOpened <- struct{}{} - } - } - - // count our senderOpened events - countStreamsOpenedBySender := func(min int) int { - opened := 0 - for opened < min { - log.Debugf("countStreamsOpenedBySender got %d (min %d)", opened, min) - select { - case <-senderOpened: - opened++ - case <-time.After(10 * time.Millisecond): - } - } - return opened - } - - // count our received events - // waitForNReceivedStreams := func(n int) { - // for n > 0 { - // log.Debugf("waiting for %d received streams...", n) - // select { - // case <-receiverRatelimit: - // n-- - // } - // } - // } - - testStreamsOpened := func(expected int) { - log.Debugf("testing rate limited to %d streams", expected) - if n := countStreamsOpenedBySender(expected); n != expected { - t.Fatalf("rate limiting did not work :( -- %d != %d", expected, n) - } - } - - // ok that's enough setup. let's do it! - - ctx := context.Background() - h1 := bhost.New(swarmt.GenSwarm(t, ctx)) - h2 := bhost.New(swarmt.GenSwarm(t, ctx)) - - // setup receiver handler - h1.SetStreamHandler(protocol.TestingID, receiver) - - h2pi := h2.Peerstore().PeerInfo(h2.ID()) - log.Debugf("dialing %s", h2pi.Addrs) - if err := h1.Connect(ctx, h2pi); err != nil { - t.Fatal("Failed to connect:", err) - } - - // launch sender! - go sender(h2, h1.ID()) - - // ok, what do we expect to happen? the receiver should - // receive 10 requests and stop receiving, blocking the sender. - // we can test this by counting 10x senderOpened requests - - <-senderOpened // wait for the sender to successfully open some. - testStreamsOpened(limit - 1) - - // let's "handle" 3 requests. - <-requestHandled - <-requestHandled - <-requestHandled - // the sender should've now been able to open exactly 3 more. - - testStreamsOpened(3) - - // shouldn't have opened anything more - testStreamsOpened(0) - - // let's "handle" 100 requests in batches of 5 - for i := 0; i < 20; i++ { - <-requestHandled - <-requestHandled - <-requestHandled - <-requestHandled - <-requestHandled - testStreamsOpened(5) - } - - // success! - - // now for the sugar on top: let's tear down the receiver. it should - // exit the sender. - h1.Close() - - // shouldn't have opened anything more - testStreamsOpened(0) - - select { - case <-time.After(100 * time.Millisecond): - t.Error("receiver shutdown failed to exit sender") - case <-senderDone: - log.Info("handler backpressure works!") - } -} - // TestStBackpressureStreamWrite tests whether streams see proper // backpressure when writing data over the network streams. func TestStBackpressureStreamWrite(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - // senderWrote signals that the sender wrote bytes to remote. - // the value is the count of bytes written. 
- senderWrote := make(chan int, 10000) - - // sender signals it's done (errored out) - senderDone := make(chan struct{}) - - // writeStats lets us listen to all the writes and return - // how many happened and how much was written - writeStats := func() (int, int) { - t.Helper() - writes := 0 - bytes := 0 - for { - select { - case n := <-senderWrote: - writes++ - bytes += n - default: - log.Debugf("stats: sender wrote %d bytes, %d writes", bytes, writes) - return bytes, writes - } - } - } - - // sender attempts to write as fast as possible, signaling on the - // completion of every write. This makes it possible to see how - // fast it's actually writing. We pair this with a receiver - // that waits for a signal to read. - sender := func(s network.Stream) { - defer func() { - s.Close() - senderDone <- struct{}{} - }() - - // ready a buffer of random data - buf := make([]byte, 65536) - u.NewTimeSeededRand().Read(buf) - - for { - // send a randomly sized subchunk - from := rand.Intn(len(buf) / 2) - to := rand.Intn(len(buf) / 2) - sendbuf := buf[from : from+to] - - n, err := s.Write(sendbuf) - if err != nil { - log.Debug("sender error. exiting:", err) - return - } - - log.Debugf("sender wrote %d bytes", n) - select { - case senderWrote <- n: - default: - t.Error("sender wrote channel full") - } - } - } - - // receive a number of bytes from a stream. - // returns the number of bytes written. - receive := func(s network.Stream, expect int) { - t.Helper() - log.Debugf("receiver to read %d bytes", expect) - rbuf := make([]byte, expect) - n, err := io.ReadFull(s, rbuf) - if err != nil { - t.Error("read failed:", err) - } - if expect != n { - t.Errorf("read len differs: %d != %d", expect, n) - } - } - - // ok let's do it! - - // setup the networks - ctx := context.Background() h1 := bhost.New(swarmt.GenSwarm(t, ctx)) h2 := bhost.New(swarmt.GenSwarm(t, ctx)) // setup sender handler on 1 - h1.SetStreamHandler(protocol.TestingID, sender) + h1.SetStreamHandler(protocol.TestingID, func(s network.Stream) { + defer s.Reset() + <-ctx.Done() + }) h2pi := h2.Peerstore().PeerInfo(h2.ID()) log.Debugf("dialing %s", h2pi.Addrs) @@ -293,98 +39,21 @@ func TestStBackpressureStreamWrite(t *testing.T) { } // open a stream, from 2->1, this is our reader - s, err := h2.NewStream(context.Background(), h1.ID(), protocol.TestingID) + s, err := h2.NewStream(ctx, h1.ID(), protocol.TestingID) if err != nil { t.Fatal(err) } + defer s.Reset() - // let's make sure r/w works. - testSenderWrote := func(bytesE int) { - t.Helper() - bytesA, writesA := writeStats() - if bytesA != bytesE { - t.Errorf("numbers failed: %d =?= %d bytes, via %d writes", bytesA, bytesE, writesA) - } - } - - // trigger lazy connection handshaking - _, err = s.Read(nil) - if err != nil { - t.Fatal(err) - } - - // 500ms rounds of lockstep write + drain - roundsStart := time.Now() - roundsTotal := 0 - for roundsTotal < (2 << 20) { - // let the sender fill its buffers, it will stop sending. - <-time.After(time.Second) - b, _ := writeStats() - testSenderWrote(0) - <-time.After(100 * time.Millisecond) - testSenderWrote(0) - - // drain it all, wait again - receive(s, b) - roundsTotal += b - } - roundsTime := time.Since(roundsStart) - - // now read continuously, while we measure stats. - stop := make(chan struct{}) - contStart := time.Now() - - go func() { - for { - select { - case <-stop: - return - default: - receive(s, 2<<15) - } + // If nobody is reading, we should eventually timeout. 
+ require.NoError(t, s.SetWriteDeadline(time.Now().Add(100*time.Millisecond))) + data := make([]byte, 16*1024) + for i := 0; i < 5*1024; i++ { // write at most 100MiB + _, err := s.Write(data) + if err != nil { + require.True(t, os.IsTimeout(err), err) + return } - }() - - contTotal := 0 - for contTotal < (2 << 20) { - n := <-senderWrote - contTotal += n - } - stop <- struct{}{} - contTime := time.Since(contStart) - - // now compare! continuous should've been faster AND larger - if roundsTime < contTime { - t.Error("continuous should have been faster") - } - - if roundsTotal < contTotal { - t.Error("continuous should have been larger, too!") } - - // and a couple rounds more for good measure ;) - for i := 0; i < 3; i++ { - // let the sender fill its buffers, it will stop sending. - <-time.After(time.Second) - b, _ := writeStats() - testSenderWrote(0) - <-time.After(100 * time.Millisecond) - testSenderWrote(0) - - // drain it all, wait again - receive(s, b) - } - - // this doesn't work :(: - // // now for the sugar on top: let's tear down the receiver. it should - // // exit the sender. - // n1.Close() - // testSenderWrote(0) - // testSenderWrote(0) - // select { - // case <-time.After(2 * time.Second): - // t.Error("receiver shutdown failed to exit sender") - // case <-senderDone: - // log.Info("handler backpressure works!") - // } + t.Fatal("should have timed out") }
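
Note (editorial, not part of either patch): the rewritten test relies on a standard Go pattern: bound a write that may block forever with SetWriteDeadline, then classify the resulting error with os.IsTimeout. Below is a minimal, self-contained sketch of that pattern, using net.Pipe as a stand-in for the libp2p stream. The stand-in is an assumption for illustration only; the test itself writes to a yamux-backed network.Stream.

    package main

    import (
        "fmt"
        "net"
        "os"
        "time"
    )

    func main() {
        // net.Pipe returns a synchronous, in-memory net.Conn pair:
        // a Write blocks until the other end Reads.
        writer, reader := net.Pipe()
        defer writer.Close()
        defer reader.Close() // the "receiver" never reads, so writes stall

        // Bound the stalled write with a deadline, as the test does with
        // s.SetWriteDeadline, so the writer errors out instead of hanging.
        if err := writer.SetWriteDeadline(time.Now().Add(100 * time.Millisecond)); err != nil {
            panic(err)
        }

        // The write cannot complete; after roughly 100ms it fails with a
        // timeout error, and os.IsTimeout reports true.
        _, err := writer.Write(make([]byte, 16*1024))
        fmt.Println(err, os.IsTimeout(err))
    }

As in the test above, backpressure is what makes the write stall in the first place; the deadline only converts that stall into a detectable timeout instead of a hung test.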