Skip to content

Commit

Permalink
feat: move broadcast wantlist into the peermanager (ipfs#365)
Browse files Browse the repository at this point in the history
* feat: small optimizations

* feat: move broadcast wantlist into the peermanager

This deduplicates some state and allows us to do less book-keeping for broadcast
wants. We should probably rename the PeerManager to the WantManager and rename the
WantManager to something else.

* fix: lint warnings

This commit was moved from ipfs/go-bitswap@2a03373
  • Loading branch information
Stebalien authored Apr 22, 2020
1 parent f223da3 commit 5cfe98e
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 239 deletions.
5 changes: 4 additions & 1 deletion bitswap/internal/messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,6 @@ func (mq *MessageQueue) AddCancels(cancelKs []cid.Cid) {
mq.dhTimeoutMgr.CancelPending(cancelKs)

mq.wllock.Lock()
defer mq.wllock.Unlock()

workReady := false

Expand All @@ -282,6 +281,10 @@ func (mq *MessageQueue) AddCancels(cancelKs []cid.Cid) {
}
}

mq.wllock.Unlock()

// Unlock first to be nice to the scheduler.

// Schedule a message send
if workReady {
mq.signalWorkReady()
Expand Down
8 changes: 3 additions & 5 deletions bitswap/internal/peermanager/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,16 @@ func (pm *PeerManager) ConnectedPeers() []peer.ID {

// Connected is called to add a new peer to the pool, and send it an initial set
// of wants.
func (pm *PeerManager) Connected(p peer.ID, initialWantHaves []cid.Cid) {
func (pm *PeerManager) Connected(p peer.ID) {
pm.pqLk.Lock()
defer pm.pqLk.Unlock()

pq := pm.getOrCreate(p)

// Inform the peer want manager that there's a new peer
pm.pwm.addPeer(p)
// Record that the want-haves are being sent to the peer
_, wantHaves := pm.pwm.prepareSendWants(p, nil, initialWantHaves)
wants := pm.pwm.addPeer(p)
// Broadcast any live want-haves to the newly connected peers
pq.AddBroadcastWantHaves(wantHaves)
pq.AddBroadcastWantHaves(wants)
// Inform the sessions that the peer has connected
pm.signalAvailability(p, true)
}
Expand Down
40 changes: 23 additions & 17 deletions bitswap/internal/peermanager/peermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ func TestAddingAndRemovingPeers(t *testing.T) {
self, peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4], tp[5]
peerManager := New(ctx, peerQueueFactory, self)

peerManager.Connected(peer1, nil)
peerManager.Connected(peer2, nil)
peerManager.Connected(peer3, nil)
peerManager.Connected(peer1)
peerManager.Connected(peer2)
peerManager.Connected(peer3)

connectedPeers := peerManager.ConnectedPeers()

Expand All @@ -108,7 +108,7 @@ func TestAddingAndRemovingPeers(t *testing.T) {
}

// reconnect peer
peerManager.Connected(peer1, nil)
peerManager.Connected(peer1)
connectedPeers = peerManager.ConnectedPeers()

if !testutil.ContainsPeer(connectedPeers, peer1) {
Expand All @@ -126,9 +126,10 @@ func TestBroadcastOnConnect(t *testing.T) {
peerManager := New(ctx, peerQueueFactory, self)

cids := testutil.GenerateCids(2)
peerManager.BroadcastWantHaves(ctx, cids)

// Connect with two broadcast wants for first peer
peerManager.Connected(peer1, cids)
peerManager.Connected(peer1)
collected := collectMessages(msgs, 2*time.Millisecond)

if len(collected[peer1].wantHaves) != 2 {
Expand All @@ -147,16 +148,19 @@ func TestBroadcastWantHaves(t *testing.T) {

cids := testutil.GenerateCids(3)

// Connect to first peer with two broadcast wants
peerManager.Connected(peer1, []cid.Cid{cids[0], cids[1]})
// Broadcast the first two.
peerManager.BroadcastWantHaves(ctx, cids[:2])

// First peer should get them.
peerManager.Connected(peer1)
collected := collectMessages(msgs, 2*time.Millisecond)

if len(collected[peer1].wantHaves) != 2 {
t.Fatal("Expected want-haves to be sent to newly connected peer")
}

// Connect to second peer
peerManager.Connected(peer2, nil)
peerManager.Connected(peer2)

// Send a broadcast to all peers, including cid that was already sent to
// first peer
Expand All @@ -165,10 +169,12 @@ func TestBroadcastWantHaves(t *testing.T) {

// One of the want-haves was already sent to peer1
if len(collected[peer1].wantHaves) != 1 {
t.Fatal("Expected 1 want-haves to be sent to first peer", collected[peer1].wantHaves)
t.Fatalf("Expected 1 want-haves to be sent to first peer, got %d",
len(collected[peer1].wantHaves))
}
if len(collected[peer2].wantHaves) != 2 {
t.Fatal("Expected 2 want-haves to be sent to second peer")
if len(collected[peer2].wantHaves) != 3 {
t.Fatalf("Expected 3 want-haves to be sent to second peer, got %d",
len(collected[peer2].wantHaves))
}
}

Expand All @@ -182,7 +188,7 @@ func TestSendWants(t *testing.T) {
peerManager := New(ctx, peerQueueFactory, self)
cids := testutil.GenerateCids(4)

peerManager.Connected(peer1, nil)
peerManager.Connected(peer1)
peerManager.SendWants(ctx, peer1, []cid.Cid{cids[0]}, []cid.Cid{cids[2]})
collected := collectMessages(msgs, 2*time.Millisecond)

Expand Down Expand Up @@ -217,8 +223,8 @@ func TestSendCancels(t *testing.T) {
cids := testutil.GenerateCids(4)

// Connect to peer1 and peer2
peerManager.Connected(peer1, nil)
peerManager.Connected(peer2, nil)
peerManager.Connected(peer1)
peerManager.Connected(peer2)

// Send 2 want-blocks and 1 want-have to peer1
peerManager.SendWants(ctx, peer1, []cid.Cid{cids[0], cids[1]}, []cid.Cid{cids[2]})
Expand Down Expand Up @@ -286,11 +292,11 @@ func TestSessionRegistration(t *testing.T) {
t.Fatal("Expected peer not be available till connected")
}

peerManager.Connected(p1, nil)
peerManager.Connected(p1)
if !s.available[p1] {
t.Fatal("Expected signal callback")
}
peerManager.Connected(p2, nil)
peerManager.Connected(p2)
if !s.available[p2] {
t.Fatal("Expected signal callback")
}
Expand All @@ -305,7 +311,7 @@ func TestSessionRegistration(t *testing.T) {

peerManager.UnregisterSession(id)

peerManager.Connected(p1, nil)
peerManager.Connected(p1)
if s.available[p1] {
t.Fatal("Expected no signal callback (session unregistered)")
}
Expand Down
Loading

0 comments on commit 5cfe98e

Please sign in to comment.