From 7f075b1c578e0f4a8f4323c9362dd79373bc9101 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Mon, 21 Aug 2023 16:10:35 +0200 Subject: [PATCH] Revert "feat(connecteventmanager): block Connected() until accepted (#435)" and tests This reverts commit 7ec68c5e5adfb13e52f93c20e7c5aadf4860a871. This reverts commit 59a2bca3f0924d5a9cc61bbb6c33dbd483f4a0f4. This reverts commit 1d2f5e511e9fb0e1ff4c2b93aea421fc2091e3f5. --- CHANGELOG.md | 1 - bitswap/network/connecteventmanager.go | 102 +++++++------------- bitswap/network/connecteventmanager_test.go | 76 ++++----------- 3 files changed, 52 insertions(+), 127 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c47ac1f10..28cafa33c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,7 +31,6 @@ The following emojis are used to highlight certain changes: ### Fixed -- Address a Bitswap findpeers / connect race condition that can prevent peer communication ([#435](https://github.com/ipfs/boxo/issues/435)) - HTTP Gateway API: Not having a block will result in a 5xx error rather than 404 - HTTP Gateway API: CAR requests will return 200s and a CAR file proving a requested path does not exist rather than returning an error diff --git a/bitswap/network/connecteventmanager.go b/bitswap/network/connecteventmanager.go index e38598812..88337fce3 100644 --- a/bitswap/network/connecteventmanager.go +++ b/bitswap/network/connecteventmanager.go @@ -25,25 +25,16 @@ type connectEventManager struct { cond sync.Cond peers map[peer.ID]*peerState - changeQueue []change + changeQueue []peer.ID stop bool done chan struct{} } -type change struct { - pid peer.ID - handled chan struct{} -} - type peerState struct { newState, curState state pending bool } -type waitFn func() - -func waitNoop() {} - func newConnectEventManager(connListeners ...ConnectionListener) *connectEventManager { evtManager := &connectEventManager{ connListeners: connListeners, @@ -75,16 +66,7 @@ func (c *connectEventManager) getState(p peer.ID) state { } } -func (c *connectEventManager) makeWaitFunc(handled chan struct{}) waitFn { - return func() { - select { - case <-handled: - case <-c.done: - } - } -} - -func (c *connectEventManager) setState(p peer.ID, newState state) waitFn { +func (c *connectEventManager) setState(p peer.ID, newState state) { state, ok := c.peers[p] if !ok { state = new(peerState) @@ -93,20 +75,9 @@ func (c *connectEventManager) setState(p peer.ID, newState state) waitFn { state.newState = newState if !state.pending && state.newState != state.curState { state.pending = true - change := change{p, make(chan struct{})} - c.changeQueue = append(c.changeQueue, change) + c.changeQueue = append(c.changeQueue, p) c.cond.Broadcast() - return c.makeWaitFunc(change.handled) - } else if state.pending { - // Find the change in the queue and return a wait function for it - for _, change := range c.changeQueue { - if change.pid == p { - return c.makeWaitFunc(change.handled) - } - } - log.Error("a peer was marked as change pending but not found in the change queue") } - return waitNoop } // Waits for a change to be enqueued, or for the event manager to be stopped. Returns false if the @@ -124,70 +95,67 @@ func (c *connectEventManager) worker() { defer close(c.done) for c.waitChange() { - pch := c.changeQueue[0] - c.changeQueue[0] = change{} // free the resources (slicing won't do that) + pid := c.changeQueue[0] + c.changeQueue[0] = peer.ID("") // free the peer ID (slicing won't do that) c.changeQueue = c.changeQueue[1:] - state, ok := c.peers[pch.pid] + state, ok := c.peers[pid] // If we've disconnected and forgotten, continue. if !ok { // This shouldn't be possible because _this_ thread is responsible for // removing peers from this map, and we shouldn't get duplicate entries in // the change queue. log.Error("a change was enqueued for a peer we're not tracking") - close(pch.handled) continue } - // Is there anything to do? - if state.curState != state.newState { - // Record the state update, then apply it. - oldState := state.curState - state.curState = state.newState - - switch state.newState { - case stateDisconnected: - delete(c.peers, pch.pid) - fallthrough - case stateUnresponsive: - // Only trigger a disconnect event if the peer was responsive. - // We could be transitioning from unresponsive to disconnected. - if oldState == stateResponsive { - c.lk.Unlock() - for _, v := range c.connListeners { - v.PeerDisconnected(pch.pid) - } - c.lk.Lock() - } - case stateResponsive: + // Record the fact that this "state" is no longer in the queue. + state.pending = false + + // Then, if there's nothing to do, continue. + if state.curState == state.newState { + continue + } + + // Or record the state update, then apply it. + oldState := state.curState + state.curState = state.newState + + switch state.newState { + case stateDisconnected: + delete(c.peers, pid) + fallthrough + case stateUnresponsive: + // Only trigger a disconnect event if the peer was responsive. + // We could be transitioning from unresponsive to disconnected. + if oldState == stateResponsive { c.lk.Unlock() for _, v := range c.connListeners { - v.PeerConnected(pch.pid) + v.PeerDisconnected(pid) } c.lk.Lock() } + case stateResponsive: + c.lk.Unlock() + for _, v := range c.connListeners { + v.PeerConnected(pid) + } + c.lk.Lock() } - - // Record the fact that this "state" is no longer in the queue. - state.pending = false - // Signal that we've handled the state change - close(pch.handled) } } // Called whenever we receive a new connection. May be called many times. func (c *connectEventManager) Connected(p peer.ID) { c.lk.Lock() + defer c.lk.Unlock() // !responsive -> responsive if c.getState(p) == stateResponsive { - c.lk.Unlock() return } - wait := c.setState(p, stateResponsive) - c.lk.Unlock() - wait() + c.setState(p, stateResponsive) } // Called when we drop the final connection to a peer. diff --git a/bitswap/network/connecteventmanager_test.go b/bitswap/network/connecteventmanager_test.go index c961a5a97..e3904ee55 100644 --- a/bitswap/network/connecteventmanager_test.go +++ b/bitswap/network/connecteventmanager_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/ipfs/boxo/bitswap/internal/testutil" + "github.com/ipfs/boxo/internal/test" "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/require" ) @@ -17,8 +18,7 @@ type mockConnEvent struct { type mockConnListener struct { sync.Mutex - events []mockConnEvent - peerConnectedCb func(p peer.ID) + events []mockConnEvent } func newMockConnListener() *mockConnListener { @@ -29,9 +29,6 @@ func (cl *mockConnListener) PeerConnected(p peer.ID) { cl.Lock() defer cl.Unlock() cl.events = append(cl.events, mockConnEvent{connected: true, peer: p}) - if cl.peerConnectedCb != nil { - cl.peerConnectedCb(p) - } } func (cl *mockConnListener) PeerDisconnected(p peer.ID) { @@ -49,6 +46,8 @@ func wait(t *testing.T, c *connectEventManager) { } func TestConnectEventManagerConnectDisconnect(t *testing.T) { + test.Flaky(t) + connListener := newMockConnListener() peers := testutil.GeneratePeers(2) cem := newConnectEventManager(connListener) @@ -65,26 +64,31 @@ func TestConnectEventManagerConnectDisconnect(t *testing.T) { connected: true, }) + // Flush the event queue. + wait(t, cem) require.Equal(t, expectedEvents, connListener.events) + // Block up the event loop. + connListener.Lock() cem.Connected(peers[1]) expectedEvents = append(expectedEvents, mockConnEvent{ peer: peers[1], connected: true, }) - require.Equal(t, expectedEvents, connListener.events) + // We don't expect this to show up. cem.Disconnected(peers[0]) - expectedEvents = append(expectedEvents, mockConnEvent{ - peer: peers[0], - connected: false, - }) - // Flush the event queue. + cem.Connected(peers[0]) + + connListener.Unlock() + wait(t, cem) require.Equal(t, expectedEvents, connListener.events) } func TestConnectEventManagerMarkUnresponsive(t *testing.T) { + test.Flaky(t) + connListener := newMockConnListener() p := testutil.GeneratePeers(1)[0] cem := newConnectEventManager(connListener) @@ -134,6 +138,8 @@ func TestConnectEventManagerMarkUnresponsive(t *testing.T) { } func TestConnectEventManagerDisconnectAfterMarkUnresponsive(t *testing.T) { + test.Flaky(t) + connListener := newMockConnListener() p := testutil.GeneratePeers(1)[0] cem := newConnectEventManager(connListener) @@ -167,51 +173,3 @@ func TestConnectEventManagerDisconnectAfterMarkUnresponsive(t *testing.T) { require.Empty(t, cem.peers) // all disconnected require.Equal(t, expectedEvents, connListener.events) } - -func TestConnectEventManagerConnectFlowSynchronous(t *testing.T) { - connListener := newMockConnListener() - actionsCh := make(chan string) - connListener.peerConnectedCb = func(p peer.ID) { - actionsCh <- "PeerConnected:" + p.String() - time.Sleep(time.Millisecond * 50) - } - - peers := testutil.GeneratePeers(2) - cem := newConnectEventManager(connListener) - cem.Start() - t.Cleanup(cem.Stop) - - go func() { - actionsCh <- "Connected:" + peers[0].String() - cem.Connected(peers[0]) - actionsCh <- "Done:" + peers[0].String() - actionsCh <- "Connected:" + peers[1].String() - cem.Connected(peers[1]) - actionsCh <- "Done:" + peers[1].String() - close(actionsCh) - }() - - // We expect Done to be sent _after_ PeerConnected, which demonstrates the - // call to Connected() blocks until PeerConnected() returns. - gotActions := make([]string, 0, 3) - for event := range actionsCh { - gotActions = append(gotActions, event) - } - expectedActions := []string{ - "Connected:" + peers[0].String(), - "PeerConnected:" + peers[0].String(), - "Done:" + peers[0].String(), - "Connected:" + peers[1].String(), - "PeerConnected:" + peers[1].String(), - "Done:" + peers[1].String(), - } - require.Equal(t, expectedActions, gotActions) - - // Flush the event queue. - wait(t, cem) - expectedEvents := []mockConnEvent{ - {peer: peers[0], connected: true}, - {peer: peers[1], connected: true}, - } - require.Equal(t, expectedEvents, connListener.events) -}