From 76f5ee2419f15a9421c907556e948561cee57011 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Wed, 16 Aug 2023 09:20:28 +1000 Subject: [PATCH 1/4] feat(connecteventmanager): block Connected() until accepted Ref: https://github.com/ipfs/boxo/issues/432 Minimal attempt at solving #432 --- CHANGELOG.md | 1 + bitswap/network/connecteventmanager.go | 78 ++++++++++++++++++++------ 2 files changed, 63 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8c8369a13..fcbd2769b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -72,6 +72,7 @@ The following emojis are used to highlight certain changes: - Removed mentions of unused ARC algorithm ([#336](https://github.com/ipfs/boxo/issues/366#issuecomment-1597253540)) - Handle `_redirects` file when `If-None-Match` header is present ([#412](https://github.com/ipfs/boxo/pull/412)) +- Address a Bitswap findpeers / connect race condition that can prevent peer communication ([#435](https://github.com/ipfs/boxo/issues/435)) ### Security diff --git a/bitswap/network/connecteventmanager.go b/bitswap/network/connecteventmanager.go index 88337fce3..0704099bd 100644 --- a/bitswap/network/connecteventmanager.go +++ b/bitswap/network/connecteventmanager.go @@ -32,7 +32,38 @@ type connectEventManager struct { type peerState struct { newState, curState state - pending bool + accepted chan struct{} +} + +func newPeerState() *peerState { + return &peerState{accepted: make(chan struct{})} +} + +func (p *peerState) isPending() bool { + select { + case <-p.accepted: + return false + default: + } + return true +} + +func (p *peerState) accept() { + select { + case <-p.accepted: + default: + close(p.accepted) + } +} + +func (p *peerState) setPending() { + if !p.isPending() { + p.accepted = make(chan struct{}) + } +} + +func (p *peerState) waitAccept() { + <-p.accepted } func newConnectEventManager(connListeners ...ConnectionListener) *connectEventManager { @@ -61,22 +92,33 @@ func (c *connectEventManager) Stop() { func (c *connectEventManager) getState(p peer.ID) state { if state, ok := c.peers[p]; ok { return state.newState - } else { - return stateDisconnected } + return stateDisconnected } -func (c *connectEventManager) setState(p peer.ID, newState state) { - state, ok := c.peers[p] - if !ok { - state = new(peerState) +func (c *connectEventManager) setState(p peer.ID, newState state) bool { + state, isExisting := c.peers[p] + if !isExisting { + state = newPeerState() c.peers[p] = state } state.newState = newState - if !state.pending && state.newState != state.curState { - state.pending = true - c.changeQueue = append(c.changeQueue, p) - c.cond.Broadcast() + if state.newState != state.curState { + if !isExisting || !state.isPending() { + if isExisting { + state.setPending() + } + c.changeQueue = append(c.changeQueue, p) + c.cond.Broadcast() + } + return true + } + return false +} + +func (c *connectEventManager) waitStateAccept(p peer.ID) { + if state, ok := c.peers[p]; ok { + state.waitAccept() } } @@ -109,11 +151,9 @@ func (c *connectEventManager) worker() { continue } - // Record the fact that this "state" is no longer in the queue. - state.pending = false - // Then, if there's nothing to do, continue. if state.curState == state.newState { + state.accept() continue } @@ -124,6 +164,7 @@ func (c *connectEventManager) worker() { switch state.newState { case stateDisconnected: delete(c.peers, pid) + state.accept() fallthrough case stateUnresponsive: // Only trigger a disconnect event if the peer was responsive. @@ -142,20 +183,25 @@ func (c *connectEventManager) worker() { } c.lk.Lock() } + state.accept() } } // Called whenever we receive a new connection. May be called many times. func (c *connectEventManager) Connected(p peer.ID) { c.lk.Lock() - defer c.lk.Unlock() // !responsive -> responsive if c.getState(p) == stateResponsive { + c.lk.Unlock() return } - c.setState(p, stateResponsive) + wait := c.setState(p, stateResponsive) + c.lk.Unlock() + if wait { + c.waitStateAccept(p) + } } // Called when we drop the final connection to a peer. From 320ea03416f143df9fdc21799396ae6f1742725f Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Wed, 16 Aug 2023 14:56:37 +1000 Subject: [PATCH 2/4] fix(connecteventmanager): less complex channel signalling --- bitswap/network/connecteventmanager.go | 140 ++++++++++--------------- 1 file changed, 56 insertions(+), 84 deletions(-) diff --git a/bitswap/network/connecteventmanager.go b/bitswap/network/connecteventmanager.go index 0704099bd..be86778c4 100644 --- a/bitswap/network/connecteventmanager.go +++ b/bitswap/network/connecteventmanager.go @@ -25,46 +25,24 @@ type connectEventManager struct { cond sync.Cond peers map[peer.ID]*peerState - changeQueue []peer.ID + changeQueue []change stop bool done chan struct{} } -type peerState struct { - newState, curState state - accepted chan struct{} -} - -func newPeerState() *peerState { - return &peerState{accepted: make(chan struct{})} -} - -func (p *peerState) isPending() bool { - select { - case <-p.accepted: - return false - default: - } - return true +type change struct { + pid peer.ID + handled chan struct{} } -func (p *peerState) accept() { - select { - case <-p.accepted: - default: - close(p.accepted) - } +type peerState struct { + newState, curState state + pending bool } -func (p *peerState) setPending() { - if !p.isPending() { - p.accepted = make(chan struct{}) - } -} +type waitFn func() -func (p *peerState) waitAccept() { - <-p.accepted -} +func waitNoop() {} func newConnectEventManager(connListeners ...ConnectionListener) *connectEventManager { evtManager := &connectEventManager{ @@ -92,34 +70,29 @@ func (c *connectEventManager) Stop() { func (c *connectEventManager) getState(p peer.ID) state { if state, ok := c.peers[p]; ok { return state.newState + } else { + return stateDisconnected } - return stateDisconnected } -func (c *connectEventManager) setState(p peer.ID, newState state) bool { - state, isExisting := c.peers[p] - if !isExisting { - state = newPeerState() +func (c *connectEventManager) setState(p peer.ID, newState state) waitFn { + state, ok := c.peers[p] + if !ok { + state = new(peerState) c.peers[p] = state } state.newState = newState - if state.newState != state.curState { - if !isExisting || !state.isPending() { - if isExisting { - state.setPending() - } - c.changeQueue = append(c.changeQueue, p) - c.cond.Broadcast() + if !state.pending && state.newState != state.curState { + state.pending = true + change := change{p, make(chan struct{})} + c.changeQueue = append(c.changeQueue, change) + c.cond.Broadcast() + return func() { + // Wait until the change has been handled + <-change.handled } - return true - } - return false -} - -func (c *connectEventManager) waitStateAccept(p peer.ID) { - if state, ok := c.peers[p]; ok { - state.waitAccept() } + return waitNoop } // Waits for a change to be enqueued, or for the event manager to be stopped. Returns false if the @@ -137,53 +110,54 @@ func (c *connectEventManager) worker() { defer close(c.done) for c.waitChange() { - pid := c.changeQueue[0] - c.changeQueue[0] = peer.ID("") // free the peer ID (slicing won't do that) + pch := c.changeQueue[0] + c.changeQueue[0] = change{} // free the resources (slicing won't do that) c.changeQueue = c.changeQueue[1:] - state, ok := c.peers[pid] + state, ok := c.peers[pch.pid] // If we've disconnected and forgotten, continue. if !ok { // This shouldn't be possible because _this_ thread is responsible for // removing peers from this map, and we shouldn't get duplicate entries in // the change queue. log.Error("a change was enqueued for a peer we're not tracking") + close(pch.handled) continue } - // Then, if there's nothing to do, continue. - if state.curState == state.newState { - state.accept() - continue - } - - // Or record the state update, then apply it. - oldState := state.curState - state.curState = state.newState - - switch state.newState { - case stateDisconnected: - delete(c.peers, pid) - state.accept() - fallthrough - case stateUnresponsive: - // Only trigger a disconnect event if the peer was responsive. - // We could be transitioning from unresponsive to disconnected. - if oldState == stateResponsive { + // Is there anything to do? + if state.curState != state.newState { + // Record the state update, then apply it. + oldState := state.curState + state.curState = state.newState + + switch state.newState { + case stateDisconnected: + delete(c.peers, pch.pid) + fallthrough + case stateUnresponsive: + // Only trigger a disconnect event if the peer was responsive. + // We could be transitioning from unresponsive to disconnected. + if oldState == stateResponsive { + c.lk.Unlock() + for _, v := range c.connListeners { + v.PeerDisconnected(pch.pid) + } + c.lk.Lock() + } + case stateResponsive: c.lk.Unlock() for _, v := range c.connListeners { - v.PeerDisconnected(pid) + v.PeerConnected(pch.pid) } c.lk.Lock() } - case stateResponsive: - c.lk.Unlock() - for _, v := range c.connListeners { - v.PeerConnected(pid) - } - c.lk.Lock() } - state.accept() + + // Record the fact that this "state" is no longer in the queue. + state.pending = false + // Signal that we've handled the state change + close(pch.handled) } } @@ -199,9 +173,7 @@ func (c *connectEventManager) Connected(p peer.ID) { } wait := c.setState(p, stateResponsive) c.lk.Unlock() - if wait { - c.waitStateAccept(p) - } + wait() } // Called when we drop the final connection to a peer. From 9db1e787cb2a55688947d995485718b5bb129896 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Wed, 16 Aug 2023 17:28:17 +1000 Subject: [PATCH 3/4] fix(connecteventmanager): handle change queue edge cases and closure --- bitswap/network/connecteventmanager.go | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/bitswap/network/connecteventmanager.go b/bitswap/network/connecteventmanager.go index be86778c4..e38598812 100644 --- a/bitswap/network/connecteventmanager.go +++ b/bitswap/network/connecteventmanager.go @@ -75,6 +75,15 @@ func (c *connectEventManager) getState(p peer.ID) state { } } +func (c *connectEventManager) makeWaitFunc(handled chan struct{}) waitFn { + return func() { + select { + case <-handled: + case <-c.done: + } + } +} + func (c *connectEventManager) setState(p peer.ID, newState state) waitFn { state, ok := c.peers[p] if !ok { @@ -87,10 +96,15 @@ func (c *connectEventManager) setState(p peer.ID, newState state) waitFn { change := change{p, make(chan struct{})} c.changeQueue = append(c.changeQueue, change) c.cond.Broadcast() - return func() { - // Wait until the change has been handled - <-change.handled + return c.makeWaitFunc(change.handled) + } else if state.pending { + // Find the change in the queue and return a wait function for it + for _, change := range c.changeQueue { + if change.pid == p { + return c.makeWaitFunc(change.handled) + } } + log.Error("a peer was marked as change pending but not found in the change queue") } return waitNoop } From 22fd90dd6f32095c233bde7a658280cc324bc8f6 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Thu, 17 Aug 2023 10:00:39 +1000 Subject: [PATCH 4/4] fix(connecteventmanager): add test to confirm sync Connected() call flow --- bitswap/network/connecteventmanager_test.go | 54 ++++++++++++++++++++- 1 file changed, 53 insertions(+), 1 deletion(-) diff --git a/bitswap/network/connecteventmanager_test.go b/bitswap/network/connecteventmanager_test.go index e3904ee55..5ce4e690c 100644 --- a/bitswap/network/connecteventmanager_test.go +++ b/bitswap/network/connecteventmanager_test.go @@ -18,7 +18,8 @@ type mockConnEvent struct { type mockConnListener struct { sync.Mutex - events []mockConnEvent + events []mockConnEvent + peerConnectedCb func(p peer.ID) } func newMockConnListener() *mockConnListener { @@ -29,6 +30,9 @@ func (cl *mockConnListener) PeerConnected(p peer.ID) { cl.Lock() defer cl.Unlock() cl.events = append(cl.events, mockConnEvent{connected: true, peer: p}) + if cl.peerConnectedCb != nil { + cl.peerConnectedCb(p) + } } func (cl *mockConnListener) PeerDisconnected(p peer.ID) { @@ -173,3 +177,51 @@ func TestConnectEventManagerDisconnectAfterMarkUnresponsive(t *testing.T) { require.Empty(t, cem.peers) // all disconnected require.Equal(t, expectedEvents, connListener.events) } + +func TestConnectEventManagerConnectFlowSynchronous(t *testing.T) { + connListener := newMockConnListener() + actionsCh := make(chan string) + connListener.peerConnectedCb = func(p peer.ID) { + actionsCh <- "PeerConnected:" + p.String() + time.Sleep(time.Millisecond * 50) + } + + peers := testutil.GeneratePeers(2) + cem := newConnectEventManager(connListener) + cem.Start() + t.Cleanup(cem.Stop) + + go func() { + actionsCh <- "Connected:" + peers[0].String() + cem.Connected(peers[0]) + actionsCh <- "Done:" + peers[0].String() + actionsCh <- "Connected:" + peers[1].String() + cem.Connected(peers[1]) + actionsCh <- "Done:" + peers[1].String() + close(actionsCh) + }() + + // We expect Done to be sent _after_ PeerConnected, which demonstrates the + // call to Connected() blocks until PeerConnected() returns. + gotActions := make([]string, 0, 3) + for event := range actionsCh { + gotActions = append(gotActions, event) + } + expectedActions := []string{ + "Connected:" + peers[0].String(), + "PeerConnected:" + peers[0].String(), + "Done:" + peers[0].String(), + "Connected:" + peers[1].String(), + "PeerConnected:" + peers[1].String(), + "Done:" + peers[1].String(), + } + require.Equal(t, expectedActions, gotActions) + + // Flush the event queue. + wait(t, cem) + expectedEvents := []mockConnEvent{ + {peer: peers[0], connected: true}, + {peer: peers[1], connected: true}, + } + require.Equal(t, expectedEvents, connListener.events) +}