diff --git a/bitswap/internal/messagequeue/messagequeue.go b/bitswap/internal/messagequeue/messagequeue.go
index ad85e5234..755df08a7 100644
--- a/bitswap/internal/messagequeue/messagequeue.go
+++ b/bitswap/internal/messagequeue/messagequeue.go
@@ -261,7 +261,6 @@ func (mq *MessageQueue) AddCancels(cancelKs []cid.Cid) {
 	mq.dhTimeoutMgr.CancelPending(cancelKs)
 
 	mq.wllock.Lock()
-	defer mq.wllock.Unlock()
 
 	workReady := false
 
@@ -282,6 +281,10 @@ func (mq *MessageQueue) AddCancels(cancelKs []cid.Cid) {
 		}
 	}
 
+	mq.wllock.Unlock()
+
+	// Unlock first to be nice to the scheduler.
+
 	// Schedule a message send
 	if workReady {
 		mq.signalWorkReady()
diff --git a/bitswap/internal/peermanager/peermanager.go b/bitswap/internal/peermanager/peermanager.go
index 0cf8b2e35..522823263 100644
--- a/bitswap/internal/peermanager/peermanager.go
+++ b/bitswap/internal/peermanager/peermanager.go
@@ -82,18 +82,16 @@ func (pm *PeerManager) ConnectedPeers() []peer.ID {
 
 // Connected is called to add a new peer to the pool, and send it an initial set
 // of wants.
-func (pm *PeerManager) Connected(p peer.ID, initialWantHaves []cid.Cid) {
+func (pm *PeerManager) Connected(p peer.ID) {
 	pm.pqLk.Lock()
 	defer pm.pqLk.Unlock()
 
 	pq := pm.getOrCreate(p)
 
 	// Inform the peer want manager that there's a new peer
-	pm.pwm.addPeer(p)
-	// Record that the want-haves are being sent to the peer
-	_, wantHaves := pm.pwm.prepareSendWants(p, nil, initialWantHaves)
+	wants := pm.pwm.addPeer(p)
 	// Broadcast any live want-haves to the newly connected peers
-	pq.AddBroadcastWantHaves(wantHaves)
+	pq.AddBroadcastWantHaves(wants)
 	// Inform the sessions that the peer has connected
 	pm.signalAvailability(p, true)
 }
diff --git a/bitswap/internal/peermanager/peermanager_test.go b/bitswap/internal/peermanager/peermanager_test.go
index f979b2c81..469aa4d19 100644
--- a/bitswap/internal/peermanager/peermanager_test.go
+++ b/bitswap/internal/peermanager/peermanager_test.go
@@ -82,9 +82,9 @@ func TestAddingAndRemovingPeers(t *testing.T) {
 	self, peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4], tp[5]
 	peerManager := New(ctx, peerQueueFactory, self)
 
-	peerManager.Connected(peer1, nil)
-	peerManager.Connected(peer2, nil)
-	peerManager.Connected(peer3, nil)
+	peerManager.Connected(peer1)
+	peerManager.Connected(peer2)
+	peerManager.Connected(peer3)
 
 	connectedPeers := peerManager.ConnectedPeers()
 
@@ -108,7 +108,7 @@ func TestAddingAndRemovingPeers(t *testing.T) {
 	}
 
 	// reconnect peer
-	peerManager.Connected(peer1, nil)
+	peerManager.Connected(peer1)
 	connectedPeers = peerManager.ConnectedPeers()
 
 	if !testutil.ContainsPeer(connectedPeers, peer1) {
@@ -126,9 +126,10 @@ func TestBroadcastOnConnect(t *testing.T) {
 	peerManager := New(ctx, peerQueueFactory, self)
 
 	cids := testutil.GenerateCids(2)
+	peerManager.BroadcastWantHaves(ctx, cids)
 
 	// Connect with two broadcast wants for first peer
-	peerManager.Connected(peer1, cids)
+	peerManager.Connected(peer1)
 	collected := collectMessages(msgs, 2*time.Millisecond)
 
 	if len(collected[peer1].wantHaves) != 2 {
@@ -147,8 +148,11 @@ func TestBroadcastWantHaves(t *testing.T) {
 
 	cids := testutil.GenerateCids(3)
 
-	// Connect to first peer with two broadcast wants
-	peerManager.Connected(peer1, []cid.Cid{cids[0], cids[1]})
+	// Broadcast the first two.
+	peerManager.BroadcastWantHaves(ctx, cids[:2])
+
+	// First peer should get them.
+	peerManager.Connected(peer1)
 	collected := collectMessages(msgs, 2*time.Millisecond)
 
 	if len(collected[peer1].wantHaves) != 2 {
@@ -156,7 +160,7 @@ func TestBroadcastWantHaves(t *testing.T) {
 	}
 
 	// Connect to second peer
-	peerManager.Connected(peer2, nil)
+	peerManager.Connected(peer2)
 
 	// Send a broadcast to all peers, including cid that was already sent to
 	// first peer
@@ -165,10 +169,12 @@ func TestBroadcastWantHaves(t *testing.T) {
 
 	// One of the want-haves was already sent to peer1
 	if len(collected[peer1].wantHaves) != 1 {
-		t.Fatal("Expected 1 want-haves to be sent to first peer", collected[peer1].wantHaves)
+		t.Fatalf("Expected 1 want-haves to be sent to first peer, got %d",
+			len(collected[peer1].wantHaves))
 	}
 
-	if len(collected[peer2].wantHaves) != 2 {
-		t.Fatal("Expected 2 want-haves to be sent to second peer")
+	if len(collected[peer2].wantHaves) != 3 {
+		t.Fatalf("Expected 3 want-haves to be sent to second peer, got %d",
+			len(collected[peer2].wantHaves))
 	}
 }
 
@@ -182,7 +188,7 @@ func TestSendWants(t *testing.T) {
 	peerManager := New(ctx, peerQueueFactory, self)
 	cids := testutil.GenerateCids(4)
 
-	peerManager.Connected(peer1, nil)
+	peerManager.Connected(peer1)
 	peerManager.SendWants(ctx, peer1, []cid.Cid{cids[0]}, []cid.Cid{cids[2]})
 	collected := collectMessages(msgs, 2*time.Millisecond)
 
@@ -217,8 +223,8 @@ func TestSendCancels(t *testing.T) {
 	cids := testutil.GenerateCids(4)
 
 	// Connect to peer1 and peer2
-	peerManager.Connected(peer1, nil)
-	peerManager.Connected(peer2, nil)
+	peerManager.Connected(peer1)
+	peerManager.Connected(peer2)
 
 	// Send 2 want-blocks and 1 want-have to peer1
 	peerManager.SendWants(ctx, peer1, []cid.Cid{cids[0], cids[1]}, []cid.Cid{cids[2]})
@@ -286,11 +292,11 @@ func TestSessionRegistration(t *testing.T) {
 		t.Fatal("Expected peer not be available till connected")
 	}
 
-	peerManager.Connected(p1, nil)
+	peerManager.Connected(p1)
 	if !s.available[p1] {
 		t.Fatal("Expected signal callback")
 	}
-	peerManager.Connected(p2, nil)
+	peerManager.Connected(p2)
 	if !s.available[p2] {
 		t.Fatal("Expected signal callback")
 	}
@@ -305,7 +311,7 @@ func TestSessionRegistration(t *testing.T) {
 
 	peerManager.UnregisterSession(id)
 
-	peerManager.Connected(p1, nil)
+	peerManager.Connected(p1)
 	if s.available[p1] {
 		t.Fatal("Expected no signal callback (session unregistered)")
 	}
diff --git a/bitswap/internal/peermanager/peerwantmanager.go b/bitswap/internal/peermanager/peerwantmanager.go
index 1928966ca..418a646c4 100644
--- a/bitswap/internal/peermanager/peerwantmanager.go
+++ b/bitswap/internal/peermanager/peerwantmanager.go
@@ -19,10 +19,17 @@ type Gauge interface {
 // peerWantManager keeps track of which want-haves and want-blocks have been
 // sent to each peer, so that the PeerManager doesn't send duplicates.
 type peerWantManager struct {
+	// peerWants maps peers to outstanding wants.
+	// A peer's wants is the _union_ of the broadcast wants and the wants in
+	// this list.
 	peerWants map[peer.ID]*peerWant
-	// Reverse index mapping wants to the peers that sent them. This is used
-	// to speed up cancels
+
+	// Reverse index of all wants in peerWants.
 	wantPeers map[cid.Cid]map[peer.ID]struct{}
+
+	// broadcastWants tracks all the current broadcast wants.
+	broadcastWants *cid.Set
+
 	// Keeps track of the number of active want-blocks
 	wantBlockGauge Gauge
 }
@@ -36,20 +43,24 @@ type peerWant struct {
 // number of active want-blocks (ie sent but no response received)
 func newPeerWantManager(wantBlockGauge Gauge) *peerWantManager {
 	return &peerWantManager{
+		broadcastWants: cid.NewSet(),
 		peerWants:      make(map[peer.ID]*peerWant),
 		wantPeers:      make(map[cid.Cid]map[peer.ID]struct{}),
 		wantBlockGauge: wantBlockGauge,
 	}
 }
 
-// AddPeer adds a peer whose wants we need to keep track of
-func (pwm *peerWantManager) addPeer(p peer.ID) {
+// addPeer adds a peer whose wants we need to keep track of. It returns the
+// current list of broadcast wants that should be sent to the peer.
+func (pwm *peerWantManager) addPeer(p peer.ID) []cid.Cid {
 	if _, ok := pwm.peerWants[p]; !ok {
 		pwm.peerWants[p] = &peerWant{
 			wantBlocks: cid.NewSet(),
 			wantHaves:  cid.NewSet(),
 		}
+		return pwm.broadcastWants.Keys()
 	}
+	return nil
 }
 
 // RemovePeer removes a peer and its associated wants from tracking
@@ -59,7 +70,7 @@ func (pwm *peerWantManager) removePeer(p peer.ID) {
 		return
 	}
 
-	pws.wantBlocks.ForEach(func(c cid.Cid) error {
+	_ = pws.wantBlocks.ForEach(func(c cid.Cid) error {
 		// Decrement the gauge by the number of pending want-blocks to the peer
 		pwm.wantBlockGauge.Dec()
 		// Clean up want-blocks from the reverse index
@@ -68,7 +79,7 @@ func (pwm *peerWantManager) removePeer(p peer.ID) {
 	})
 
 	// Clean up want-haves from the reverse index
-	pws.wantHaves.ForEach(func(c cid.Cid) error {
+	_ = pws.wantHaves.ForEach(func(c cid.Cid) error {
 		pwm.reverseIndexRemove(c, p)
 		return nil
 	})
@@ -79,26 +90,30 @@ func (pwm *peerWantManager) removePeer(p peer.ID) {
 // PrepareBroadcastWantHaves filters the list of want-haves for each peer,
 // returning a map of peers to the want-haves they have not yet been sent.
 func (pwm *peerWantManager) prepareBroadcastWantHaves(wantHaves []cid.Cid) map[peer.ID][]cid.Cid {
-	res := make(map[peer.ID][]cid.Cid)
+	res := make(map[peer.ID][]cid.Cid, len(pwm.peerWants))
+	for _, c := range wantHaves {
+		if pwm.broadcastWants.Has(c) {
+			// Already a broadcast want, skip it.
+			continue
+		}
+		pwm.broadcastWants.Add(c)
+
+		// Prepare broadcast.
+		wantedBy := pwm.wantPeers[c]
+		for p := range pwm.peerWants {
+			// If we've already sent a want to this peer, skip them.
+			//
+			// This is faster than checking the actual wantlists due
+			// to better locality.
+			if _, ok := wantedBy[p]; ok {
+				continue
+			}
 
-	// Iterate over all known peers
-	for p, pws := range pwm.peerWants {
-		// Iterate over all want-haves
-		for _, c := range wantHaves {
-			// If the CID has not been sent as a want-block or want-have
-			if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
-				// Record that the CID has been sent as a want-have
-				pws.wantHaves.Add(c)
-
-				// Update the reverse index
-				pwm.reverseIndexAdd(c, p)
-
-				// Add the CID to the results
-				if _, ok := res[p]; !ok {
-					res[p] = make([]cid.Cid, 0, 1)
-				}
-				res[p] = append(res[p], c)
+			cids, ok := res[p]
+			if !ok {
+				cids = make([]cid.Cid, 0, len(wantHaves))
 			}
+			res[p] = append(cids, c)
 		}
 	}
 
@@ -146,6 +161,12 @@ func (pwm *peerWantManager) prepareSendWants(p peer.ID, wantBlocks []cid.Cid, wa
 
 	// Iterate over the requested want-haves
 	for _, c := range wantHaves {
+		// If we've already broadcasted this want, don't bother with a
+		// want-have.
+		if pwm.broadcastWants.Has(c) {
+			continue
+		}
+
 		// If the CID has not been sent as a want-block or want-have
 		if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
 			// Record that the CID was sent as a want-have
@@ -166,11 +187,36 @@ func (pwm *peerWantManager) prepareSendWants(p peer.ID, wantBlocks []cid.Cid, wa
 // returning a map of peers which only contains cancels for wants that have
 // been sent to the peer.
 func (pwm *peerWantManager) prepareSendCancels(cancelKs []cid.Cid) map[peer.ID][]cid.Cid {
-	res := make(map[peer.ID][]cid.Cid)
+	if len(cancelKs) == 0 {
+		return nil
+	}
+
+	// Pre-allocate enough space for all peers that have the first CID.
+	// Chances are these peers are related.
+	expectedResSize := 0
+	firstCancel := cancelKs[0]
+	if pwm.broadcastWants.Has(firstCancel) {
+		expectedResSize = len(pwm.peerWants)
+	} else {
+		expectedResSize = len(pwm.wantPeers[firstCancel])
+	}
+	res := make(map[peer.ID][]cid.Cid, expectedResSize)
+
+	// Keep the broadcast keys separate. This lets us batch-process them at
+	// the end.
+	broadcastKs := make([]cid.Cid, 0, len(cancelKs))
 
 	// Iterate over all requested cancels
 	for _, c := range cancelKs {
-		// Iterate over peers that have sent a corresponding want
+		// Handle broadcast wants up-front.
+		isBroadcast := pwm.broadcastWants.Has(c)
+		if isBroadcast {
+			broadcastKs = append(broadcastKs, c)
+			pwm.broadcastWants.Remove(c)
+		}
+
+		// Even if this is a broadcast, we may have sent targeted wants.
+		// Deal with them.
 		for p := range pwm.wantPeers[c] {
 			pws, ok := pwm.peerWants[p]
 			if !ok {
@@ -179,28 +225,45 @@ func (pwm *peerWantManager) prepareSendCancels(cancelKs []cid.Cid) map[peer.ID][
 				continue
 			}
 
-			isWantBlock := pws.wantBlocks.Has(c)
-			isWantHave := pws.wantHaves.Has(c)
-
-			// If the CID was sent as a want-block, decrement the want-block count
-			if isWantBlock {
+			// Update the want gauge.
+			if pws.wantBlocks.Has(c) {
 				pwm.wantBlockGauge.Dec()
 			}
 
-			// If the CID was sent as a want-block or want-have
-			if isWantBlock || isWantHave {
-				// Remove the CID from the recorded want-blocks and want-haves
-				pws.wantBlocks.Remove(c)
-				pws.wantHaves.Remove(c)
+			// Unconditionally remove from the want lists.
+			pws.wantBlocks.Remove(c)
+			pws.wantHaves.Remove(c)
 
-				// Add the CID to the results
-				if _, ok := res[p]; !ok {
-					res[p] = make([]cid.Cid, 0, 1)
-				}
-				res[p] = append(res[p], c)
+			// If it's a broadcast want, we've already added it to
+			// the broadcastKs list.
+			if isBroadcast {
+				continue
+			}
 
-				// Update the reverse index
-				pwm.reverseIndexRemove(c, p)
+			// Add the CID to the result for the peer.
+			cids, ok := res[p]
+			if !ok {
+				// Pre-allocate enough for all keys.
+				// Cancels are usually related.
+				cids = make([]cid.Cid, 0, len(cancelKs))
+			}
+			res[p] = append(cids, c)
+		}
+
+		// Finally, batch-remove the reverse-index. There's no need to
+		// clear this index peer-by-peer.
+		delete(pwm.wantPeers, c)
+	}
+
+	// If we have any broadcasted CIDs, add them in.
+	//
+	// Doing this at the end can save us a bunch of work and allocations.
+	if len(broadcastKs) > 0 {
+		for p := range pwm.peerWants {
+			if cids, ok := res[p]; ok {
+				res[p] = append(cids, broadcastKs...)
+			} else {
+				res[p] = broadcastKs
 			}
 		}
 	}
@@ -212,7 +275,7 @@ func (pwm *peerWantManager) prepareSendCancels(cancelKs []cid.Cid) map[peer.ID][
 func (pwm *peerWantManager) reverseIndexAdd(c cid.Cid, p peer.ID) {
 	peers, ok := pwm.wantPeers[c]
 	if !ok {
-		peers = make(map[peer.ID]struct{}, 1)
+		peers = make(map[peer.ID]struct{}, 10)
 		pwm.wantPeers[c] = peers
 	}
 	peers[p] = struct{}{}
@@ -235,7 +298,7 @@ func (pwm *peerWantManager) getWantBlocks() []cid.Cid {
 	// Iterate over all known peers
 	for _, pws := range pwm.peerWants {
 		// Iterate over all want-blocks
-		pws.wantBlocks.ForEach(func(c cid.Cid) error {
+		_ = pws.wantBlocks.ForEach(func(c cid.Cid) error {
 			// Add the CID to the results
 			res.Add(c)
 			return nil
@@ -249,41 +312,37 @@ func (pwm *peerWantManager) getWantHaves() []cid.Cid {
 	res := cid.NewSet()
 
-	// Iterate over all known peers
+	// Iterate over all peers with active wants.
 	for _, pws := range pwm.peerWants {
 		// Iterate over all want-haves
-		pws.wantHaves.ForEach(func(c cid.Cid) error {
+		_ = pws.wantHaves.ForEach(func(c cid.Cid) error {
 			// Add the CID to the results
 			res.Add(c)
 			return nil
 		})
 	}
+	_ = pwm.broadcastWants.ForEach(func(c cid.Cid) error {
+		res.Add(c)
+		return nil
+	})
 
 	return res.Keys()
 }
 
 // GetWants returns the set of all wants (both want-blocks and want-haves).
 func (pwm *peerWantManager) getWants() []cid.Cid {
-	res := cid.NewSet()
-
-	// Iterate over all known peers
-	for _, pws := range pwm.peerWants {
-		// Iterate over all want-blocks
-		pws.wantBlocks.ForEach(func(c cid.Cid) error {
-			// Add the CID to the results
-			res.Add(c)
-			return nil
-		})
+	res := pwm.broadcastWants.Keys()
 
-		// Iterate over all want-haves
-		pws.wantHaves.ForEach(func(c cid.Cid) error {
-			// Add the CID to the results
-			res.Add(c)
-			return nil
-		})
+	// Iterate over all targeted wants, removing ones that are also in the
+	// broadcast list.
+	for c := range pwm.wantPeers {
+		if pwm.broadcastWants.Has(c) {
+			continue
+		}
+		res = append(res, c)
 	}
 
-	return res.Keys()
+	return res
 }
 
 func (pwm *peerWantManager) String() string {
diff --git a/bitswap/internal/peermanager/peerwantmanager_test.go b/bitswap/internal/peermanager/peerwantmanager_test.go
index a56df168a..766033e8f 100644
--- a/bitswap/internal/peermanager/peerwantmanager_test.go
+++ b/bitswap/internal/peermanager/peerwantmanager_test.go
@@ -38,8 +38,12 @@ func TestPrepareBroadcastWantHaves(t *testing.T) {
 	cids2 := testutil.GenerateCids(2)
 	cids3 := testutil.GenerateCids(2)
 
-	pwm.addPeer(peers[0])
-	pwm.addPeer(peers[1])
+	if blist := pwm.addPeer(peers[0]); len(blist) > 0 {
+		t.Errorf("expected no broadcast wants")
+	}
+	if blist := pwm.addPeer(peers[1]); len(blist) > 0 {
+		t.Errorf("expected no broadcast wants")
+	}
 
 	// Broadcast 2 cids to 2 peers
 	bcst := pwm.prepareBroadcastWantHaves(cids)
@@ -104,16 +108,19 @@ func TestPrepareBroadcastWantHaves(t *testing.T) {
 		}
 	}
 
+	allCids := cids
+	allCids = append(allCids, cids2...)
+	allCids = append(allCids, cids3...)
+	allCids = append(allCids, cids4...)
+
 	// Add another peer
-	pwm.addPeer(peers[2])
-	bcst6 := pwm.prepareBroadcastWantHaves(cids)
-	if len(bcst6) != 1 {
-		t.Fatal("Expected 1 peer")
+	bcst6 := pwm.addPeer(peers[2])
+	if !testutil.MatchKeysIgnoreOrder(bcst6, allCids) {
+		t.Fatalf("Expected all cids to be broadcast.")
 	}
-	for p := range bcst6 {
-		if !testutil.MatchKeysIgnoreOrder(bcst6[p], cids) {
-			t.Fatal("Expected all cids to be broadcast")
-		}
+
+	if broadcast := pwm.prepareBroadcastWantHaves(allCids); len(broadcast) != 0 {
+		t.Errorf("did not expect to have CIDs to broadcast")
 	}
 }
diff --git a/bitswap/internal/wantmanager/wantmanager.go b/bitswap/internal/wantmanager/wantmanager.go
index 908f9dca3..539017a9d 100644
--- a/bitswap/internal/wantmanager/wantmanager.go
+++ b/bitswap/internal/wantmanager/wantmanager.go
@@ -6,7 +6,6 @@ import (
 	bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
 	bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager"
 	"github.com/ipfs/go-bitswap/internal/sessionmanager"
-	bsswl "github.com/ipfs/go-bitswap/internal/sessionwantlist"
 	logging "github.com/ipfs/go-log"
 
 	cid "github.com/ipfs/go-cid"
@@ -17,9 +16,8 @@ var log = logging.Logger("bitswap")
 
 // PeerHandler sends wants / cancels to other peers
 type PeerHandler interface {
-	// Connected is called when a peer connects, with any initial want-haves
-	// that have been broadcast to all peers (as part of session discovery)
-	Connected(p peer.ID, initialWants []cid.Cid)
+	// Connected is called when a peer connects.
+	Connected(p peer.ID)
 	// Disconnected is called when a peer disconnects
 	Disconnected(p peer.ID)
 	// BroadcastWantHaves sends want-haves to all connected peers
@@ -38,11 +36,7 @@ type SessionManager interface {
 // - informs the SessionManager and BlockPresenceManager of incoming information
 //   and cancelled sessions
 // - informs the PeerManager of connects and disconnects
-// - manages the list of want-haves that are broadcast to the internet
-//   (as opposed to being sent to specific peers)
 type WantManager struct {
-	bcwl *bsswl.SessionWantlist
-
 	peerHandler PeerHandler
 	sim         *bssim.SessionInterestManager
 	bpm         *bsbpm.BlockPresenceManager
@@ -52,7 +46,6 @@ type WantManager struct {
 // New initializes a new WantManager for a given context.
 func New(ctx context.Context, peerHandler PeerHandler, sim *bssim.SessionInterestManager, bpm *bsbpm.BlockPresenceManager) *WantManager {
 	return &WantManager{
-		bcwl:        bsswl.NewSessionWantlist(),
 		peerHandler: peerHandler,
 		sim:         sim,
 		bpm:         bpm,
@@ -69,8 +62,6 @@ func (wm *WantManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid.Ci
 	wm.bpm.ReceiveFrom(p, haves, dontHaves)
 	// Inform interested sessions
 	wm.sm.ReceiveFrom(p, blks, haves, dontHaves)
-	// Remove received blocks from broadcast wantlist
-	wm.bcwl.RemoveKeys(blks)
 	// Send CANCEL to all peers with want-have / want-block
 	wm.peerHandler.SendCancels(ctx, blks)
 }
@@ -78,11 +69,10 @@ func (wm *WantManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid.Ci
 // BroadcastWantHaves is called when want-haves should be broadcast to all
 // connected peers (as part of session discovery)
 func (wm *WantManager) BroadcastWantHaves(ctx context.Context, ses uint64, wantHaves []cid.Cid) {
-	log.Debugf("BroadcastWantHaves session%d: %s", ses, wantHaves)
-
-	// Record broadcast wants
-	wm.bcwl.Add(wantHaves, ses)
+	// TODO: Avoid calling broadcast through here. It doesn't fit with
+	// everything else this module does.
+	log.Debugf("BroadcastWantHaves session%d: %s", ses, wantHaves)
 
 	// Send want-haves to all peers
 	wm.peerHandler.BroadcastWantHaves(ctx, wantHaves)
 }
@@ -92,9 +82,6 @@ func (wm *WantManager) RemoveSession(ctx context.Context, ses uint64) {
 	// Remove session's interest in the given blocks.
 	cancelKs := wm.sim.RemoveSessionInterest(ses)
 
-	// Remove broadcast want-haves for session
-	wm.bcwl.RemoveSession(ses)
-
 	// Free up block presence tracking for keys that no session is interested
 	// in anymore
 	wm.bpm.RemoveKeys(cancelKs)
@@ -107,7 +94,7 @@ func (wm *WantManager) RemoveSession(ctx context.Context, ses uint64) {
 func (wm *WantManager) Connected(p peer.ID) {
 	// Tell the peer handler that there is a new connection and give it the
 	// list of outstanding broadcast wants
-	wm.peerHandler.Connected(p, wm.bcwl.Keys())
+	wm.peerHandler.Connected(p)
 }
 
 // Disconnected is called when a peer disconnects
diff --git a/bitswap/internal/wantmanager/wantmanager_test.go b/bitswap/internal/wantmanager/wantmanager_test.go
index 38d41d9f1..9855eb30d 100644
--- a/bitswap/internal/wantmanager/wantmanager_test.go
+++ b/bitswap/internal/wantmanager/wantmanager_test.go
@@ -14,13 +14,11 @@ import (
 )
 
 type fakePeerHandler struct {
-	lastInitialWants []cid.Cid
-	lastBcstWants    []cid.Cid
-	lastCancels      []cid.Cid
+	lastBcstWants []cid.Cid
+	lastCancels   []cid.Cid
 }
 
-func (fph *fakePeerHandler) Connected(p peer.ID, initialWants []cid.Cid) {
-	fph.lastInitialWants = initialWants
+func (fph *fakePeerHandler) Connected(p peer.ID) {
 }
 
 func (fph *fakePeerHandler) Disconnected(p peer.ID) {
@@ -39,124 +37,6 @@ func (*fakeSessionManager) ReceiveFrom(p peer.ID, blks []cid.Cid, haves []cid.Ci
 	return nil
 }
 
-func TestInitialBroadcastWantsAddedCorrectly(t *testing.T) {
-	ctx := context.Background()
-	ph := &fakePeerHandler{}
-	sim := bssim.New()
-	bpm := bsbpm.New()
-	wm := New(context.Background(), ph, sim, bpm)
-	sm := &fakeSessionManager{}
-	wm.SetSessionManager(sm)
-
-	peers := testutil.GeneratePeers(3)
-
-	// Connect peer 0. Should not receive anything yet.
-	wm.Connected(peers[0])
-	if len(ph.lastInitialWants) != 0 {
-		t.Fatal("expected no initial wants")
-	}
-
-	// Broadcast 2 wants
-	wantHaves := testutil.GenerateCids(2)
-	wm.BroadcastWantHaves(ctx, 1, wantHaves)
-	if len(ph.lastBcstWants) != 2 {
-		t.Fatal("expected broadcast wants")
-	}
-
-	// Connect peer 1. Should receive all wants broadcast so far.
-	wm.Connected(peers[1])
-	if len(ph.lastInitialWants) != 2 {
-		t.Fatal("expected broadcast wants")
-	}
-
-	// Broadcast 3 more wants
-	wantHaves2 := testutil.GenerateCids(3)
-	wm.BroadcastWantHaves(ctx, 2, wantHaves2)
-	if len(ph.lastBcstWants) != 3 {
-		t.Fatal("expected broadcast wants")
-	}
-
-	// Connect peer 2. Should receive all wants broadcast so far.
-	wm.Connected(peers[2])
-	if len(ph.lastInitialWants) != 5 {
-		t.Fatal("expected all wants to be broadcast")
-	}
-}
-
-func TestReceiveFromRemovesBroadcastWants(t *testing.T) {
-	ctx := context.Background()
-	ph := &fakePeerHandler{}
-	sim := bssim.New()
-	bpm := bsbpm.New()
-	wm := New(context.Background(), ph, sim, bpm)
-	sm := &fakeSessionManager{}
-	wm.SetSessionManager(sm)
-
-	peers := testutil.GeneratePeers(3)
-
-	// Broadcast 2 wants
-	cids := testutil.GenerateCids(2)
-	wm.BroadcastWantHaves(ctx, 1, cids)
-	if len(ph.lastBcstWants) != 2 {
-		t.Fatal("expected broadcast wants")
-	}
-
-	// Connect peer 0. Should receive all wants.
-	wm.Connected(peers[0])
-	if len(ph.lastInitialWants) != 2 {
-		t.Fatal("expected broadcast wants")
-	}
-
-	// Receive block for first want
-	ks := cids[0:1]
-	haves := []cid.Cid{}
-	dontHaves := []cid.Cid{}
-	wm.ReceiveFrom(ctx, peers[1], ks, haves, dontHaves)
-
-	// Connect peer 2. Should get remaining want (the one that the block has
-	// not yet been received for).
-	wm.Connected(peers[2])
-	if len(ph.lastInitialWants) != 1 {
-		t.Fatal("expected remaining wants")
-	}
-}
-
-func TestRemoveSessionRemovesBroadcastWants(t *testing.T) {
-	ctx := context.Background()
-	ph := &fakePeerHandler{}
-	sim := bssim.New()
-	bpm := bsbpm.New()
-	wm := New(context.Background(), ph, sim, bpm)
-	sm := &fakeSessionManager{}
-	wm.SetSessionManager(sm)
-
-	peers := testutil.GeneratePeers(2)
-
-	// Broadcast 2 wants for session 0 and 2 wants for session 1
-	ses0 := uint64(0)
-	ses1 := uint64(1)
-	ses0wants := testutil.GenerateCids(2)
-	ses1wants := testutil.GenerateCids(2)
-	wm.BroadcastWantHaves(ctx, ses0, ses0wants)
-	wm.BroadcastWantHaves(ctx, ses1, ses1wants)
-
-	// Connect peer 0. Should receive all wants.
-	wm.Connected(peers[0])
-	if len(ph.lastInitialWants) != 4 {
-		t.Fatal("expected broadcast wants")
-	}
-
-	// Remove session 0
-	wm.RemoveSession(ctx, ses0)
-
-	// Connect peer 1. Should receive all wants from session that has not been
-	// removed.
-	wm.Connected(peers[1])
-	if len(ph.lastInitialWants) != 2 {
-		t.Fatal("expected broadcast wants")
-	}
-}
-
 func TestReceiveFrom(t *testing.T) {
 	ctx := context.Background()
 	ph := &fakePeerHandler{}
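Note (not part of the patch): after this change the broadcast wantlist lives in the peerWantManager instead of the WantManager's sessionwantlist, and a newly added peer picks up the live broadcast set from addPeer. That is why the tests above now call BroadcastWantHaves before Connected. The sketch below is a minimal, self-contained model of that contract only; the names (wantTracker, addPeer, broadcast) are illustrative rather than the library's API, plain strings stand in for cid.Cid, and locking, want-blocks, cancels, and the want-block gauge are all omitted.

package main

import "fmt"

// wantTracker is a toy stand-in for peerWantManager: it tracks broadcast
// wants separately from the wants already sent to each peer.
type wantTracker struct {
	broadcastWants map[string]bool            // all live broadcast wants
	wantPeers      map[string]map[string]bool // reverse index: want -> peers it was sent to
	peers          map[string]bool            // known peers
}

func newWantTracker() *wantTracker {
	return &wantTracker{
		broadcastWants: map[string]bool{},
		wantPeers:      map[string]map[string]bool{},
		peers:          map[string]bool{},
	}
}

// addPeer registers a peer and returns the broadcast wants it still needs,
// mirroring how addPeer now hands Connected the initial want-haves.
func (t *wantTracker) addPeer(p string) []string {
	if t.peers[p] {
		return nil
	}
	t.peers[p] = true
	out := make([]string, 0, len(t.broadcastWants))
	for c := range t.broadcastWants {
		out = append(out, c)
	}
	return out
}

// broadcast records new broadcast wants and returns, per peer, only the
// wants that peer has not already been sent.
func (t *wantTracker) broadcast(wants []string) map[string][]string {
	res := map[string][]string{}
	for _, c := range wants {
		if t.broadcastWants[c] {
			continue // already a broadcast want, skip it
		}
		t.broadcastWants[c] = true
		for p := range t.peers {
			if t.wantPeers[c][p] {
				continue // this peer was already sent the want directly
			}
			res[p] = append(res[p], c)
		}
	}
	return res
}

func main() {
	t := newWantTracker()
	fmt.Println(t.broadcast([]string{"cid-a"}))          // no peers yet: map[]
	fmt.Println(t.addPeer("peer-1"))                     // new peer gets the live broadcast want: [cid-a]
	fmt.Println(t.broadcast([]string{"cid-a", "cid-b"})) // only cid-b is new: map[peer-1:[cid-b]]
}

Keeping the broadcast set separate from the per-peer sets is also what lets prepareSendCancels in the patch batch the broadcast cancels for every peer at the end instead of touching each per-peer wantlist.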