Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(connecteventmanager): block Connected() until accepted #435

Merged
merged 4 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ The following emojis are used to highlight certain changes:

- Removed mentions of unused ARC algorithm ([#336](https://github.com/ipfs/boxo/issues/366#issuecomment-1597253540))
- Handle `_redirects` file when `If-None-Match` header is present ([#412](https://github.com/ipfs/boxo/pull/412))
- Address a Bitswap findpeers / connect race condition that can prevent peer communication ([#435](https://github.com/ipfs/boxo/issues/435))

### Security

Expand Down
102 changes: 67 additions & 35 deletions bitswap/network/connecteventmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,25 @@
cond sync.Cond
peers map[peer.ID]*peerState

changeQueue []peer.ID
changeQueue []change
stop bool
done chan struct{}
}

type change struct {
pid peer.ID
handled chan struct{}
}

type peerState struct {
newState, curState state
pending bool
}

type waitFn func()

func waitNoop() {}

Check warning on line 45 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L45

Added line #L45 was not covered by tests

func newConnectEventManager(connListeners ...ConnectionListener) *connectEventManager {
evtManager := &connectEventManager{
connListeners: connListeners,
Expand Down Expand Up @@ -66,7 +75,16 @@
}
}

func (c *connectEventManager) setState(p peer.ID, newState state) {
func (c *connectEventManager) makeWaitFunc(handled chan struct{}) waitFn {
return func() {
select {
case <-handled:
case <-c.done:

Check warning on line 82 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L82

Added line #L82 was not covered by tests
}
}
}

func (c *connectEventManager) setState(p peer.ID, newState state) waitFn {
state, ok := c.peers[p]
if !ok {
state = new(peerState)
Expand All @@ -75,9 +93,20 @@
state.newState = newState
if !state.pending && state.newState != state.curState {
state.pending = true
c.changeQueue = append(c.changeQueue, p)
change := change{p, make(chan struct{})}
c.changeQueue = append(c.changeQueue, change)
c.cond.Broadcast()
return c.makeWaitFunc(change.handled)
} else if state.pending {
// Find the change in the queue and return a wait function for it
for _, change := range c.changeQueue {
if change.pid == p {
return c.makeWaitFunc(change.handled)
}

Check warning on line 105 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L101-L105

Added lines #L101 - L105 were not covered by tests
}
log.Error("a peer was marked as change pending but not found in the change queue")

Check warning on line 107 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L107

Added line #L107 was not covered by tests
}
return waitNoop

Check warning on line 109 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L109

Added line #L109 was not covered by tests
}

// Waits for a change to be enqueued, or for the event manager to be stopped. Returns false if the
Expand All @@ -95,67 +124,70 @@
defer close(c.done)

for c.waitChange() {
pid := c.changeQueue[0]
c.changeQueue[0] = peer.ID("") // free the peer ID (slicing won't do that)
pch := c.changeQueue[0]
c.changeQueue[0] = change{} // free the resources (slicing won't do that)
c.changeQueue = c.changeQueue[1:]

state, ok := c.peers[pid]
state, ok := c.peers[pch.pid]
// If we've disconnected and forgotten, continue.
if !ok {
// This shouldn't be possible because _this_ thread is responsible for
// removing peers from this map, and we shouldn't get duplicate entries in
// the change queue.
log.Error("a change was enqueued for a peer we're not tracking")
close(pch.handled)

Check warning on line 138 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L138

Added line #L138 was not covered by tests
continue
}

// Record the fact that this "state" is no longer in the queue.
state.pending = false

// Then, if there's nothing to do, continue.
if state.curState == state.newState {
continue
}

// Or record the state update, then apply it.
oldState := state.curState
state.curState = state.newState

switch state.newState {
case stateDisconnected:
delete(c.peers, pid)
fallthrough
case stateUnresponsive:
// Only trigger a disconnect event if the peer was responsive.
// We could be transitioning from unresponsive to disconnected.
if oldState == stateResponsive {
// Is there anything to do?
if state.curState != state.newState {
// Record the state update, then apply it.
oldState := state.curState
state.curState = state.newState

switch state.newState {
case stateDisconnected:
delete(c.peers, pch.pid)
fallthrough
case stateUnresponsive:
// Only trigger a disconnect event if the peer was responsive.
// We could be transitioning from unresponsive to disconnected.
if oldState == stateResponsive {
c.lk.Unlock()
for _, v := range c.connListeners {
v.PeerDisconnected(pch.pid)
}
c.lk.Lock()

Check warning on line 160 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L149-L160

Added lines #L149 - L160 were not covered by tests
}
case stateResponsive:
c.lk.Unlock()
for _, v := range c.connListeners {
v.PeerDisconnected(pid)
v.PeerConnected(pch.pid)
}
c.lk.Lock()
}
case stateResponsive:
c.lk.Unlock()
for _, v := range c.connListeners {
v.PeerConnected(pid)
}
c.lk.Lock()
}

// Record the fact that this "state" is no longer in the queue.
state.pending = false
// Signal that we've handled the state change
close(pch.handled)
}
}

// Called whenever we receive a new connection. May be called many times.
func (c *connectEventManager) Connected(p peer.ID) {
c.lk.Lock()
defer c.lk.Unlock()

// !responsive -> responsive

if c.getState(p) == stateResponsive {
c.lk.Unlock()

Check warning on line 185 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L185

Added line #L185 was not covered by tests
return
}
c.setState(p, stateResponsive)
wait := c.setState(p, stateResponsive)
c.lk.Unlock()
wait()
}

// Called when we drop the final connection to a peer.
Expand Down
54 changes: 53 additions & 1 deletion bitswap/network/connecteventmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ type mockConnEvent struct {

type mockConnListener struct {
sync.Mutex
events []mockConnEvent
events []mockConnEvent
peerConnectedCb func(p peer.ID)
}

func newMockConnListener() *mockConnListener {
Expand All @@ -29,6 +30,9 @@ func (cl *mockConnListener) PeerConnected(p peer.ID) {
cl.Lock()
defer cl.Unlock()
cl.events = append(cl.events, mockConnEvent{connected: true, peer: p})
if cl.peerConnectedCb != nil {
cl.peerConnectedCb(p)
}
}

func (cl *mockConnListener) PeerDisconnected(p peer.ID) {
Expand Down Expand Up @@ -173,3 +177,51 @@ func TestConnectEventManagerDisconnectAfterMarkUnresponsive(t *testing.T) {
require.Empty(t, cem.peers) // all disconnected
require.Equal(t, expectedEvents, connListener.events)
}

func TestConnectEventManagerConnectFlowSynchronous(t *testing.T) {
connListener := newMockConnListener()
actionsCh := make(chan string)
connListener.peerConnectedCb = func(p peer.ID) {
actionsCh <- "PeerConnected:" + p.String()
time.Sleep(time.Millisecond * 50)
}

peers := testutil.GeneratePeers(2)
cem := newConnectEventManager(connListener)
cem.Start()
t.Cleanup(cem.Stop)

go func() {
actionsCh <- "Connected:" + peers[0].String()
cem.Connected(peers[0])
actionsCh <- "Done:" + peers[0].String()
actionsCh <- "Connected:" + peers[1].String()
cem.Connected(peers[1])
actionsCh <- "Done:" + peers[1].String()
close(actionsCh)
}()

// We expect Done to be sent _after_ PeerConnected, which demonstrates the
// call to Connected() blocks until PeerConnected() returns.
gotActions := make([]string, 0, 3)
for event := range actionsCh {
gotActions = append(gotActions, event)
}
expectedActions := []string{
"Connected:" + peers[0].String(),
"PeerConnected:" + peers[0].String(),
"Done:" + peers[0].String(),
"Connected:" + peers[1].String(),
"PeerConnected:" + peers[1].String(),
"Done:" + peers[1].String(),
}
require.Equal(t, expectedActions, gotActions)

// Flush the event queue.
wait(t, cem)
expectedEvents := []mockConnEvent{
{peer: peers[0], connected: true},
{peer: peers[1], connected: true},
}
require.Equal(t, expectedEvents, connListener.events)
}