From bf651ca7c73c68d8a0c15685475c2b71f1c0743d Mon Sep 17 00:00:00 2001 From: vyzo Date: Sat, 13 Apr 2019 12:50:42 +0300 Subject: [PATCH 1/9] use a single, NoDial context in identify push --- p2p/protocol/identify/id.go | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 9aac465bfa..acf0aa9020 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -156,19 +156,35 @@ func (ids *IDService) pushHandler(s inet.Stream) { } func (ids *IDService) Push() { + var wg sync.WaitGroup + + // we could make this context timeout-less since we are only opening a new + // stream over an existing connection. This would avoid the need for the + // supervisory goroutine below, but timeout-less contexts in network operations + // make me nervous. + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + ctx = inet.WithNoDial(ctx, "identify push") + for _, p := range ids.Host.Network().Peers() { + wg.Add(1) go func(p peer.ID) { - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel() + defer wg.Done() + s, err := ids.Host.NewStream(ctx, p, IDPush) if err != nil { - log.Debugf("error opening push stream: %s", err.Error()) + log.Debugf("error opening push stream to %s: %s", p, err.Error()) return } ids.requestHandler(s) }(p) } + + // this supervisory goroutine is necessary to cancel the context + go func() { + wg.Wait() + cancel() + }() } func (ids *IDService) populateMessage(mes *pb.Identify, c inet.Conn) { From 8fa257cdf01d32d89485f008cee1a7aedf5dccd0 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sat, 13 Apr 2019 12:56:29 +0300 Subject: [PATCH 2/9] track context given in identify constructor and use it to gate identify push --- p2p/protocol/identify/id.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index acf0aa9020..d53c99eafe 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -47,6 +47,8 @@ const transientTTL = 10 * time.Second type IDService struct { Host host.Host + ctx context.Context + // connections undergoing identification // for wait purposes currid map[inet.Conn]chan struct{} @@ -64,6 +66,7 @@ type IDService struct { func NewIDService(ctx context.Context, h host.Host) *IDService { s := &IDService{ Host: h, + ctx: ctx, currid: make(map[inet.Conn]chan struct{}), observedAddrs: NewObservedAddrSet(ctx), } @@ -162,7 +165,7 @@ func (ids *IDService) Push() { // stream over an existing connection. This would avoid the need for the // supervisory goroutine below, but timeout-less contexts in network operations // make me nervous. - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + ctx, cancel := context.WithTimeout(ids.ctx, 15*time.Second) ctx = inet.WithNoDial(ctx, "identify push") for _, p := range ids.Host.Network().Peers() { From 7502fc44c9a199a42029ee523786458a0d34dba1 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sat, 13 Apr 2019 13:57:52 +0300 Subject: [PATCH 3/9] add a background task in basic host that periodically pushes identify if needed --- p2p/host/basic/basic_host.go | 73 +++++++++++++++++++++++++++++++++++- 1 file changed, 72 insertions(+), 1 deletion(-) diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 0725fa702d..3369554a77 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -4,6 +4,7 @@ import ( "context" "io" "net" + "sync" "time" logging "github.com/ipfs/go-log" @@ -70,6 +71,10 @@ type BasicHost struct { negtimeout time.Duration proc goprocess.Process + + bgcancel func() + mx sync.Mutex + lastAddrs []ma.Multiaddr } // HostOpts holds options that can be passed to NewHost in order to @@ -164,6 +169,11 @@ func NewHost(ctx context.Context, net inet.Network, opts *HostOpts) (*BasicHost, net.SetConnHandler(h.newConnHandler) net.SetStreamHandler(h.newStreamHandler) + + bgctx, cancel := context.WithCancel(ctx) + h.bgcancel = cancel + go h.background(bgctx) + return h, nil } @@ -263,7 +273,67 @@ func (h *BasicHost) newStreamHandler(s inet.Stream) { // PushIdentify pushes an identify update through the identify push protocol // Warning: this interface is unstable and may disappear in the future. func (h *BasicHost) PushIdentify() { - h.ids.Push() + push := false + + h.mx.Lock() + addrs := h.Addrs() + if !sameAddrs(addrs, h.lastAddrs) { + push = true + h.lastAddrs = addrs + } + h.mx.Unlock() + + if push { + h.ids.Push() + } +} + +func (h *BasicHost) background(ctx context.Context) { + // periodically schedules an IdentifyPush to update our peers for changes + // in our address set (if needed) + ticker := time.NewTicker(1 * time.Minute) + defer ticker.Stop() + + // initialize lastAddrs + h.mx.Lock() + if h.lastAddrs == nil { + h.lastAddrs = h.Addrs() + } + h.mx.Unlock() + + for { + select { + case <-ticker.C: + h.PushIdentify() + + case <-ctx.Done(): + return + } + } +} + +func sameAddrs(a, b []ma.Multiaddr) bool { + if len(a) != len(b) { + return false + } + + // this is O(n*m), might be worth using a map to turn into O(n+m) + for _, addr := range a { + if !findAddr(addr, b) { + return false + } + } + + return true +} + +func findAddr(addr ma.Multiaddr, addrs []ma.Multiaddr) bool { + for _, xaddr := range addrs { + if addr.Equal(xaddr) { + return true + } + } + return false } // ID returns the (local) peer.ID associated with this Host @@ -646,6 +716,7 @@ func (h *BasicHost) AllAddrs() []ma.Multiaddr { // Close shuts down the Host's services (network, etc). func (h *BasicHost) Close() error { + h.bgcancel() return h.proc.Close() } From 584590975466e52a9a3eb021ab83c501b25421db Mon Sep 17 00:00:00 2001 From: vyzo Date: Sat, 13 Apr 2019 14:08:02 +0300 Subject: [PATCH 4/9] add initialization delay in basic host background task --- p2p/host/basic/basic_host.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 3369554a77..62aad3e280 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -289,6 +289,9 @@ func (h *BasicHost) PushIdentify() { } func (h *BasicHost) background(ctx context.Context) { + // wait a bit for the host to initialize (avoid race with libp2p constructor) + time.Sleep(1 * time.Second) + // periodically schedules an IdentifyPush to update our peers for changes // in our address set (if needed) ticker := time.NewTicker(1 * time.Minute) From c9b2f468ba794236a8adefcc7ee5fb0554a7804a Mon Sep 17 00:00:00 2001 From: vyzo Date: Sat, 13 Apr 2019 14:17:07 +0300 Subject: [PATCH 5/9] reduce peer count in TestFuzzManyPeers when running under the race detector --- p2p/net/mock/mock_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/net/mock/mock_test.go b/p2p/net/mock/mock_test.go index fa53c16ec5..473e4424df 100644 --- a/p2p/net/mock/mock_test.go +++ b/p2p/net/mock/mock_test.go @@ -588,7 +588,7 @@ func TestLimitedStreams(t *testing.T) { func TestFuzzManyPeers(t *testing.T) { peerCount := 50000 if detectrace.WithRace() { - peerCount = 1000 + peerCount = 100 } for i := 0; i < peerCount; i++ { _, err := FullMeshConnected(context.Background(), 2) From c73f68bdb4714ca330b6c92192d823efbbcae9fd Mon Sep 17 00:00:00 2001 From: vyzo Date: Sat, 13 Apr 2019 19:32:45 +0300 Subject: [PATCH 6/9] explicit Start method for basic host --- config/config.go | 3 +++ p2p/host/basic/basic_host.go | 21 ++++++++++++--------- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/config/config.go b/config/config.go index 0b8c676bd4..79b04420b3 100644 --- a/config/config.go +++ b/config/config.go @@ -228,6 +228,9 @@ func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) { } } + // start the host background tasks + h.Start() + if router != nil { return routed.Wrap(h, router), nil } diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 62aad3e280..3aa64e948f 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -72,7 +72,8 @@ type BasicHost struct { proc goprocess.Process - bgcancel func() + ctx context.Context + cancel func() mx sync.Mutex lastAddrs []ma.Multiaddr } @@ -171,8 +172,8 @@ func NewHost(ctx context.Context, net inet.Network, opts *HostOpts) (*BasicHost, net.SetStreamHandler(h.newStreamHandler) bgctx, cancel := context.WithCancel(ctx) - h.bgcancel = cancel - go h.background(bgctx) + h.ctx = bgctx + h.cancel = cancel return h, nil } @@ -214,6 +215,11 @@ func New(net inet.Network, opts ...interface{}) *BasicHost { return h } +// Start starts background tasks in the host +func (h *BasicHost) Start() { + go h.background() +} + // newConnHandler is the remote-opened conn handler for inet.Network func (h *BasicHost) newConnHandler(c inet.Conn) { // Clear protocols on connecting to new peer to avoid issues caused @@ -288,10 +294,7 @@ func (h *BasicHost) PushIdentify() { } } -func (h *BasicHost) background(ctx context.Context) { - // wait a bit for the host to initialize (avoid race with libp2p constructor) - time.Sleep(1 * time.Second) - +func (h *BasicHost) background() { // periodically schedules an IdentifyPush to update our peers for changes // in our address set (if needed) ticker := time.NewTicker(1 * time.Minute) @@ -309,7 +312,7 @@ func (h *BasicHost) background(ctx context.Context) { case <-ticker.C: h.PushIdentify() - case <-ctx.Done(): + case <-h.ctx.Done(): return } } @@ -719,7 +722,7 @@ func (h *BasicHost) AllAddrs() []ma.Multiaddr { // Close shuts down the Host's services (network, etc). func (h *BasicHost) Close() error { - h.bgcancel() + h.cancel() return h.proc.Close() } From 3697552406a7261757f208c95718e43f687ed4d0 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 19 Apr 2019 13:03:18 +0300 Subject: [PATCH 7/9] handle misbehaving peers in identify push --- p2p/protocol/identify/id.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index d53c99eafe..cfc4d08897 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -161,11 +161,7 @@ func (ids *IDService) pushHandler(s inet.Stream) { func (ids *IDService) Push() { var wg sync.WaitGroup - // we could make this context timeout-less since we are only opening a new - // stream over an existing connection. This would avoid the need for the - // supervisory goroutine below, but timeout-less contexts in network operations - // make me nervous. - ctx, cancel := context.WithTimeout(ids.ctx, 15*time.Second) + ctx, cancel := context.WithTimeout(ids.ctx, 30*time.Second) ctx = inet.WithNoDial(ctx, "identify push") for _, p := range ids.Host.Network().Peers() { @@ -179,7 +175,18 @@ func (ids *IDService) Push() { return } - ids.requestHandler(s) + rch := make(chan struct{}, 1) + go func() { + ids.requestHandler(s) + rch <- struct{}{} + }() + + select { + case <-rch: + case <-ctx.Done(): + // this is taking too long, abort! + s.Reset() + } }(p) } From ebc5d16f5dce3dbb54901f31776d0659fbf97744 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 19 Apr 2019 13:08:06 +0300 Subject: [PATCH 8/9] use a map in sameAddrs to avoid quadratic behaviour --- p2p/host/basic/basic_host.go | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 3aa64e948f..9fa5441fa7 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -323,9 +323,14 @@ func sameAddrs(a, b []ma.Multiaddr) bool { return false } - // this is O(n*m), might be worth using a map to turn into O(n+m) + bmap := make(map[string]struct{}) + for _, addr := range b { + bmap[string(addr.Bytes())] = struct{}{} + } + for _, addr := range a { - if !findAddr(addr, b) { + _, ok := bmap[string(addr.Bytes())] + if !ok { return false } } @@ -333,15 +338,6 @@ func sameAddrs(a, b []ma.Multiaddr) bool { return true } -func findAddr(addr ma.Multiaddr, addrs []ma.Multiaddr) bool { - for _, xaddr := range addrs { - if addr.Equal(xaddr) { - return true - } - } - return false -} - // ID returns the (local) peer.ID associated with this Host func (h *BasicHost) ID() peer.ID { return h.Network().LocalPeer() From f17a4a86677c48b1cced0b0e65b8218a280307c6 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 19 Apr 2019 19:39:00 +0300 Subject: [PATCH 9/9] preallocate map in sameAddrs --- p2p/host/basic/basic_host.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 9fa5441fa7..1b11bbdc80 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -323,7 +323,7 @@ func sameAddrs(a, b []ma.Multiaddr) bool { return false } - bmap := make(map[string]struct{}) + bmap := make(map[string]struct{}, len(b)) for _, addr := range b { bmap[string(addr.Bytes())] = struct{}{} }