From 9bcb1d53a6b0b474c1aab9373d323d210b60e6b0 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Thu, 8 Mar 2018 10:46:19 -0800 Subject: [PATCH 1/2] more correctly handle close and errors 1. Only propagate close on successful EOF. 2. Reset *both* streams on error. --- .../internal/circuitv1-deprecated/relay.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/p2p/protocol/internal/circuitv1-deprecated/relay.go b/p2p/protocol/internal/circuitv1-deprecated/relay.go index edeafc9a84..91ba7f23c6 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/relay.go +++ b/p2p/protocol/internal/circuitv1-deprecated/relay.go @@ -293,19 +293,29 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) { // error, not an EOF. go func() { count, err := io.Copy(s, bs) - if err != io.EOF && err != nil { + if err != nil { log.Debugf("relay copy error: %s", err) + // Reset both. + s.Reset() + bs.Reset() + } else { + // propagate the close + s.Close() } - s.Close() log.Debugf("relayed %d bytes from %s to %s", count, dst.ID.Pretty(), src.ID.Pretty()) }() go func() { count, err := io.Copy(bs, s) - if err != io.EOF && err != nil { + if err != nil { log.Debugf("relay copy error: %s", err) + // Reset both. + bs.Reset() + s.Reset() + } else { + // propagate the close + bs.Close() } - bs.Close() log.Debugf("relayed %d bytes from %s to %s", count, src.ID.Pretty(), dst.ID.Pretty()) }() } From b64da1fd89c2a263ec22c1613d9a4c195cf41e32 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Thu, 8 Mar 2018 10:51:35 -0800 Subject: [PATCH 2/2] test resetting relay connections --- .../circuitv1-deprecated/relay_test.go | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/p2p/protocol/internal/circuitv1-deprecated/relay_test.go b/p2p/protocol/internal/circuitv1-deprecated/relay_test.go index 05c1154015..0b655e2ad3 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/relay_test.go +++ b/p2p/protocol/internal/circuitv1-deprecated/relay_test.go @@ -111,6 +111,67 @@ func TestBasicRelay(t *testing.T) { } } +func TestRelayReset(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, ctx, 3) + + connect(t, hosts[0], hosts[1]) + connect(t, hosts[1], hosts[2]) + + time.Sleep(10 * time.Millisecond) + + r1, err := NewRelay(ctx, hosts[0]) + if err != nil { + t.Fatal(err) + } + + _, err = NewRelay(ctx, hosts[1], OptHop) + if err != nil { + t.Fatal(err) + } + + r3, err := NewRelay(ctx, hosts[2]) + if err != nil { + t.Fatal(err) + } + + msg := []byte("relay works!") + go func() { + list := r3.Listener() + + con, err := list.Accept() + if err != nil { + t.Error(err) + return + } + + _, err = con.Write(msg) + if err != nil { + t.Error(err) + return + } + hosts[2].Close() + }() + + rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID()) + dinfo := hosts[2].Peerstore().PeerInfo(hosts[2].ID()) + + rctx, rcancel := context.WithTimeout(ctx, time.Second) + defer rcancel() + + con, err := r1.DialPeer(rctx, rinfo, dinfo) + if err != nil { + t.Fatal(err) + } + + _, err = ioutil.ReadAll(con) + if err == nil { + t.Fatal("expected error for reset relayed connection") + } +} + func TestBasicRelayDial(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel()