diff --git a/bitswap.go b/bitswap.go index 5e1c5b05..1b59dcd0 100644 --- a/bitswap.go +++ b/bitswap.go @@ -148,10 +148,10 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, provSearchDelay time.Duration, rebroadcastDelay delay.D, self peer.ID) bssm.Session { - return bssession.New(ctx, id, wm, spm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self) + return bssession.New(ctx, id, wm, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self) } sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager { - return bsspm.New(ctx, id, network.ConnectionManager(), pqm) + return bsspm.New(id, network.ConnectionManager()) } notif := notifications.New() sm := bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self()) diff --git a/internal/messagequeue/messagequeue_test.go b/internal/messagequeue/messagequeue_test.go index 0ea93c43..96284756 100644 --- a/internal/messagequeue/messagequeue_test.go +++ b/internal/messagequeue/messagequeue_test.go @@ -394,9 +394,9 @@ func TestWantlistRebroadcast(t *testing.T) { t.Fatal("wrong number of wants") } - // Tell message queue to rebroadcast after 5ms, then wait 8ms - messageQueue.SetRebroadcastInterval(5 * time.Millisecond) - messages = collectMessages(ctx, t, messagesSent, 8*time.Millisecond) + // Tell message queue to rebroadcast after 10ms, then wait 15ms + messageQueue.SetRebroadcastInterval(10 * time.Millisecond) + messages = collectMessages(ctx, t, messagesSent, 15*time.Millisecond) firstMessage = messages[0] // Both original and new wants should have been rebroadcast @@ -425,9 +425,9 @@ func TestWantlistRebroadcast(t *testing.T) { } } - // Tell message queue to rebroadcast after 5ms, then wait 8ms - messageQueue.SetRebroadcastInterval(5 * time.Millisecond) - messages = collectMessages(ctx, t, messagesSent, 8*time.Millisecond) + // Tell message queue to rebroadcast after 10ms, then wait 15ms + messageQueue.SetRebroadcastInterval(10 * time.Millisecond) + messages = collectMessages(ctx, t, messagesSent, 15*time.Millisecond) firstMessage = messages[0] if len(firstMessage.Wantlist()) != totalWants-len(cancels) { t.Fatal("did not rebroadcast all wants") diff --git a/internal/session/peeravailabilitymanager.go b/internal/session/peeravailabilitymanager.go deleted file mode 100644 index 31b887c6..00000000 --- a/internal/session/peeravailabilitymanager.go +++ /dev/null @@ -1,57 +0,0 @@ -package session - -import ( - peer "github.com/libp2p/go-libp2p-core/peer" -) - -// peerAvailabilityManager keeps track of which peers have available space -// to receive want requests -type peerAvailabilityManager struct { - peerAvailable map[peer.ID]bool -} - -func newPeerAvailabilityManager() *peerAvailabilityManager { - return &peerAvailabilityManager{ - peerAvailable: make(map[peer.ID]bool), - } -} - -func (pam *peerAvailabilityManager) addPeer(p peer.ID) { - pam.peerAvailable[p] = false -} - -func (pam *peerAvailabilityManager) isAvailable(p peer.ID) (bool, bool) { - is, ok := pam.peerAvailable[p] - return is, ok -} - -func (pam *peerAvailabilityManager) setPeerAvailability(p peer.ID, isAvailable bool) { - pam.peerAvailable[p] = isAvailable -} - -func (pam *peerAvailabilityManager) haveAvailablePeers() bool { - for _, isAvailable := range pam.peerAvailable { - if isAvailable { - return true - } - } - return false -} - -func (pam *peerAvailabilityManager) availablePeers() []peer.ID { - var available []peer.ID - for p, isAvailable := range pam.peerAvailable { - if 
isAvailable { - available = append(available, p) - } - } - return available -} - -func (pam *peerAvailabilityManager) allPeers() []peer.ID { - var available []peer.ID - for p := range pam.peerAvailable { - available = append(available, p) - } - return available -} diff --git a/internal/session/peeravailabilitymanager_test.go b/internal/session/peeravailabilitymanager_test.go deleted file mode 100644 index 1d5b8f23..00000000 --- a/internal/session/peeravailabilitymanager_test.go +++ /dev/null @@ -1,74 +0,0 @@ -package session - -import ( - "testing" - - "github.com/ipfs/go-bitswap/internal/testutil" -) - -func TestPeerAvailabilityManager(t *testing.T) { - peers := testutil.GeneratePeers(2) - pam := newPeerAvailabilityManager() - - isAvailable, ok := pam.isAvailable(peers[0]) - if isAvailable || ok { - t.Fatal("expected not to have any availability yet") - } - - if pam.haveAvailablePeers() { - t.Fatal("expected not to have any availability yet") - } - - pam.addPeer(peers[0]) - isAvailable, ok = pam.isAvailable(peers[0]) - if !ok { - t.Fatal("expected to have a peer") - } - if isAvailable { - t.Fatal("expected not to have any availability yet") - } - if pam.haveAvailablePeers() { - t.Fatal("expected not to have any availability yet") - } - if len(pam.availablePeers()) != 0 { - t.Fatal("expected not to have any availability yet") - } - if len(pam.allPeers()) != 1 { - t.Fatal("expected one peer") - } - - pam.setPeerAvailability(peers[0], true) - isAvailable, ok = pam.isAvailable(peers[0]) - if !ok { - t.Fatal("expected to have a peer") - } - if !isAvailable { - t.Fatal("expected peer to be available") - } - if !pam.haveAvailablePeers() { - t.Fatal("expected peer to be available") - } - if len(pam.availablePeers()) != 1 { - t.Fatal("expected peer to be available") - } - if len(pam.allPeers()) != 1 { - t.Fatal("expected one peer") - } - - pam.addPeer(peers[1]) - if len(pam.availablePeers()) != 1 { - t.Fatal("expected one peer to be available") - } - if len(pam.allPeers()) != 2 { - t.Fatal("expected two peers") - } - - pam.setPeerAvailability(peers[0], false) - isAvailable, ok = pam.isAvailable(peers[0]) - if !ok { - t.Fatal("expected to have a peer") - } - if isAvailable { - t.Fatal("expected peer to not be available") - } -} diff --git a/internal/session/session.go b/internal/session/session.go index c41a65d4..b9231928 100644 --- a/internal/session/session.go +++ b/internal/session/session.go @@ -2,7 +2,6 @@ package session import ( "context" - "sync" "time" // lu "github.com/ipfs/go-bitswap/internal/logutil" @@ -49,23 +48,26 @@ type PeerManager interface { SendWants(ctx context.Context, peerId peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) } -// PeerManager provides an interface for tracking and optimize peers, and -// requesting more when neccesary. +// SessionPeerManager keeps track of peers in the session type SessionPeerManager interface { - // ReceiveFrom is called when blocks and HAVEs are received from a peer. - // It returns a boolean indicating if the peer is new to the session. - ReceiveFrom(peerId peer.ID, blks []cid.Cid, haves []cid.Cid) bool - // Peers returns the set of peers in the session. 
- Peers() *peer.Set - // FindMorePeers queries Content Routing to discover providers of the given cid - FindMorePeers(context.Context, cid.Cid) - // RecordPeerRequests records the time that a cid was requested from a peer - RecordPeerRequests([]peer.ID, []cid.Cid) - // RecordPeerResponse records the time that a response for a cid arrived - // from a peer - RecordPeerResponse(peer.ID, []cid.Cid) - // RecordCancels records that cancels were sent for the given cids - RecordCancels([]cid.Cid) + // PeersDiscovered indicates if any peers have been discovered yet + PeersDiscovered() bool + // Shutdown the SessionPeerManager + Shutdown() + // Adds a peer to the session, returning true if the peer is new + AddPeer(peer.ID) bool + // Removes a peer from the session, returning true if the peer existed + RemovePeer(peer.ID) bool + // All peers in the session + Peers() []peer.ID + // Whether there are any peers in the session + HasPeers() bool +} + +// ProviderFinder is used to find providers for a given key +type ProviderFinder interface { + // FindProvidersAsync searches for peers that provide the given CID + FindProvidersAsync(ctx context.Context, k cid.Cid) <-chan peer.ID } // opType is the kind of operation that is being processed by the event loop @@ -80,6 +82,8 @@ const ( opCancel // Broadcast want-haves opBroadcast + // Wants sent to peers + opWantsSent ) type op struct { @@ -92,10 +96,11 @@ type op struct { // info to, and who to request blocks from. type Session struct { // dependencies - ctx context.Context - wm WantManager - sprm SessionPeerManager - sim *bssim.SessionInterestManager + ctx context.Context + wm WantManager + sprm SessionPeerManager + providerFinder ProviderFinder + sim *bssim.SessionInterestManager sw sessionWants sws sessionWantSender @@ -127,6 +132,7 @@ func New(ctx context.Context, id uint64, wm WantManager, sprm SessionPeerManager, + providerFinder ProviderFinder, sim *bssim.SessionInterestManager, pm PeerManager, bpm *bsbpm.BlockPresenceManager, @@ -140,6 +146,7 @@ func New(ctx context.Context, ctx: ctx, wm: wm, sprm: sprm, + providerFinder: providerFinder, sim: sim, incoming: make(chan op, 128), latencyTrkr: latencyTracker{}, @@ -151,7 +158,7 @@ func New(ctx context.Context, periodicSearchDelay: periodicSearchDelay, self: self, } - s.sws = newSessionWantSender(ctx, id, pm, bpm, s.onWantsSent, s.onPeersExhausted) + s.sws = newSessionWantSender(ctx, id, pm, sprm, bpm, s.onWantsSent, s.onPeersExhausted) go s.run(ctx) @@ -164,44 +171,25 @@ func (s *Session) ID() uint64 { // ReceiveFrom receives incoming blocks from the given peer. func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) { + // The SessionManager tells each Session about all keys that it may be + // interested in. Here the Session filters the keys to the ones that this + // particular Session is interested in. 
interestedRes := s.sim.FilterSessionInterested(s.id, ks, haves, dontHaves) ks = interestedRes[0] haves = interestedRes[1] dontHaves = interestedRes[2] // s.logReceiveFrom(from, ks, haves, dontHaves) - // Add any newly discovered peers that have blocks we're interested in to - // the peer set - isNewPeer := s.sprm.ReceiveFrom(from, ks, haves) - - // Record response timing only if the blocks came from the network - // (blocks can also be received from the local node) - if len(ks) > 0 && from != "" { - s.sprm.RecordPeerResponse(from, ks) - } - - // Update want potential - s.sws.Update(from, ks, haves, dontHaves, isNewPeer) + // Inform the session want sender that a message has been received + s.sws.Update(from, ks, haves, dontHaves) if len(ks) == 0 { return } - // Record which blocks have been received and figure out the total latency - // for fetching the blocks - wanted, totalLatency := s.sw.BlocksReceived(ks) - s.latencyTrkr.receiveUpdate(len(wanted), totalLatency) - - if len(wanted) == 0 { - return - } - - // Inform the SessionInterestManager that this session is no longer - // expecting to receive the wanted keys - s.sim.RemoveSessionWants(s.id, wanted) - + // Inform the session that blocks have been received select { - case s.incoming <- op{op: opReceive, keys: wanted}: + case s.incoming <- op{op: opReceive, keys: ks}: case <-s.ctx.Done(): } } @@ -220,28 +208,6 @@ func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid, haves []cid.Cid, dontH // } // } -func (s *Session) onWantsSent(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) { - allBlks := append(wantBlocks[:len(wantBlocks):len(wantBlocks)], wantHaves...) - s.sw.WantsSent(allBlks) - s.sprm.RecordPeerRequests([]peer.ID{p}, allBlks) -} - -func (s *Session) onPeersExhausted(ks []cid.Cid) { - // We don't want to block the sessionWantSender if the incoming channel - // is full. So if we can't immediately send on the incoming channel spin - // it off into a go-routine. - select { - case s.incoming <- op{op: opBroadcast, keys: ks}: - default: - go func() { - select { - case s.incoming <- op{op: opBroadcast, keys: ks}: - case <-s.ctx.Done(): - } - }() - } -} - // GetBlock fetches a single block. func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) { return bsgetter.SyncGetBlock(parent, k, s.GetBlocks) @@ -278,6 +244,34 @@ func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) { } } +// onWantsSent is called when wants are sent to a peer by the session wants sender +func (s *Session) onWantsSent(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) { + allBlks := append(wantBlocks[:len(wantBlocks):len(wantBlocks)], wantHaves...) + s.nonBlockingEnqueue(op{op: opWantsSent, keys: allBlks}) +} + +// onPeersExhausted is called when all available peers have sent DONT_HAVE for +// a set of cids (or all peers become unavailable) +func (s *Session) onPeersExhausted(ks []cid.Cid) { + s.nonBlockingEnqueue(op{op: opBroadcast, keys: ks}) +} + +// We don't want to block the sessionWantSender if the incoming channel +// is full. So if we can't immediately send on the incoming channel spin +// it off into a go-routine. 
+func (s *Session) nonBlockingEnqueue(o op) {
+	select {
+	case s.incoming <- o:
+	default:
+		go func() {
+			select {
+			case s.incoming <- o:
+			case <-s.ctx.Done():
+			}
+		}()
+	}
+}
+
 // Session run loop -- everything in this function should not be called
 // outside of this loop
 func (s *Session) run(ctx context.Context) {
@@ -290,23 +284,34 @@ func (s *Session) run(ctx context.Context) {
 		case oper := <-s.incoming:
 			switch oper.op {
 			case opReceive:
+				// Received blocks
 				s.handleReceive(oper.keys)
 			case opWant:
+				// Client wants blocks
 				s.wantBlocks(ctx, oper.keys)
 			case opCancel:
+				// Wants were cancelled
 				s.sw.CancelPending(oper.keys)
+			case opWantsSent:
+				// Wants were sent to a peer
+				s.sw.WantsSent(oper.keys)
 			case opBroadcast:
+				// Broadcast want-haves to all peers
 				s.broadcastWantHaves(ctx, oper.keys)
 			default:
 				panic("unhandled operation")
 			}
 		case <-s.idleTick.C:
+			// The session hasn't received blocks for a while, broadcast
 			s.broadcastWantHaves(ctx, nil)
 		case <-s.periodicSearchTimer.C:
+			// Periodically search for a random live want
 			s.handlePeriodicSearch(ctx)
 		case baseTickDelay := <-s.tickDelayReqs:
+			// Set the base tick delay
 			s.baseTickDelay = baseTickDelay
 		case <-ctx.Done():
+			// Shutdown
 			s.handleShutdown()
 			return
 		}
@@ -327,7 +332,6 @@ func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) {
 	// log.Infof("Ses%d: broadcast %d keys\n", s.id, len(live))
 
 	// Broadcast a want-have for the live wants to everyone we're connected to
-	s.sprm.RecordPeerRequests(nil, wants)
 	s.wm.BroadcastWantHaves(ctx, s.id, wants)
 
 	// do not find providers on consecutive ticks
@@ -337,7 +341,7 @@ func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) {
 		// Typically if the provider has the first block they will have
 		// the rest of the blocks also.
 		log.Warnf("Ses%d: FindMorePeers with want 0 of %d wants", s.id, len(wants))
-		s.sprm.FindMorePeers(ctx, wants[0])
+		s.findMorePeers(ctx, wants[0])
 	}
 
 	s.resetIdleTick()
@@ -347,6 +351,8 @@ func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) {
 	}
 }
 
+// handlePeriodicSearch is called periodically to search for providers of a
+// randomly chosen CID in the session.
 func (s *Session) handlePeriodicSearch(ctx context.Context) {
 	randomWant := s.sw.RandomLiveWant()
 	if !randomWant.Defined() {
@@ -355,40 +361,74 @@ func (s *Session) handlePeriodicSearch(ctx context.Context) {
 
 	// TODO: come up with a better strategy for determining when to search
 	// for new providers for blocks.
-	s.sprm.FindMorePeers(ctx, randomWant)
+	s.findMorePeers(ctx, randomWant)
 
 	s.wm.BroadcastWantHaves(ctx, s.id, []cid.Cid{randomWant})
 
 	s.periodicSearchTimer.Reset(s.periodicSearchDelay.NextWaitTime())
 }
 
+// findMorePeers attempts to find more peers for a session by searching for
+// providers for the given Cid
+func (s *Session) findMorePeers(ctx context.Context, c cid.Cid) {
+	go func(k cid.Cid) {
+		for p := range s.providerFinder.FindProvidersAsync(ctx, k) {
+			// When a provider indicates that it has a cid, it's equivalent to
+			// the providing peer sending a HAVE
+			s.sws.Update(p, nil, []cid.Cid{c}, nil)
+		}
+	}(c)
+}
+
+// handleShutdown is called when the session shuts down
 func (s *Session) handleShutdown() {
+	// Stop the idle timer
 	s.idleTick.Stop()
+	// Shut down the session peer manager
+	s.sprm.Shutdown()
+	// Remove the session from the want manager
 	s.wm.RemoveSession(s.ctx, s.id)
 }
 
+// handleReceive is called when the session receives blocks from a peer
 func (s *Session) handleReceive(ks []cid.Cid) {
+	// Record which blocks have been received and figure out the total latency
+	// for fetching the blocks
+	wanted, totalLatency := s.sw.BlocksReceived(ks)
+	if len(wanted) == 0 {
+		return
+	}
+
+	// Record latency
+	s.latencyTrkr.receiveUpdate(len(wanted), totalLatency)
+
+	// Inform the SessionInterestManager that this session is no longer
+	// expecting to receive the wanted keys
+	s.sim.RemoveSessionWants(s.id, wanted)
+
 	s.idleTick.Stop()
 
 	// We've received new wanted blocks, so reset the number of ticks
 	// that have occurred since the last new block
 	s.consecutiveTicks = 0
 
-	s.sprm.RecordCancels(ks)
-
 	s.resetIdleTick()
 }
 
+// wantBlocks is called when blocks are requested by the client
 func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) {
 	if len(newks) > 0 {
+		// Inform the SessionInterestManager that this session is interested in the keys
 		s.sim.RecordSessionInterest(s.id, newks)
+		// Tell the sessionWants tracker that the wants have been requested
 		s.sw.BlocksRequested(newks)
+		// Tell the sessionWantSender that the blocks have been requested
 		s.sws.Add(newks)
 	}
 
-	// If we have discovered peers already, the SessionPotentialManager will
+	// If we have discovered peers already, the sessionWantSender will
 	// send wants to them
-	if s.sprm.Peers().Size() > 0 {
+	if s.sprm.PeersDiscovered() {
 		return
 	}
 
@@ -396,11 +436,17 @@ func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) {
 	ks := s.sw.GetNextWants(broadcastLiveWantsLimit)
 	if len(ks) > 0 {
 		log.Infof("Ses%d: No peers - broadcasting %d want HAVE requests\n", s.id, len(ks))
-		s.sprm.RecordPeerRequests(nil, ks)
 		s.wm.BroadcastWantHaves(ctx, s.id, ks)
 	}
 }
 
+// The session will broadcast if it has outstanding wants and doesn't receive
+// any blocks for some time.
+// The length of time is calculated +// - initially +// as a fixed delay +// - once some blocks are received +// from a base delay and average latency, with a backoff func (s *Session) resetIdleTick() { var tickDelay time.Duration if !s.latencyTrkr.hasLatency() { @@ -414,30 +460,22 @@ func (s *Session) resetIdleTick() { s.idleTick.Reset(tickDelay) } +// latencyTracker keeps track of the average latency between sending a want +// and receiving the corresponding block type latencyTracker struct { - sync.RWMutex totalLatency time.Duration count int } func (lt *latencyTracker) hasLatency() bool { - lt.RLock() - defer lt.RUnlock() - return lt.totalLatency > 0 && lt.count > 0 } func (lt *latencyTracker) averageLatency() time.Duration { - lt.RLock() - defer lt.RUnlock() - return lt.totalLatency / time.Duration(lt.count) } func (lt *latencyTracker) receiveUpdate(count int, totalLatency time.Duration) { - lt.Lock() - defer lt.Unlock() - lt.totalLatency += totalLatency lt.count += count } diff --git a/internal/session/session_test.go b/internal/session/session_test.go index b3ae26b2..d40036d3 100644 --- a/internal/session/session_test.go +++ b/internal/session/session_test.go @@ -9,6 +9,7 @@ import ( notifications "github.com/ipfs/go-bitswap/internal/notifications" bspm "github.com/ipfs/go-bitswap/internal/peermanager" bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager" + bsspm "github.com/ipfs/go-bitswap/internal/sessionpeermanager" "github.com/ipfs/go-bitswap/internal/testutil" cid "github.com/ipfs/go-cid" blocksutil "github.com/ipfs/go-ipfs-blocksutil" @@ -38,40 +39,41 @@ func (fwm *fakeWantManager) BroadcastWantHaves(ctx context.Context, sesid uint64 } func (fwm *fakeWantManager) RemoveSession(context.Context, uint64) {} -type fakeSessionPeerManager struct { - peers *peer.Set - findMorePeersRequested chan cid.Cid +func newFakeSessionPeerManager() *bsspm.SessionPeerManager { + return bsspm.New(1, newFakePeerTagger()) } -func newFakeSessionPeerManager() *fakeSessionPeerManager { - return &fakeSessionPeerManager{ - peers: peer.NewSet(), - findMorePeersRequested: make(chan cid.Cid, 1), - } +type fakePeerTagger struct { } -func (fpm *fakeSessionPeerManager) FindMorePeers(ctx context.Context, k cid.Cid) { - select { - case fpm.findMorePeersRequested <- k: - case <-ctx.Done(): - } +func newFakePeerTagger() *fakePeerTagger { + return &fakePeerTagger{} } -func (fpm *fakeSessionPeerManager) Peers() *peer.Set { - return fpm.peers +func (fpt *fakePeerTagger) TagPeer(p peer.ID, tag string, val int) { +} +func (fpt *fakePeerTagger) UntagPeer(p peer.ID, tag string) { } -func (fpm *fakeSessionPeerManager) ReceiveFrom(p peer.ID, ks []cid.Cid, haves []cid.Cid) bool { - if !fpm.peers.Contains(p) { - fpm.peers.Add(p) - return true +type fakeProviderFinder struct { + findMorePeersRequested chan cid.Cid +} + +func newFakeProviderFinder() *fakeProviderFinder { + return &fakeProviderFinder{ + findMorePeersRequested: make(chan cid.Cid, 1), } - return false } -func (fpm *fakeSessionPeerManager) RecordCancels(c []cid.Cid) {} -func (fpm *fakeSessionPeerManager) RecordPeerRequests([]peer.ID, []cid.Cid) {} -func (fpm *fakeSessionPeerManager) RecordPeerResponse(p peer.ID, c []cid.Cid) { - fpm.peers.Add(p) + +func (fpf *fakeProviderFinder) FindProvidersAsync(ctx context.Context, k cid.Cid) <-chan peer.ID { + go func() { + select { + case fpf.findMorePeersRequested <- k: + case <-ctx.Done(): + } + }() + + return make(chan peer.ID) } type fakePeerManager struct { @@ -88,22 +90,24 @@ func (pm *fakePeerManager) 
UnregisterSession(uint64) func (pm *fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) {} func TestSessionGetBlocks(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() fwm := newFakeWantManager() fpm := newFakeSessionPeerManager() + fpf := newFakeProviderFinder() sim := bssim.New() bpm := bsbpm.New() notif := notifications.New() defer notif.Shutdown() id := testutil.GenerateSessionID() - session := New(ctx, id, fwm, fpm, sim, newFakePeerManager(), bpm, notif, time.Second, delay.Fixed(time.Minute), "") + session := New(ctx, id, fwm, fpm, fpf, sim, newFakePeerManager(), bpm, notif, time.Second, delay.Fixed(time.Minute), "") blockGenerator := blocksutil.NewBlockGenerator() blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2) var cids []cid.Cid for _, block := range blks { cids = append(cids, block.Cid()) } + _, err := session.GetBlocks(ctx, cids) if err != nil { @@ -125,14 +129,16 @@ func TestSessionGetBlocks(t *testing.T) { } // Simulate receiving HAVEs from several peers - peers := testutil.GeneratePeers(broadcastLiveWantsLimit) + peers := testutil.GeneratePeers(5) for i, p := range peers { blk := blks[testutil.IndexOf(blks, receivedWantReq.cids[i])] session.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{blk.Cid()}, []cid.Cid{}) } + time.Sleep(10 * time.Millisecond) + // Verify new peers were recorded - if !testutil.MatchPeersIgnoreOrder(fpm.Peers().Peers(), peers) { + if !testutil.MatchPeersIgnoreOrder(fpm.Peers(), peers) { t.Fatal("peers not recorded by the peer manager") } @@ -145,6 +151,8 @@ func TestSessionGetBlocks(t *testing.T) { // Simulate receiving DONT_HAVE for a CID session.ReceiveFrom(peers[0], []cid.Cid{}, []cid.Cid{}, []cid.Cid{blks[0].Cid()}) + time.Sleep(10 * time.Millisecond) + // Verify session still wants received blocks _, unwanted = sim.SplitWantedUnwanted(blks) if len(unwanted) > 0 { @@ -154,6 +162,8 @@ func TestSessionGetBlocks(t *testing.T) { // Simulate receiving block for a CID session.ReceiveFrom(peers[1], []cid.Cid{blks[0].Cid()}, []cid.Cid{}, []cid.Cid{}) + time.Sleep(10 * time.Millisecond) + // Verify session no longer wants received block wanted, unwanted := sim.SplitWantedUnwanted(blks) if len(unwanted) != 1 || !unwanted[0].Cid().Equals(blks[0].Cid()) { @@ -169,12 +179,13 @@ func TestSessionFindMorePeers(t *testing.T) { defer cancel() fwm := newFakeWantManager() fpm := newFakeSessionPeerManager() + fpf := newFakeProviderFinder() sim := bssim.New() bpm := bsbpm.New() notif := notifications.New() defer notif.Shutdown() id := testutil.GenerateSessionID() - session := New(ctx, id, fwm, fpm, sim, newFakePeerManager(), bpm, notif, time.Second, delay.Fixed(time.Minute), "") + session := New(ctx, id, fwm, fpm, fpf, sim, newFakePeerManager(), bpm, notif, time.Second, delay.Fixed(time.Minute), "") session.SetBaseTickDelay(200 * time.Microsecond) blockGenerator := blocksutil.NewBlockGenerator() blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2) @@ -223,7 +234,7 @@ func TestSessionFindMorePeers(t *testing.T) { // The session should eventually try to find more peers select { - case <-fpm.findMorePeersRequested: + case <-fpf.findMorePeersRequested: case <-ctx.Done(): t.Fatal("Did not find more peers") } @@ -234,12 +245,14 @@ func TestSessionOnPeersExhausted(t *testing.T) { defer cancel() fwm := newFakeWantManager() fpm := newFakeSessionPeerManager() + fpf := newFakeProviderFinder() + sim := bssim.New() bpm 
:= bsbpm.New() notif := notifications.New() defer notif.Shutdown() id := testutil.GenerateSessionID() - session := New(ctx, id, fwm, fpm, sim, newFakePeerManager(), bpm, notif, time.Second, delay.Fixed(time.Minute), "") + session := New(ctx, id, fwm, fpm, fpf, sim, newFakePeerManager(), bpm, notif, time.Second, delay.Fixed(time.Minute), "") blockGenerator := blocksutil.NewBlockGenerator() blks := blockGenerator.Blocks(broadcastLiveWantsLimit + 5) var cids []cid.Cid @@ -277,12 +290,13 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) { defer cancel() fwm := newFakeWantManager() fpm := newFakeSessionPeerManager() + fpf := newFakeProviderFinder() sim := bssim.New() bpm := bsbpm.New() notif := notifications.New() defer notif.Shutdown() id := testutil.GenerateSessionID() - session := New(ctx, id, fwm, fpm, sim, newFakePeerManager(), bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "") + session := New(ctx, id, fwm, fpm, fpf, sim, newFakePeerManager(), bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "") blockGenerator := blocksutil.NewBlockGenerator() blks := blockGenerator.Blocks(4) var cids []cid.Cid @@ -314,7 +328,7 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) { // Wait for a request to find more peers to occur select { - case k := <-fpm.findMorePeersRequested: + case k := <-fpf.findMorePeersRequested: if testutil.IndexOf(blks, k) == -1 { t.Fatal("did not rebroadcast an active want") } @@ -369,14 +383,14 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) { // Should not have tried to find peers on consecutive ticks select { - case <-fpm.findMorePeersRequested: + case <-fpf.findMorePeersRequested: t.Fatal("Should not have tried to find peers on consecutive ticks") default: } // Wait for rebroadcast to occur select { - case k := <-fpm.findMorePeersRequested: + case k := <-fpf.findMorePeersRequested: if testutil.IndexOf(blks, k) == -1 { t.Fatal("did not rebroadcast an active want") } @@ -388,6 +402,7 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) { func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) { fwm := newFakeWantManager() fpm := newFakeSessionPeerManager() + fpf := newFakeProviderFinder() sim := bssim.New() bpm := bsbpm.New() notif := notifications.New() @@ -396,7 +411,7 @@ func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) { // Create a new session with its own context sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond) - session := New(sessctx, id, fwm, fpm, sim, newFakePeerManager(), bpm, notif, time.Second, delay.Fixed(time.Minute), "") + session := New(sessctx, id, fwm, fpm, fpf, sim, newFakePeerManager(), bpm, notif, time.Second, delay.Fixed(time.Minute), "") timerCtx, timerCancel := context.WithTimeout(context.Background(), 10*time.Millisecond) defer timerCancel() @@ -430,12 +445,14 @@ func TestSessionReceiveMessageAfterShutdown(t *testing.T) { ctx, cancelCtx := context.WithTimeout(context.Background(), 10*time.Millisecond) fwm := newFakeWantManager() fpm := newFakeSessionPeerManager() + fpf := newFakeProviderFinder() + sim := bssim.New() bpm := bsbpm.New() notif := notifications.New() defer notif.Shutdown() id := testutil.GenerateSessionID() - session := New(ctx, id, fwm, fpm, sim, newFakePeerManager(), bpm, notif, time.Second, delay.Fixed(time.Minute), "") + session := New(ctx, id, fwm, fpm, fpf, sim, newFakePeerManager(), bpm, notif, time.Second, delay.Fixed(time.Minute), "") blockGenerator := blocksutil.NewBlockGenerator() blks := 
blockGenerator.Blocks(2) cids := []cid.Cid{blks[0].Cid(), blks[1].Cid()} diff --git a/internal/session/sessionwants.go b/internal/session/sessionwants.go index 9f896049..ad8dcd1b 100644 --- a/internal/session/sessionwants.go +++ b/internal/session/sessionwants.go @@ -3,7 +3,6 @@ package session import ( "fmt" "math/rand" - "sync" "time" cid "github.com/ipfs/go-cid" @@ -12,7 +11,6 @@ import ( // sessionWants keeps track of which cids are waiting to be sent out, and which // peers are "live" - ie, we've sent a request but haven't received a block yet type sessionWants struct { - sync.RWMutex toFetch *cidQueue liveWants map[cid.Cid]time.Time } @@ -30,9 +28,6 @@ func (sw *sessionWants) String() string { // BlocksRequested is called when the client makes a request for blocks func (sw *sessionWants) BlocksRequested(newWants []cid.Cid) { - sw.Lock() - defer sw.Unlock() - for _, k := range newWants { sw.toFetch.Push(k) } @@ -43,9 +38,6 @@ func (sw *sessionWants) BlocksRequested(newWants []cid.Cid) { func (sw *sessionWants) GetNextWants(limit int) []cid.Cid { now := time.Now() - sw.Lock() - defer sw.Unlock() - // Move CIDs from fetch queue to the live wants queue (up to the limit) currentLiveCount := len(sw.liveWants) toAdd := limit - currentLiveCount @@ -63,10 +55,6 @@ func (sw *sessionWants) GetNextWants(limit int) []cid.Cid { // WantsSent is called when wants are sent to a peer func (sw *sessionWants) WantsSent(ks []cid.Cid) { now := time.Now() - - sw.Lock() - defer sw.Unlock() - for _, c := range ks { if _, ok := sw.liveWants[c]; !ok { sw.toFetch.Remove(c) @@ -86,12 +74,8 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration) } now := time.Now() - - sw.Lock() - defer sw.Unlock() - for _, c := range ks { - if sw.unlockedIsWanted(c) { + if sw.isWanted(c) { wanted = append(wanted, c) sentAt, ok := sw.liveWants[c] @@ -113,10 +97,6 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration) // live want CIDs. func (sw *sessionWants) PrepareBroadcast() []cid.Cid { now := time.Now() - - sw.Lock() - defer sw.Unlock() - live := make([]cid.Cid, 0, len(sw.liveWants)) for c := range sw.liveWants { live = append(live, c) @@ -127,9 +107,6 @@ func (sw *sessionWants) PrepareBroadcast() []cid.Cid { // CancelPending removes the given CIDs from the fetch queue. 
func (sw *sessionWants) CancelPending(keys []cid.Cid) { - sw.Lock() - defer sw.Unlock() - for _, k := range keys { sw.toFetch.Remove(k) } @@ -137,9 +114,6 @@ func (sw *sessionWants) CancelPending(keys []cid.Cid) { // LiveWants returns a list of live wants func (sw *sessionWants) LiveWants() []cid.Cid { - sw.RLock() - defer sw.RUnlock() - live := make([]cid.Cid, 0, len(sw.liveWants)) for c := range sw.liveWants { live = append(live, c) @@ -148,16 +122,12 @@ func (sw *sessionWants) LiveWants() []cid.Cid { } func (sw *sessionWants) RandomLiveWant() cid.Cid { - i := rand.Uint64() - - sw.RLock() - defer sw.RUnlock() - if len(sw.liveWants) == 0 { return cid.Cid{} } - i %= uint64(len(sw.liveWants)) + // picking a random live want + i := rand.Intn(len(sw.liveWants)) for k := range sw.liveWants { if i == 0 { return k @@ -169,13 +139,11 @@ func (sw *sessionWants) RandomLiveWant() cid.Cid { // Has live wants indicates if there are any live wants func (sw *sessionWants) HasLiveWants() bool { - sw.RLock() - defer sw.RUnlock() - return len(sw.liveWants) > 0 } -func (sw *sessionWants) unlockedIsWanted(c cid.Cid) bool { +// Indicates whether the want is in either of the fetch or live queues +func (sw *sessionWants) isWanted(c cid.Cid) bool { _, ok := sw.liveWants[c] if !ok { ok = sw.toFetch.Has(c) diff --git a/internal/session/sessionwantsender.go b/internal/session/sessionwantsender.go index 38c62352..cffb39bb 100644 --- a/internal/session/sessionwantsender.go +++ b/internal/session/sessionwantsender.go @@ -48,11 +48,9 @@ type peerAvailability struct { available bool } -// change can be a new peer being discovered, a new message received by the -// session, or a change in the connect status of a peer +// change can be new wants, a new message received by the session, +// or a change in the connect status of a peer type change struct { - // the peer ID of a new peer - addPeer peer.ID // new wants requested add []cid.Cid // new message received by session (blocks / HAVEs / DONT_HAVEs) @@ -85,12 +83,12 @@ type sessionWantSender struct { peerConsecutiveDontHaves map[peer.ID]int // Tracks which peers we have send want-block to swbt *sentWantBlocksTracker - // Maintains a list of peers and whether they are connected - peerAvlMgr *peerAvailabilityManager // Tracks the number of blocks each peer sent us peerRspTrkr *peerResponseTracker // Sends wants to peers pm PeerManager + // Keeps track of peers in the session + spm SessionPeerManager // Keeps track of which peer has / doesn't have a block bpm *bsbpm.BlockPresenceManager // Called when wants are sent @@ -99,105 +97,94 @@ type sessionWantSender struct { onPeersExhausted onPeersExhaustedFn } -func newSessionWantSender(ctx context.Context, sid uint64, pm PeerManager, bpm *bsbpm.BlockPresenceManager, - onSend onSendFn, onPeersExhausted onPeersExhaustedFn) sessionWantSender { +func newSessionWantSender(ctx context.Context, sid uint64, pm PeerManager, spm SessionPeerManager, + bpm *bsbpm.BlockPresenceManager, onSend onSendFn, onPeersExhausted onPeersExhaustedFn) sessionWantSender { - spm := sessionWantSender{ + sws := sessionWantSender{ ctx: ctx, sessionID: sid, changes: make(chan change, changesBufferSize), wants: make(map[cid.Cid]*wantInfo), peerConsecutiveDontHaves: make(map[peer.ID]int), swbt: newSentWantBlocksTracker(), - peerAvlMgr: newPeerAvailabilityManager(), peerRspTrkr: newPeerResponseTracker(), pm: pm, + spm: spm, bpm: bpm, onSend: onSend, onPeersExhausted: onPeersExhausted, } - return spm + return sws } -func (spm *sessionWantSender) ID() uint64 { - 
return spm.sessionID +func (sws *sessionWantSender) ID() uint64 { + return sws.sessionID } // Add is called when new wants are added to the session -func (spm *sessionWantSender) Add(ks []cid.Cid) { +func (sws *sessionWantSender) Add(ks []cid.Cid) { if len(ks) == 0 { return } - spm.addChange(change{add: ks}) + sws.addChange(change{add: ks}) } // Update is called when the session receives a message with incoming blocks // or HAVE / DONT_HAVE -func (spm *sessionWantSender) Update(from peer.ID, ks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid, isNewPeer bool) { - // fmt.Printf("Update(%s, %d, %d, %d, %t)\n", lu.P(from), len(ks), len(haves), len(dontHaves), isNewPeer) +func (sws *sessionWantSender) Update(from peer.ID, ks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) { + // fmt.Printf("Update(%s, %d, %d, %d, %t)\n", lu.P(from), len(ks), len(haves), len(dontHaves)) hasUpdate := len(ks) > 0 || len(haves) > 0 || len(dontHaves) > 0 - if !hasUpdate && !isNewPeer { + if !hasUpdate { return } - ch := change{} - - if hasUpdate { - ch.update = update{from, ks, haves, dontHaves} - } - - // If the message came from a new peer register with the peer manager - if isNewPeer { - available := spm.pm.RegisterSession(from, spm) - ch.addPeer = from - ch.availability = peerAvailability{from, available} - } - - spm.addChange(ch) + sws.addChange(change{ + update: update{from, ks, haves, dontHaves}, + }) } // SignalAvailability is called by the PeerManager to signal that a peer has // connected / disconnected -func (spm *sessionWantSender) SignalAvailability(p peer.ID, isAvailable bool) { +func (sws *sessionWantSender) SignalAvailability(p peer.ID, isAvailable bool) { // fmt.Printf("SignalAvailability(%s, %t)\n", lu.P(p), isAvailable) availability := peerAvailability{p, isAvailable} - spm.addChange(change{availability: availability}) + sws.addChange(change{availability: availability}) } // Run is the main loop for processing incoming changes -func (spm *sessionWantSender) Run() { +func (sws *sessionWantSender) Run() { for { select { - case ch := <-spm.changes: - spm.onChange([]change{ch}) - case <-spm.ctx.Done(): - spm.shutdown() + case ch := <-sws.changes: + sws.onChange([]change{ch}) + case <-sws.ctx.Done(): + sws.shutdown() return } } } // addChange adds a new change to the queue -func (spm *sessionWantSender) addChange(c change) { +func (sws *sessionWantSender) addChange(c change) { select { - case spm.changes <- c: - case <-spm.ctx.Done(): + case sws.changes <- c: + case <-sws.ctx.Done(): } } // shutdown unregisters the session with the PeerManager -func (spm *sessionWantSender) shutdown() { - spm.pm.UnregisterSession(spm.sessionID) +func (sws *sessionWantSender) shutdown() { + sws.pm.UnregisterSession(sws.sessionID) } // collectChanges collects all the changes that have occurred since the last // invocation of onChange -func (spm *sessionWantSender) collectChanges(changes []change) []change { +func (sws *sessionWantSender) collectChanges(changes []change) []change { for len(changes) < changesBufferSize { select { - case next := <-spm.changes: + case next := <-sws.changes: changes = append(changes, next) default: return changes @@ -207,27 +194,28 @@ func (spm *sessionWantSender) collectChanges(changes []change) []change { } // onChange processes the next set of changes -func (spm *sessionWantSender) onChange(changes []change) { +func (sws *sessionWantSender) onChange(changes []change) { // Several changes may have been recorded since the last time we checked, // so pop all outstanding changes from the 
channel - changes = spm.collectChanges(changes) + changes = sws.collectChanges(changes) // Apply each change availability := make(map[peer.ID]bool, len(changes)) var updates []update for _, chng := range changes { - // Add newly discovered peers - if chng.addPeer != "" { - spm.peerAvlMgr.addPeer(chng.addPeer) - } - // Initialize info for new wants for _, c := range chng.add { - spm.trackWant(c) + sws.trackWant(c) } // Consolidate updates and changes to availability if chng.update.from != "" { + // If the update includes blocks or haves, treat it as signaling that + // the peer is available + if len(chng.update.ks) > 0 || len(chng.update.haves) > 0 { + availability[chng.update.from] = true + } + updates = append(updates, chng.update) } if chng.availability.target != "" { @@ -236,20 +224,20 @@ func (spm *sessionWantSender) onChange(changes []change) { } // Update peer availability - newlyAvailable, newlyUnavailable := spm.processAvailability(availability) + newlyAvailable, newlyUnavailable := sws.processAvailability(availability) // Update wants - dontHaves := spm.processUpdates(updates) + dontHaves := sws.processUpdates(updates) // Check if there are any wants for which all peers have indicated they // don't have the want - spm.checkForExhaustedWants(dontHaves, newlyUnavailable) + sws.checkForExhaustedWants(dontHaves, newlyUnavailable) // If there are some connected peers, send any pending wants - if spm.peerAvlMgr.haveAvailablePeers() { + if sws.spm.HasPeers() { // fmt.Printf("sendNextWants()\n") - spm.sendNextWants(newlyAvailable) - // fmt.Println(spm) + sws.sendNextWants(newlyAvailable) + // fmt.Println(sws) } } @@ -258,60 +246,58 @@ func (spm *sessionWantSender) onChange(changes []change) { // It returns the peers that have become // - newly available // - newly unavailable -func (spm *sessionWantSender) processAvailability(availability map[peer.ID]bool) (avail []peer.ID, unavail []peer.ID) { +func (sws *sessionWantSender) processAvailability(availability map[peer.ID]bool) (avail []peer.ID, unavail []peer.ID) { var newlyAvailable []peer.ID var newlyUnavailable []peer.ID for p, isNowAvailable := range availability { - // Make sure this is a peer that the session is actually interested in - if wasAvailable, ok := spm.peerAvlMgr.isAvailable(p); ok { - // If the state has changed - if wasAvailable != isNowAvailable { - // Update the state and record that something changed - spm.peerAvlMgr.setPeerAvailability(p, isNowAvailable) - // fmt.Printf("processAvailability change %s %t\n", lu.P(p), isNowAvailable) - spm.updateWantsPeerAvailability(p, isNowAvailable) - if isNowAvailable { - newlyAvailable = append(newlyAvailable, p) - } else { - newlyUnavailable = append(newlyUnavailable, p) - } - // Reset the count of consecutive DONT_HAVEs received from the - // peer - delete(spm.peerConsecutiveDontHaves, p) + stateChange := false + if isNowAvailable { + isNewPeer := sws.spm.AddPeer(p) + if isNewPeer { + stateChange = true + newlyAvailable = append(newlyAvailable, p) + } + } else { + wasAvailable := sws.spm.RemovePeer(p) + if wasAvailable { + stateChange = true + newlyUnavailable = append(newlyUnavailable, p) } } + + // If the state has changed + if stateChange { + sws.updateWantsPeerAvailability(p, isNowAvailable) + // Reset the count of consecutive DONT_HAVEs received from the + // peer + delete(sws.peerConsecutiveDontHaves, p) + } } return newlyAvailable, newlyUnavailable } -// isAvailable indicates whether the peer is available and whether -// it's been tracked by the Session (used by the tests) 
-func (spm *sessionWantSender) isAvailable(p peer.ID) (bool, bool) { - return spm.peerAvlMgr.isAvailable(p) -} - // trackWant creates a new entry in the map of CID -> want info -func (spm *sessionWantSender) trackWant(c cid.Cid) { +func (sws *sessionWantSender) trackWant(c cid.Cid) { // fmt.Printf("trackWant %s\n", lu.C(c)) - if _, ok := spm.wants[c]; ok { + if _, ok := sws.wants[c]; ok { return } // Create the want info - wi := newWantInfo(spm.peerRspTrkr) - spm.wants[c] = wi + wi := newWantInfo(sws.peerRspTrkr) + sws.wants[c] = wi // For each available peer, register any information we know about // whether the peer has the block - for _, p := range spm.peerAvlMgr.availablePeers() { - spm.updateWantBlockPresence(c, p) + for _, p := range sws.spm.Peers() { + sws.updateWantBlockPresence(c, p) } } // processUpdates processes incoming blocks and HAVE / DONT_HAVEs. // It returns all DONT_HAVEs. -func (spm *sessionWantSender) processUpdates(updates []update) []cid.Cid { +func (sws *sessionWantSender) processUpdates(updates []update) []cid.Cid { prunePeers := make(map[peer.ID]struct{}) dontHaves := cid.NewSet() for _, upd := range updates { @@ -325,43 +311,43 @@ func (spm *sessionWantSender) processUpdates(updates []update) []cid.Cid { dontHaves.Add(c) // Update the block presence for the peer - spm.updateWantBlockPresence(c, upd.from) + sws.updateWantBlockPresence(c, upd.from) // Check if the DONT_HAVE is in response to a want-block // (could also be in response to want-have) - if spm.swbt.haveSentWantBlockTo(upd.from, c) { + if sws.swbt.haveSentWantBlockTo(upd.from, c) { // If we were waiting for a response from this peer, clear // sentTo so that we can send the want to another peer - if sentTo, ok := spm.getWantSentTo(c); ok && sentTo == upd.from { - spm.setWantSentTo(c, "") + if sentTo, ok := sws.getWantSentTo(c); ok && sentTo == upd.from { + sws.setWantSentTo(c, "") } } // Track the number of consecutive DONT_HAVEs each peer receives - if spm.peerConsecutiveDontHaves[upd.from] == peerDontHaveLimit { + if sws.peerConsecutiveDontHaves[upd.from] == peerDontHaveLimit { prunePeers[upd.from] = struct{}{} } else { - spm.peerConsecutiveDontHaves[upd.from]++ + sws.peerConsecutiveDontHaves[upd.from]++ } } // For each HAVE for _, c := range upd.haves { // Update the block presence for the peer - spm.updateWantBlockPresence(c, upd.from) - delete(spm.peerConsecutiveDontHaves, upd.from) + sws.updateWantBlockPresence(c, upd.from) + delete(sws.peerConsecutiveDontHaves, upd.from) } // For each received block for _, c := range upd.ks { // Remove the want - removed := spm.removeWant(c) + removed := sws.removeWant(c) if removed != nil { // Inform the peer tracker that this peer was the first to send // us the block - spm.peerRspTrkr.receivedBlockFrom(upd.from) + sws.peerRspTrkr.receivedBlockFrom(upd.from) } - delete(spm.peerConsecutiveDontHaves, upd.from) + delete(sws.peerConsecutiveDontHaves, upd.from) } } @@ -370,7 +356,7 @@ func (spm *sessionWantSender) processUpdates(updates []update) []cid.Cid { if len(prunePeers) > 0 { go func() { for p := range prunePeers { - spm.SignalAvailability(p, false) + sws.SignalAvailability(p, false) } }() } @@ -380,7 +366,7 @@ func (spm *sessionWantSender) processUpdates(updates []update) []cid.Cid { // checkForExhaustedWants checks if there are any wants for which all peers // have sent a DONT_HAVE. We call these "exhausted" wants. 
-func (spm *sessionWantSender) checkForExhaustedWants(dontHaves []cid.Cid, newlyUnavailable []peer.ID) { +func (sws *sessionWantSender) checkForExhaustedWants(dontHaves []cid.Cid, newlyUnavailable []peer.ID) { // If there are no new DONT_HAVEs, and no peers became unavailable, then // we don't need to check for exhausted wants if len(dontHaves) == 0 && len(newlyUnavailable) == 0 { @@ -394,15 +380,15 @@ func (spm *sessionWantSender) checkForExhaustedWants(dontHaves []cid.Cid, newlyU // (because it may be the last peer who hadn't sent a DONT_HAVE for a CID) if len(newlyUnavailable) > 0 { // Collect all pending wants - wants = make([]cid.Cid, len(spm.wants)) - for c := range spm.wants { + wants = make([]cid.Cid, len(sws.wants)) + for c := range sws.wants { wants = append(wants, c) } // If the last available peer in the session has become unavailable // then we need to broadcast all pending wants - if len(spm.peerAvlMgr.availablePeers()) == 0 { - spm.processExhaustedWants(wants) + if !sws.spm.HasPeers() { + sws.processExhaustedWants(wants) return } } @@ -410,17 +396,17 @@ func (spm *sessionWantSender) checkForExhaustedWants(dontHaves []cid.Cid, newlyU // If all available peers for a cid sent a DONT_HAVE, signal to the session // that we've exhausted available peers if len(wants) > 0 { - exhausted := spm.bpm.AllPeersDoNotHaveBlock(spm.peerAvlMgr.availablePeers(), wants) - spm.processExhaustedWants(exhausted) + exhausted := sws.bpm.AllPeersDoNotHaveBlock(sws.spm.Peers(), wants) + sws.processExhaustedWants(exhausted) } } // processExhaustedWants filters the list so that only those wants that haven't // already been marked as exhausted are passed to onPeersExhausted() -func (spm *sessionWantSender) processExhaustedWants(exhausted []cid.Cid) { - newlyExhausted := spm.newlyExhausted(exhausted) +func (sws *sessionWantSender) processExhaustedWants(exhausted []cid.Cid) { + newlyExhausted := sws.newlyExhausted(exhausted) if len(newlyExhausted) > 0 { - spm.onPeersExhausted(newlyExhausted) + sws.onPeersExhausted(newlyExhausted) } } @@ -444,10 +430,10 @@ func (aw allWants) forPeer(p peer.ID) *wantSets { // sendNextWants sends wants to peers according to the latest information // about which peers have / dont have blocks -func (spm *sessionWantSender) sendNextWants(newlyAvailable []peer.ID) { +func (sws *sessionWantSender) sendNextWants(newlyAvailable []peer.ID) { toSend := make(allWants) - for c, wi := range spm.wants { + for c, wi := range sws.wants { // Ensure we send want-haves to any newly available peers for _, p := range newlyAvailable { toSend.forPeer(p).wantHaves.Add(c) @@ -471,13 +457,13 @@ func (spm *sessionWantSender) sendNextWants(newlyAvailable []peer.ID) { // fmt.Printf(" q - send best: %s: %s\n", lu.C(c), lu.P(wi.bestPeer)) // Record that we are sending a want-block for this want to the peer - spm.setWantSentTo(c, wi.bestPeer) + sws.setWantSentTo(c, wi.bestPeer) // Send a want-block to the chosen peer toSend.forPeer(wi.bestPeer).wantBlocks.Add(c) // Send a want-have to each other peer - for _, op := range spm.peerAvlMgr.availablePeers() { + for _, op := range sws.spm.Peers() { if op != wi.bestPeer { toSend.forPeer(op).wantHaves.Add(c) } @@ -485,11 +471,11 @@ func (spm *sessionWantSender) sendNextWants(newlyAvailable []peer.ID) { } // Send any wants we've collected - spm.sendWants(toSend) + sws.sendWants(toSend) } // sendWants sends want-have and want-blocks to the appropriate peers -func (spm *sessionWantSender) sendWants(sends allWants) { +func (sws *sessionWantSender) sendWants(sends 
allWants) { // fmt.Printf(" send wants to %d peers\n", len(sends)) // For each peer we're sending a request to @@ -497,7 +483,7 @@ func (spm *sessionWantSender) sendWants(sends allWants) { // fmt.Printf(" send %d wants to %s\n", snd.wantBlocks.Len(), lu.P(p)) // Piggyback some other want-haves onto the request to the peer - for _, c := range spm.getPiggybackWantHaves(p, snd.wantBlocks) { + for _, c := range sws.getPiggybackWantHaves(p, snd.wantBlocks) { snd.wantHaves.Add(c) } @@ -507,24 +493,24 @@ func (spm *sessionWantSender) sendWants(sends allWants) { // precedence over want-haves. wblks := snd.wantBlocks.Keys() whaves := snd.wantHaves.Keys() - spm.pm.SendWants(spm.ctx, p, wblks, whaves) + sws.pm.SendWants(sws.ctx, p, wblks, whaves) // Inform the session that we've sent the wants - spm.onSend(p, wblks, whaves) + sws.onSend(p, wblks, whaves) // Record which peers we send want-block to - spm.swbt.addSentWantBlocksTo(p, wblks) + sws.swbt.addSentWantBlocksTo(p, wblks) } } // getPiggybackWantHaves gets the want-haves that should be piggybacked onto // a request that we are making to send want-blocks to a peer -func (spm *sessionWantSender) getPiggybackWantHaves(p peer.ID, wantBlocks *cid.Set) []cid.Cid { +func (sws *sessionWantSender) getPiggybackWantHaves(p peer.ID, wantBlocks *cid.Set) []cid.Cid { var whs []cid.Cid - for c := range spm.wants { + for c := range sws.wants { // Don't send want-have if we're already sending a want-block // (or have previously) - if !wantBlocks.Has(c) && !spm.swbt.haveSentWantBlockTo(p, c) { + if !wantBlocks.Has(c) && !sws.swbt.haveSentWantBlockTo(p, c) { whs = append(whs, c) } } @@ -533,10 +519,10 @@ func (spm *sessionWantSender) getPiggybackWantHaves(p peer.ID, wantBlocks *cid.S // newlyExhausted filters the list of keys for wants that have not already // been marked as exhausted (all peers indicated they don't have the block) -func (spm *sessionWantSender) newlyExhausted(ks []cid.Cid) []cid.Cid { +func (sws *sessionWantSender) newlyExhausted(ks []cid.Cid) []cid.Cid { var res []cid.Cid for _, c := range ks { - if wi, ok := spm.wants[c]; ok { + if wi, ok := sws.wants[c]; ok { if !wi.exhausted { res = append(res, c) wi.exhausted = true @@ -547,9 +533,9 @@ func (spm *sessionWantSender) newlyExhausted(ks []cid.Cid) []cid.Cid { } // removeWant is called when the corresponding block is received -func (spm *sessionWantSender) removeWant(c cid.Cid) *wantInfo { - if wi, ok := spm.wants[c]; ok { - delete(spm.wants, c) +func (sws *sessionWantSender) removeWant(c cid.Cid) *wantInfo { + if wi, ok := sws.wants[c]; ok { + delete(sws.wants, c) return wi } return nil @@ -557,10 +543,10 @@ func (spm *sessionWantSender) removeWant(c cid.Cid) *wantInfo { // updateWantsPeerAvailability is called when the availability changes for a // peer. It updates all the wants accordingly. 
-func (spm *sessionWantSender) updateWantsPeerAvailability(p peer.ID, isNowAvailable bool) { - for c, wi := range spm.wants { +func (sws *sessionWantSender) updateWantsPeerAvailability(p peer.ID, isNowAvailable bool) { + for c, wi := range sws.wants { if isNowAvailable { - spm.updateWantBlockPresence(c, p) + sws.updateWantBlockPresence(c, p) } else { wi.removePeer(p) } @@ -569,17 +555,17 @@ func (spm *sessionWantSender) updateWantsPeerAvailability(p peer.ID, isNowAvaila // updateWantBlockPresence is called when a HAVE / DONT_HAVE is received for the given // want / peer -func (spm *sessionWantSender) updateWantBlockPresence(c cid.Cid, p peer.ID) { - wi, ok := spm.wants[c] +func (sws *sessionWantSender) updateWantBlockPresence(c cid.Cid, p peer.ID) { + wi, ok := sws.wants[c] if !ok { return } // If the peer sent us a HAVE or DONT_HAVE for the cid, adjust the // block presence for the peer / cid combination - if spm.bpm.PeerHasBlock(p, c) { + if sws.bpm.PeerHasBlock(p, c) { wi.setPeerBlockPresence(p, BPHave) - } else if spm.bpm.PeerDoesNotHaveBlock(p, c) { + } else if sws.bpm.PeerDoesNotHaveBlock(p, c) { wi.setPeerBlockPresence(p, BPDontHave) } else { wi.setPeerBlockPresence(p, BPUnknown) @@ -587,16 +573,16 @@ func (spm *sessionWantSender) updateWantBlockPresence(c cid.Cid, p peer.ID) { } // Which peer was the want sent to -func (spm *sessionWantSender) getWantSentTo(c cid.Cid) (peer.ID, bool) { - if wi, ok := spm.wants[c]; ok { +func (sws *sessionWantSender) getWantSentTo(c cid.Cid) (peer.ID, bool) { + if wi, ok := sws.wants[c]; ok { return wi.sentTo, true } return "", false } // Record which peer the want was sent to -func (spm *sessionWantSender) setWantSentTo(c cid.Cid, p peer.ID) { - if wi, ok := spm.wants[c]; ok { +func (sws *sessionWantSender) setWantSentTo(c cid.Cid, p peer.ID) { + if wi, ok := sws.wants[c]; ok { wi.sentTo = p } } diff --git a/internal/session/sessionwantsender_test.go b/internal/session/sessionwantsender_test.go index ecea497b..ef7da73c 100644 --- a/internal/session/sessionwantsender_test.go +++ b/internal/session/sessionwantsender_test.go @@ -72,10 +72,11 @@ func TestSendWants(t *testing.T) { peerA := peers[0] sid := uint64(1) pm := newMockPeerManager() + fpm := newFakeSessionPeerManager() bpm := bsbpm.New() onSend := func(peer.ID, []cid.Cid, []cid.Cid) {} onPeersExhausted := func([]cid.Cid) {} - spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted) + spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted) go spm.Run() @@ -83,7 +84,7 @@ func TestSendWants(t *testing.T) { blkCids0 := cids[0:2] spm.Add(blkCids0) // peerA: HAVE cid0 - spm.Update(peerA, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{}, true) + spm.Update(peerA, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{}) // Wait for processing to complete peerSends := pm.waitNextWants() @@ -109,10 +110,11 @@ func TestSendsWantBlockToOnePeerOnly(t *testing.T) { peerB := peers[1] sid := uint64(1) pm := newMockPeerManager() + fpm := newFakeSessionPeerManager() bpm := bsbpm.New() onSend := func(peer.ID, []cid.Cid, []cid.Cid) {} onPeersExhausted := func([]cid.Cid) {} - spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted) + spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted) go spm.Run() @@ -120,7 +122,7 @@ func TestSendsWantBlockToOnePeerOnly(t *testing.T) { blkCids0 := cids[0:2] spm.Add(blkCids0) // peerA: HAVE cid0 - spm.Update(peerA, []cid.Cid{}, []cid.Cid{cids[0]}, 
[]cid.Cid{}, true) + spm.Update(peerA, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{}) // Wait for processing to complete peerSends := pm.waitNextWants() @@ -139,7 +141,7 @@ func TestSendsWantBlockToOnePeerOnly(t *testing.T) { pm.clearWants() // peerB: HAVE cid0 - spm.Update(peerB, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{}, true) + spm.Update(peerB, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{}) // Wait for processing to complete peerSends = pm.waitNextWants() @@ -166,17 +168,18 @@ func TestReceiveBlock(t *testing.T) { peerB := peers[1] sid := uint64(1) pm := newMockPeerManager() + fpm := newFakeSessionPeerManager() bpm := bsbpm.New() onSend := func(peer.ID, []cid.Cid, []cid.Cid) {} onPeersExhausted := func([]cid.Cid) {} - spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted) + spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted) go spm.Run() // add cid0, cid1 spm.Add(cids) // peerA: HAVE cid0 - spm.Update(peerA, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{}, true) + spm.Update(peerA, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{}) // Wait for processing to complete peerSends := pm.waitNextWants() @@ -196,10 +199,10 @@ func TestReceiveBlock(t *testing.T) { // peerA: block cid0, DONT_HAVE cid1 bpm.ReceiveFrom(peerA, []cid.Cid{}, []cid.Cid{cids[1]}) - spm.Update(peerA, []cid.Cid{cids[0]}, []cid.Cid{}, []cid.Cid{cids[1]}, false) + spm.Update(peerA, []cid.Cid{cids[0]}, []cid.Cid{}, []cid.Cid{cids[1]}) // peerB: HAVE cid0, cid1 bpm.ReceiveFrom(peerB, cids, []cid.Cid{}) - spm.Update(peerB, []cid.Cid{}, cids, []cid.Cid{}, true) + spm.Update(peerB, []cid.Cid{}, cids, []cid.Cid{}) // Wait for processing to complete peerSends = pm.waitNextWants() @@ -225,17 +228,18 @@ func TestPeerUnavailable(t *testing.T) { peerB := peers[1] sid := uint64(1) pm := newMockPeerManager() + fpm := newFakeSessionPeerManager() bpm := bsbpm.New() onSend := func(peer.ID, []cid.Cid, []cid.Cid) {} onPeersExhausted := func([]cid.Cid) {} - spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted) + spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted) go spm.Run() // add cid0, cid1 spm.Add(cids) // peerA: HAVE cid0 - spm.Update(peerA, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{}, true) + spm.Update(peerA, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{}) // Wait for processing to complete peerSends := pm.waitNextWants() @@ -254,7 +258,7 @@ func TestPeerUnavailable(t *testing.T) { pm.clearWants() // peerB: HAVE cid0 - spm.Update(peerB, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{}, true) + spm.Update(peerB, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{}) // Wait for processing to complete peerSends = pm.waitNextWants() @@ -283,12 +287,13 @@ func TestPeerUnavailable(t *testing.T) { } func TestPeersExhausted(t *testing.T) { - cids := testutil.GenerateCids(2) + cids := testutil.GenerateCids(3) peers := testutil.GeneratePeers(2) peerA := peers[0] peerB := peers[1] sid := uint64(1) pm := newMockPeerManager() + fpm := newFakeSessionPeerManager() bpm := bsbpm.New() onSend := func(peer.ID, []cid.Cid, []cid.Cid) {} @@ -296,53 +301,62 @@ func TestPeersExhausted(t *testing.T) { onPeersExhausted := func(ks []cid.Cid) { exhausted = append(exhausted, ks...) 
} - spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted) + spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted) go spm.Run() // add cid0, cid1 spm.Add(cids) - // peerA: DONT_HAVE cid0 - bpm.ReceiveFrom(peerA, []cid.Cid{}, []cid.Cid{cids[0]}) + // peerA: HAVE cid0 + bpm.ReceiveFrom(peerA, []cid.Cid{cids[0]}, []cid.Cid{}) // Note: this also registers peer A as being available - spm.Update(peerA, []cid.Cid{}, []cid.Cid{}, []cid.Cid{cids[0]}, true) + spm.Update(peerA, []cid.Cid{cids[0]}, []cid.Cid{}, []cid.Cid{}) + + // peerA: DONT_HAVE cid1 + bpm.ReceiveFrom(peerA, []cid.Cid{}, []cid.Cid{cids[1]}) + spm.Update(peerA, []cid.Cid{}, []cid.Cid{}, []cid.Cid{cids[1]}) time.Sleep(5 * time.Millisecond) - // All available peers (peer A) have sent us a DONT_HAVE for cid0, - // so expect that onPeersExhausted() will be called with cid0 - if !testutil.MatchKeysIgnoreOrder(exhausted, []cid.Cid{cids[0]}) { + // All available peers (peer A) have sent us a DONT_HAVE for cid1, + // so expect that onPeersExhausted() will be called with cid1 + if !testutil.MatchKeysIgnoreOrder(exhausted, []cid.Cid{cids[1]}) { t.Fatal("Wrong keys") } // Clear exhausted cids exhausted = []cid.Cid{} - // peerB: DONT_HAVE cid0, cid1 - bpm.ReceiveFrom(peerB, []cid.Cid{}, cids) - spm.Update(peerB, []cid.Cid{}, []cid.Cid{}, cids, true) + // peerB: HAVE cid0 + bpm.ReceiveFrom(peerB, []cid.Cid{cids[0]}, []cid.Cid{}) + // Note: this also registers peer B as being available + spm.Update(peerB, []cid.Cid{cids[0]}, []cid.Cid{}, []cid.Cid{}) + + // peerB: DONT_HAVE cid1, cid2 + bpm.ReceiveFrom(peerB, []cid.Cid{}, []cid.Cid{cids[1], cids[2]}) + spm.Update(peerB, []cid.Cid{}, []cid.Cid{}, []cid.Cid{cids[1], cids[2]}) // Wait for processing to complete pm.waitNextWants() // All available peers (peer A and peer B) have sent us a DONT_HAVE - // for cid0, but we already called onPeersExhausted with cid0, so it + // for cid1, but we already called onPeersExhausted with cid1, so it // should not be called again if len(exhausted) > 0 { t.Fatal("Wrong keys") } - // peerA: DONT_HAVE cid1 - bpm.ReceiveFrom(peerA, []cid.Cid{}, []cid.Cid{cids[1]}) - spm.Update(peerA, []cid.Cid{}, []cid.Cid{}, []cid.Cid{cids[1]}, false) + // peerA: DONT_HAVE cid2 + bpm.ReceiveFrom(peerA, []cid.Cid{}, []cid.Cid{cids[2]}) + spm.Update(peerA, []cid.Cid{}, []cid.Cid{}, []cid.Cid{cids[2]}) // Wait for processing to complete pm.waitNextWants() // All available peers (peer A and peer B) have sent us a DONT_HAVE for - // cid1, so expect that onPeersExhausted() will be called with cid1 - if !testutil.MatchKeysIgnoreOrder(exhausted, []cid.Cid{cids[1]}) { + // cid2, so expect that onPeersExhausted() will be called with cid2 + if !testutil.MatchKeysIgnoreOrder(exhausted, []cid.Cid{cids[2]}) { t.Fatal("Wrong keys") } } @@ -358,6 +372,7 @@ func TestPeersExhaustedLastWaitingPeerUnavailable(t *testing.T) { peerB := peers[1] sid := uint64(1) pm := newMockPeerManager() + fpm := newFakeSessionPeerManager() bpm := bsbpm.New() onSend := func(peer.ID, []cid.Cid, []cid.Cid) {} @@ -365,7 +380,7 @@ func TestPeersExhaustedLastWaitingPeerUnavailable(t *testing.T) { onPeersExhausted := func(ks []cid.Cid) { exhausted = append(exhausted, ks...) 
} - spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted) + spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted) go spm.Run() @@ -375,15 +390,15 @@ func TestPeersExhaustedLastWaitingPeerUnavailable(t *testing.T) { // peerA: HAVE cid0 bpm.ReceiveFrom(peerA, []cid.Cid{cids[0]}, []cid.Cid{}) // Note: this also registers peer A as being available - spm.Update(peerA, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{}, true) + spm.Update(peerA, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{}) // peerB: HAVE cid0 bpm.ReceiveFrom(peerB, []cid.Cid{cids[0]}, []cid.Cid{}) // Note: this also registers peer B as being available - spm.Update(peerB, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{}, true) + spm.Update(peerB, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{}) // peerA: DONT_HAVE cid1 bpm.ReceiveFrom(peerA, []cid.Cid{}, []cid.Cid{cids[1]}) - spm.Update(peerA, []cid.Cid{}, []cid.Cid{}, []cid.Cid{cids[0]}, false) + spm.Update(peerA, []cid.Cid{}, []cid.Cid{}, []cid.Cid{cids[0]}) time.Sleep(5 * time.Millisecond) @@ -408,6 +423,7 @@ func TestPeersExhaustedAllPeersUnavailable(t *testing.T) { peerB := peers[1] sid := uint64(1) pm := newMockPeerManager() + fpm := newFakeSessionPeerManager() bpm := bsbpm.New() onSend := func(peer.ID, []cid.Cid, []cid.Cid) {} @@ -415,7 +431,7 @@ func TestPeersExhaustedAllPeersUnavailable(t *testing.T) { onPeersExhausted := func(ks []cid.Cid) { exhausted = append(exhausted, ks...) } - spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted) + spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted) go spm.Run() @@ -423,11 +439,11 @@ func TestPeersExhaustedAllPeersUnavailable(t *testing.T) { spm.Add(cids) // peerA: receive block for cid0 (and register peer A with sessionWantSender) - spm.Update(peerA, []cid.Cid{cids[0]}, []cid.Cid{}, []cid.Cid{}, true) + spm.Update(peerA, []cid.Cid{cids[0]}, []cid.Cid{}, []cid.Cid{}) // peerB: HAVE cid1 bpm.ReceiveFrom(peerB, []cid.Cid{cids[0]}, []cid.Cid{}) // Note: this also registers peer B as being available - spm.Update(peerB, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{}, true) + spm.Update(peerB, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{}) time.Sleep(5 * time.Millisecond) @@ -449,10 +465,11 @@ func TestConsecutiveDontHaveLimit(t *testing.T) { p := testutil.GeneratePeers(1)[0] sid := uint64(1) pm := newMockPeerManager() + fpm := newFakeSessionPeerManager() bpm := bsbpm.New() onSend := func(peer.ID, []cid.Cid, []cid.Cid) {} onPeersExhausted := func([]cid.Cid) {} - spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted) + spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted) go spm.Run() @@ -461,41 +478,41 @@ func TestConsecutiveDontHaveLimit(t *testing.T) { // Receive a HAVE from peer (adds it to the session) bpm.ReceiveFrom(p, cids[:1], []cid.Cid{}) - spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}, true) + spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}) // Wait for processing to complete - time.Sleep(5 * time.Millisecond) + time.Sleep(10 * time.Millisecond) // Peer should be available - if avail, ok := spm.isAvailable(p); !ok || !avail { + if has := fpm.HasPeer(p); !has { t.Fatal("Expected peer to be available") } // Receive DONT_HAVEs from peer that do not exceed limit for _, c := range cids[1:peerDontHaveLimit] { bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c}) - spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false) 
+ spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}) } // Wait for processing to complete - time.Sleep(5 * time.Millisecond) + time.Sleep(20 * time.Millisecond) // Peer should be available - if avail, ok := spm.isAvailable(p); !ok || !avail { + if has := fpm.HasPeer(p); !has { t.Fatal("Expected peer to be available") } // Receive DONT_HAVEs from peer that exceed limit for _, c := range cids[peerDontHaveLimit:] { bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c}) - spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false) + spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}) } // Wait for processing to complete - time.Sleep(5 * time.Millisecond) + time.Sleep(20 * time.Millisecond) // Session should remove peer - if avail, _ := spm.isAvailable(p); avail { + if has := fpm.HasPeer(p); has { t.Fatal("Expected peer not to be available") } } @@ -505,10 +522,11 @@ func TestConsecutiveDontHaveLimitInterrupted(t *testing.T) { p := testutil.GeneratePeers(1)[0] sid := uint64(1) pm := newMockPeerManager() + fpm := newFakeSessionPeerManager() bpm := bsbpm.New() onSend := func(peer.ID, []cid.Cid, []cid.Cid) {} onPeersExhausted := func([]cid.Cid) {} - spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted) + spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted) go spm.Run() @@ -517,13 +535,13 @@ func TestConsecutiveDontHaveLimitInterrupted(t *testing.T) { // Receive a HAVE from peer (adds it to the session) bpm.ReceiveFrom(p, cids[:1], []cid.Cid{}) - spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}, true) + spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}) // Wait for processing to complete time.Sleep(5 * time.Millisecond) // Peer should be available - if avail, ok := spm.isAvailable(p); !ok || !avail { + if has := fpm.HasPeer(p); !has { t.Fatal("Expected peer to be available") } @@ -533,24 +551,24 @@ func TestConsecutiveDontHaveLimitInterrupted(t *testing.T) { for _, c := range cids[1:peerDontHaveLimit] { // DONT_HAVEs bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c}) - spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false) + spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}) } for _, c := range cids[peerDontHaveLimit : peerDontHaveLimit+1] { // HAVEs bpm.ReceiveFrom(p, []cid.Cid{c}, []cid.Cid{}) - spm.Update(p, []cid.Cid{}, []cid.Cid{c}, []cid.Cid{}, false) + spm.Update(p, []cid.Cid{}, []cid.Cid{c}, []cid.Cid{}) } for _, c := range cids[peerDontHaveLimit+1:] { // DONT_HAVEs bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c}) - spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false) + spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}) } // Wait for processing to complete time.Sleep(5 * time.Millisecond) // Peer should be available - if avail, ok := spm.isAvailable(p); !ok || !avail { + if has := fpm.HasPeer(p); !has { t.Fatal("Expected peer to be available") } } @@ -560,10 +578,11 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) { p := testutil.GeneratePeers(1)[0] sid := uint64(1) pm := newMockPeerManager() + fpm := newFakeSessionPeerManager() bpm := bsbpm.New() onSend := func(peer.ID, []cid.Cid, []cid.Cid) {} onPeersExhausted := func([]cid.Cid) {} - spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted) + spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted) go spm.Run() @@ -572,39 +591,39 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) { // Receive a HAVE from peer (adds it to the session) bpm.ReceiveFrom(p, 
cids[:1], []cid.Cid{}) - spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}, true) + spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}) // Wait for processing to complete time.Sleep(5 * time.Millisecond) // Peer should be available - if avail, ok := spm.isAvailable(p); !ok || !avail { + if has := fpm.HasPeer(p); !has { t.Fatal("Expected peer to be available") } // Receive DONT_HAVEs from peer that exceed limit for _, c := range cids[1 : peerDontHaveLimit+2] { bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c}) - spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false) + spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}) } // Wait for processing to complete time.Sleep(5 * time.Millisecond) // Session should remove peer - if avail, _ := spm.isAvailable(p); avail { + if has := fpm.HasPeer(p); has { t.Fatal("Expected peer not to be available") } // Receive a HAVE from peer (adds it back into the session) bpm.ReceiveFrom(p, cids[:1], []cid.Cid{}) - spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}, true) + spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}) // Wait for processing to complete time.Sleep(5 * time.Millisecond) // Peer should be available - if avail, ok := spm.isAvailable(p); !ok || !avail { + if has := fpm.HasPeer(p); !has { t.Fatal("Expected peer to be available") } @@ -613,28 +632,28 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) { // Receive DONT_HAVEs from peer that don't exceed limit for _, c := range cids2[1:peerDontHaveLimit] { bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c}) - spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false) + spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}) } // Wait for processing to complete time.Sleep(5 * time.Millisecond) // Peer should be available - if avail, ok := spm.isAvailable(p); !ok || !avail { + if has := fpm.HasPeer(p); !has { t.Fatal("Expected peer to be available") } // Receive DONT_HAVEs from peer that exceed limit for _, c := range cids2[peerDontHaveLimit:] { bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c}) - spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false) + spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}) } // Wait for processing to complete time.Sleep(5 * time.Millisecond) // Session should remove peer - if avail, _ := spm.isAvailable(p); avail { + if has := fpm.HasPeer(p); has { t.Fatal("Expected peer not to be available") } } diff --git a/internal/sessionmanager/sessionmanager_test.go b/internal/sessionmanager/sessionmanager_test.go index e89ea464..4e0152bb 100644 --- a/internal/sessionmanager/sessionmanager_test.go +++ b/internal/sessionmanager/sessionmanager_test.go @@ -45,12 +45,12 @@ func (fs *fakeSession) ReceiveFrom(p peer.ID, ks []cid.Cid, wantBlocks []cid.Cid type fakeSesPeerManager struct { } -func (*fakeSesPeerManager) ReceiveFrom(peer.ID, []cid.Cid, []cid.Cid) bool { return true } -func (*fakeSesPeerManager) Peers() *peer.Set { return nil } -func (*fakeSesPeerManager) FindMorePeers(context.Context, cid.Cid) {} -func (*fakeSesPeerManager) RecordPeerRequests([]peer.ID, []cid.Cid) {} -func (*fakeSesPeerManager) RecordPeerResponse(peer.ID, []cid.Cid) {} -func (*fakeSesPeerManager) RecordCancels(c []cid.Cid) {} +func (*fakeSesPeerManager) Peers() []peer.ID { return nil } +func (*fakeSesPeerManager) PeersDiscovered() bool { return false } +func (*fakeSesPeerManager) Shutdown() {} +func (*fakeSesPeerManager) AddPeer(peer.ID) bool { return false } +func (*fakeSesPeerManager) RemovePeer(peer.ID) bool { return false } +func (*fakeSesPeerManager) HasPeers() bool { return false } type 
fakePeerManager struct { } diff --git a/internal/sessionpeermanager/latencytracker.go b/internal/sessionpeermanager/latencytracker.go deleted file mode 100644 index 326d2fa4..00000000 --- a/internal/sessionpeermanager/latencytracker.go +++ /dev/null @@ -1,77 +0,0 @@ -package sessionpeermanager - -import ( - "time" - - "github.com/ipfs/go-cid" -) - -type requestData struct { - startedAt time.Time - wasCancelled bool - timeoutFunc *time.Timer -} - -type latencyTracker struct { - requests map[cid.Cid]*requestData -} - -func newLatencyTracker() *latencyTracker { - return &latencyTracker{requests: make(map[cid.Cid]*requestData)} -} - -type afterTimeoutFunc func(cid.Cid) - -func (lt *latencyTracker) SetupRequests(keys []cid.Cid, timeoutDuration time.Duration, afterTimeout afterTimeoutFunc) { - startedAt := time.Now() - for _, k := range keys { - if _, ok := lt.requests[k]; !ok { - lt.requests[k] = &requestData{ - startedAt, - false, - time.AfterFunc(timeoutDuration, makeAfterTimeout(afterTimeout, k)), - } - } - } -} - -func makeAfterTimeout(afterTimeout afterTimeoutFunc, k cid.Cid) func() { - return func() { afterTimeout(k) } -} - -func (lt *latencyTracker) CheckDuration(key cid.Cid) (time.Duration, bool) { - request, ok := lt.requests[key] - var latency time.Duration - if ok { - latency = time.Since(request.startedAt) - } - return latency, ok -} - -func (lt *latencyTracker) RemoveRequest(key cid.Cid) { - request, ok := lt.requests[key] - if ok { - request.timeoutFunc.Stop() - delete(lt.requests, key) - } -} - -func (lt *latencyTracker) RecordCancel(keys []cid.Cid) { - for _, key := range keys { - request, ok := lt.requests[key] - if ok { - request.wasCancelled = true - } - } -} - -func (lt *latencyTracker) WasCancelled(key cid.Cid) bool { - request, ok := lt.requests[key] - return ok && request.wasCancelled -} - -func (lt *latencyTracker) Shutdown() { - for _, request := range lt.requests { - request.timeoutFunc.Stop() - } -} diff --git a/internal/sessionpeermanager/peerdata.go b/internal/sessionpeermanager/peerdata.go deleted file mode 100644 index a0619858..00000000 --- a/internal/sessionpeermanager/peerdata.go +++ /dev/null @@ -1,41 +0,0 @@ -package sessionpeermanager - -import ( - "time" - - "github.com/ipfs/go-cid" -) - -const ( - newLatencyWeight = 0.5 -) - -type peerData struct { - hasLatency bool - latency time.Duration - lt *latencyTracker -} - -func newPeerData() *peerData { - return &peerData{ - hasLatency: false, - lt: newLatencyTracker(), - latency: 0, - } -} - -func (pd *peerData) AdjustLatency(k cid.Cid, hasFallbackLatency bool, fallbackLatency time.Duration) { - latency, hasLatency := pd.lt.CheckDuration(k) - pd.lt.RemoveRequest(k) - if !hasLatency { - latency, hasLatency = fallbackLatency, hasFallbackLatency - } - if hasLatency { - if pd.hasLatency { - pd.latency = time.Duration(float64(pd.latency)*(1.0-newLatencyWeight) + float64(latency)*newLatencyWeight) - } else { - pd.latency = latency - pd.hasLatency = true - } - } -} diff --git a/internal/sessionpeermanager/sessionpeermanager.go b/internal/sessionpeermanager/sessionpeermanager.go index 7957638d..cc6e7110 100644 --- a/internal/sessionpeermanager/sessionpeermanager.go +++ b/internal/sessionpeermanager/sessionpeermanager.go @@ -1,26 +1,20 @@ package sessionpeermanager import ( - "context" "fmt" - "math/rand" - "sort" - "time" + "sync" - bssd "github.com/ipfs/go-bitswap/internal/sessiondata" logging "github.com/ipfs/go-log" - cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" ) var log = 
logging.Logger("bs:sprmgr") const ( - defaultTimeoutDuration = 5 * time.Second - maxOptimizedPeers = 32 - unoptimizedTagValue = 5 // tag value for "unoptimized" session peers. - optimizedTagValue = 10 // tag value for "optimized" session peers. + // Connection Manager tag value for session peers. Indicates to connection + // manager that it should keep the connection to the peer. + sessionPeerTagValue = 5 ) // PeerTagger is an interface for tagging peers with metadata @@ -29,362 +23,108 @@ type PeerTagger interface { UntagPeer(p peer.ID, tag string) } -// PeerProviderFinder is an interface for finding providers -type PeerProviderFinder interface { - FindProvidersAsync(context.Context, cid.Cid) <-chan peer.ID -} - -type peerMessage interface { - handle(spm *SessionPeerManager) -} - -// SessionPeerManager tracks and manages peers for a session, and provides -// the best ones to the session +// SessionPeerManager keeps track of peers for a session, and takes care of +// ConnectionManager tagging. type SessionPeerManager struct { - ctx context.Context - tagger PeerTagger - providerFinder PeerProviderFinder - peers *peer.Set - tag string - id uint64 - - peerMessages chan peerMessage + tagger PeerTagger + tag string - // do not touch outside of run loop - activePeers map[peer.ID]*peerData - unoptimizedPeersArr []peer.ID - optimizedPeersArr []peer.ID - broadcastLatency *latencyTracker - timeoutDuration time.Duration + plk sync.RWMutex + peers map[peer.ID]struct{} + peersDiscovered bool } // New creates a new SessionPeerManager -func New(ctx context.Context, id uint64, tagger PeerTagger, providerFinder PeerProviderFinder) *SessionPeerManager { - spm := &SessionPeerManager{ - ctx: ctx, - id: id, - tagger: tagger, - providerFinder: providerFinder, - peers: peer.NewSet(), - peerMessages: make(chan peerMessage, 128), - activePeers: make(map[peer.ID]*peerData), - broadcastLatency: newLatencyTracker(), - timeoutDuration: defaultTimeoutDuration, +func New(id uint64, tagger PeerTagger) *SessionPeerManager { + return &SessionPeerManager{ + tag: fmt.Sprint("bs-ses-", id), + tagger: tagger, + peers: make(map[peer.ID]struct{}), } - - spm.tag = fmt.Sprint("bs-ses-", id) - - go spm.run(ctx) - return spm } -func (spm *SessionPeerManager) ReceiveFrom(p peer.ID, ks []cid.Cid, haves []cid.Cid) bool { - if len(ks) > 0 || len(haves) > 0 && !spm.peers.Contains(p) { - log.Infof("Added peer %s to session: %d peers\n", p, spm.peers.Size()) - spm.peers.Add(p) - return true - } - return false -} +// AddPeer adds the peer to the SessionPeerManager. +// Returns true if the peer is a new peer, false if it already existed. 
+func (spm *SessionPeerManager) AddPeer(p peer.ID) bool { + spm.plk.Lock() + defer spm.plk.Unlock() -func (spm *SessionPeerManager) Peers() *peer.Set { - return spm.peers -} - -// RecordPeerResponse records that a peer received some blocks, and adds the -// peer to the list of peers if it wasn't already added -func (spm *SessionPeerManager) RecordPeerResponse(p peer.ID, ks []cid.Cid) { - - select { - case spm.peerMessages <- &peerResponseMessage{p, ks}: - case <-spm.ctx.Done(): + // Check if the peer is a new peer + if _, ok := spm.peers[p]; ok { + return false } -} -// RecordCancels records the fact that cancellations were sent to peers, -// so if blocks don't arrive, don't let it affect the peer's timeout -func (spm *SessionPeerManager) RecordCancels(ks []cid.Cid) { - select { - case spm.peerMessages <- &cancelMessage{ks}: - case <-spm.ctx.Done(): - } -} + spm.peers[p] = struct{}{} + spm.peersDiscovered = true -// RecordPeerRequests records that a given set of peers requested the given cids. -func (spm *SessionPeerManager) RecordPeerRequests(p []peer.ID, ks []cid.Cid) { - select { - case spm.peerMessages <- &peerRequestMessage{p, ks}: - case <-spm.ctx.Done(): - } -} - -// GetOptimizedPeers returns the best peers available for a session, along with -// a rating for how good they are, in comparison to the best peer. -func (spm *SessionPeerManager) GetOptimizedPeers() []bssd.OptimizedPeer { - // right now this just returns all peers, but soon we might return peers - // ordered by optimization, or only a subset - resp := make(chan []bssd.OptimizedPeer, 1) - select { - case spm.peerMessages <- &getPeersMessage{resp}: - case <-spm.ctx.Done(): - return nil - } + // Tag the peer with the ConnectionManager so it doesn't discard the + // connection + spm.tagger.TagPeer(p, spm.tag, sessionPeerTagValue) - select { - case peers := <-resp: - return peers - case <-spm.ctx.Done(): - return nil - } + log.Infof("Added peer %s to session: %d peers\n", p, len(spm.peers)) + return true } -// FindMorePeers attempts to find more peers for a session by searching for -// providers for the given Cid -func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) { - go func(k cid.Cid) { - for p := range spm.providerFinder.FindProvidersAsync(ctx, k) { - - select { - case spm.peerMessages <- &peerFoundMessage{p}: - case <-ctx.Done(): - case <-spm.ctx.Done(): - } - } - }(c) -} +// RemovePeer removes the peer from the SessionPeerManager. +// Returns true if the peer was removed, false if it did not exist. 
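+// RemovePeer also clears the peer's ConnectionManager tag so that the connection can be released.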
+func (spm *SessionPeerManager) RemovePeer(p peer.ID) bool { + spm.plk.Lock() + defer spm.plk.Unlock() -// SetTimeoutDuration changes the length of time used to timeout recording of -// requests -func (spm *SessionPeerManager) SetTimeoutDuration(timeoutDuration time.Duration) { - select { - case spm.peerMessages <- &setTimeoutMessage{timeoutDuration}: - case <-spm.ctx.Done(): + if _, ok := spm.peers[p]; !ok { + return false } -} -func (spm *SessionPeerManager) run(ctx context.Context) { - for { - select { - case pm := <-spm.peerMessages: - pm.handle(spm) - case <-ctx.Done(): - spm.handleShutdown() - return - } - } + delete(spm.peers, p) + spm.tagger.UntagPeer(p, spm.tag) + return true } -func (spm *SessionPeerManager) tagPeer(p peer.ID, data *peerData) { - var value int - if data.hasLatency { - value = optimizedTagValue - } else { - value = unoptimizedTagValue - } - spm.tagger.TagPeer(p, spm.tag, value) -} - -func (spm *SessionPeerManager) insertPeer(p peer.ID, data *peerData) { - if data.hasLatency { - insertPos := sort.Search(len(spm.optimizedPeersArr), func(i int) bool { - return spm.activePeers[spm.optimizedPeersArr[i]].latency > data.latency - }) - spm.optimizedPeersArr = append(spm.optimizedPeersArr[:insertPos], - append([]peer.ID{p}, spm.optimizedPeersArr[insertPos:]...)...) - } else { - spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, p) - } +// PeersDiscovered indicates whether peers have been discovered yet. +// Returns true once a peer has been discovered by the session (even if all +// peers are later removed from the session). +func (spm *SessionPeerManager) PeersDiscovered() bool { + spm.plk.RLock() + defer spm.plk.RUnlock() - if !spm.peers.Contains(p) { - log.Infof("Added peer %s to session: %d peers\n", p, spm.peers.Size()) - spm.peers.Add(p) - } + return spm.peersDiscovered } -func (spm *SessionPeerManager) removeOptimizedPeer(p peer.ID) { - for i := 0; i < len(spm.optimizedPeersArr); i++ { - if spm.optimizedPeersArr[i] == p { - spm.optimizedPeersArr = append(spm.optimizedPeersArr[:i], spm.optimizedPeersArr[i+1:]...) 
- return - } - } -} +func (spm *SessionPeerManager) Peers() []peer.ID { + spm.plk.RLock() + defer spm.plk.RUnlock() -func (spm *SessionPeerManager) removeUnoptimizedPeer(p peer.ID) { - for i := 0; i < len(spm.unoptimizedPeersArr); i++ { - if spm.unoptimizedPeersArr[i] == p { - spm.unoptimizedPeersArr[i] = spm.unoptimizedPeersArr[len(spm.unoptimizedPeersArr)-1] - spm.unoptimizedPeersArr = spm.unoptimizedPeersArr[:len(spm.unoptimizedPeersArr)-1] - return - } + peers := make([]peer.ID, 0, len(spm.peers)) + for p := range spm.peers { + peers = append(peers, p) } -} -func (spm *SessionPeerManager) recordResponse(p peer.ID, ks []cid.Cid) { - data, ok := spm.activePeers[p] - wasOptimized := ok && data.hasLatency - if wasOptimized { - spm.removeOptimizedPeer(p) - } else { - if ok { - spm.removeUnoptimizedPeer(p) - } else { - data = newPeerData() - spm.activePeers[p] = data - } - } - for _, k := range ks { - fallbackLatency, hasFallbackLatency := spm.broadcastLatency.CheckDuration(k) - data.AdjustLatency(k, hasFallbackLatency, fallbackLatency) - } - if !ok || wasOptimized != data.hasLatency { - spm.tagPeer(p, data) - } - spm.insertPeer(p, data) + return peers } -type peerFoundMessage struct { - p peer.ID -} +func (spm *SessionPeerManager) HasPeers() bool { + spm.plk.RLock() + defer spm.plk.RUnlock() -func (pfm *peerFoundMessage) handle(spm *SessionPeerManager) { - p := pfm.p - if _, ok := spm.activePeers[p]; !ok { - spm.activePeers[p] = newPeerData() - spm.insertPeer(p, spm.activePeers[p]) - spm.tagPeer(p, spm.activePeers[p]) - } + return len(spm.peers) > 0 } -type peerResponseMessage struct { - p peer.ID - ks []cid.Cid -} - -func (prm *peerResponseMessage) handle(spm *SessionPeerManager) { - spm.recordResponse(prm.p, prm.ks) -} - -type peerRequestMessage struct { - peers []peer.ID - keys []cid.Cid -} - -func (spm *SessionPeerManager) makeTimeout(p peer.ID) afterTimeoutFunc { - return func(k cid.Cid) { - select { - case spm.peerMessages <- &peerTimeoutMessage{p, k}: - case <-spm.ctx.Done(): - } - } -} - -func (prm *peerRequestMessage) handle(spm *SessionPeerManager) { - if prm.peers == nil { - spm.broadcastLatency.SetupRequests(prm.keys, spm.timeoutDuration, func(k cid.Cid) { - select { - case spm.peerMessages <- &broadcastTimeoutMessage{k}: - case <-spm.ctx.Done(): - } - }) - } else { - for _, p := range prm.peers { - if data, ok := spm.activePeers[p]; ok { - data.lt.SetupRequests(prm.keys, spm.timeoutDuration, spm.makeTimeout(p)) - } - } - } -} - -type getPeersMessage struct { - resp chan<- []bssd.OptimizedPeer -} - -// Get all optimized peers in order followed by randomly ordered unoptimized -// peers, with a limit of maxOptimizedPeers -func (prm *getPeersMessage) handle(spm *SessionPeerManager) { - randomOrder := rand.Perm(len(spm.unoptimizedPeersArr)) - - // Number of peers to get in total: unoptimized + optimized - // limited by maxOptimizedPeers - maxPeers := len(spm.unoptimizedPeersArr) + len(spm.optimizedPeersArr) - if maxPeers > maxOptimizedPeers { - maxPeers = maxOptimizedPeers - } - - // The best peer latency is the first optimized peer's latency. - // If we haven't recorded any peer's latency, use 0. 
- var bestPeerLatency float64 - if len(spm.optimizedPeersArr) > 0 { - bestPeerLatency = float64(spm.activePeers[spm.optimizedPeersArr[0]].latency) - } else { - bestPeerLatency = 0 - } +func (spm *SessionPeerManager) HasPeer(p peer.ID) bool { + spm.plk.RLock() + defer spm.plk.RUnlock() - optimizedPeers := make([]bssd.OptimizedPeer, 0, maxPeers) - for i := 0; i < maxPeers; i++ { - // First add optimized peers in order - if i < len(spm.optimizedPeersArr) { - p := spm.optimizedPeersArr[i] - optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{ - Peer: p, - OptimizationRating: bestPeerLatency / float64(spm.activePeers[p].latency), - }) - } else { - // Then add unoptimized peers in random order - p := spm.unoptimizedPeersArr[randomOrder[i-len(spm.optimizedPeersArr)]] - optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{Peer: p, OptimizationRating: 0.0}) - } - } - prm.resp <- optimizedPeers -} - -type cancelMessage struct { - ks []cid.Cid + _, ok := spm.peers[p] + return ok } -func (cm *cancelMessage) handle(spm *SessionPeerManager) { - for _, data := range spm.activePeers { - data.lt.RecordCancel(cm.ks) - } -} +// Shutdown untags all the peers +func (spm *SessionPeerManager) Shutdown() { + spm.plk.Lock() + defer spm.plk.Unlock() -func (spm *SessionPeerManager) handleShutdown() { - for p, data := range spm.activePeers { + // Untag the peers with the ConnectionManager so that it can release + // connections to those peers + for p := range spm.peers { spm.tagger.UntagPeer(p, spm.tag) - data.lt.Shutdown() - } -} - -type peerTimeoutMessage struct { - p peer.ID - k cid.Cid -} - -func (ptm *peerTimeoutMessage) handle(spm *SessionPeerManager) { - data, ok := spm.activePeers[ptm.p] - // If the request was cancelled, make sure we clean up the request tracker - if ok && data.lt.WasCancelled(ptm.k) { - data.lt.RemoveRequest(ptm.k) - } else { - // If the request was not cancelled, record the latency. Note that we - // do this even if we didn't previously know about this peer. 
- spm.recordResponse(ptm.p, []cid.Cid{ptm.k}) } } - -type broadcastTimeoutMessage struct { - k cid.Cid -} - -func (btm *broadcastTimeoutMessage) handle(spm *SessionPeerManager) { - spm.broadcastLatency.RemoveRequest(btm.k) -} - -type setTimeoutMessage struct { - timeoutDuration time.Duration -} - -func (stm *setTimeoutMessage) handle(spm *SessionPeerManager) { - spm.timeoutDuration = stm.timeoutDuration -} diff --git a/internal/sessionpeermanager/sessionpeermanager_test.go b/internal/sessionpeermanager/sessionpeermanager_test.go index 9a771b18..e3c1c4ab 100644 --- a/internal/sessionpeermanager/sessionpeermanager_test.go +++ b/internal/sessionpeermanager/sessionpeermanager_test.go @@ -1,46 +1,13 @@ package sessionpeermanager import ( - "context" - "fmt" - "math/rand" "sync" "testing" - "time" "github.com/ipfs/go-bitswap/internal/testutil" - - cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" ) -type fakePeerProviderFinder struct { - peers []peer.ID - completed chan struct{} -} - -func (fppf *fakePeerProviderFinder) FindProvidersAsync(ctx context.Context, c cid.Cid) <-chan peer.ID { - peerCh := make(chan peer.ID) - go func() { - - for _, p := range fppf.peers { - select { - case peerCh <- p: - case <-ctx.Done(): - close(peerCh) - return - } - } - close(peerCh) - - select { - case fppf.completed <- struct{}{}: - case <-ctx.Done(): - } - }() - return peerCh -} - type fakePeerTagger struct { lk sync.Mutex taggedPeers []peer.ID @@ -69,330 +36,192 @@ func (fpt *fakePeerTagger) UntagPeer(p peer.ID, tag string) { } } -func (fpt *fakePeerTagger) count() int { - fpt.lk.Lock() - defer fpt.lk.Unlock() - return len(fpt.taggedPeers) -} +func TestAddPeers(t *testing.T) { + peers := testutil.GeneratePeers(2) + spm := New(1, &fakePeerTagger{}) -func getPeers(sessionPeerManager *SessionPeerManager) []peer.ID { - optimizedPeers := sessionPeerManager.GetOptimizedPeers() - var peers []peer.ID - for _, optimizedPeer := range optimizedPeers { - peers = append(peers, optimizedPeer.Peer) + isNew := spm.AddPeer(peers[0]) + if !isNew { + t.Fatal("Expected peer to be new") } - return peers -} - -func TestFindingMorePeers(t *testing.T) { - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - defer cancel() - completed := make(chan struct{}) - peers := testutil.GeneratePeers(5) - fpt := &fakePeerTagger{} - fppf := &fakePeerProviderFinder{peers, completed} - c := testutil.GenerateCids(1)[0] - id := testutil.GenerateSessionID() - - sessionPeerManager := New(ctx, id, fpt, fppf) - - findCtx, findCancel := context.WithTimeout(ctx, 10*time.Millisecond) - defer findCancel() - sessionPeerManager.FindMorePeers(ctx, c) - select { - case <-completed: - case <-findCtx.Done(): - t.Fatal("Did not finish finding providers") + isNew = spm.AddPeer(peers[0]) + if isNew { + t.Fatal("Expected peer to no longer be new") } - time.Sleep(2 * time.Millisecond) - sessionPeers := getPeers(sessionPeerManager) - if len(sessionPeers) != len(peers) { - t.Fatal("incorrect number of peers found") - } - for _, p := range sessionPeers { - if !testutil.ContainsPeer(peers, p) { - t.Fatal("incorrect peer found through finding providers") - } - } - if len(fpt.taggedPeers) != len(peers) { - t.Fatal("Peers were not tagged!") + isNew = spm.AddPeer(peers[1]) + if !isNew { + t.Fatal("Expected peer to be new") } } -func TestRecordingReceivedBlocks(t *testing.T) { - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - defer cancel() - p := testutil.GeneratePeers(1)[0] - fpt := &fakePeerTagger{} - fppf := 
&fakePeerProviderFinder{} - c := testutil.GenerateCids(1)[0] - id := testutil.GenerateSessionID() - - sessionPeerManager := New(ctx, id, fpt, fppf) - sessionPeerManager.RecordPeerResponse(p, []cid.Cid{c}) - time.Sleep(10 * time.Millisecond) - sessionPeers := getPeers(sessionPeerManager) - if len(sessionPeers) != 1 { - t.Fatal("did not add peer on receive") +func TestRemovePeers(t *testing.T) { + peers := testutil.GeneratePeers(2) + spm := New(1, &fakePeerTagger{}) + + existed := spm.RemovePeer(peers[0]) + if existed { + t.Fatal("Expected peer not to exist") } - if sessionPeers[0] != p { - t.Fatal("incorrect peer added on receive") + + spm.AddPeer(peers[0]) + spm.AddPeer(peers[1]) + + existed = spm.RemovePeer(peers[0]) + if !existed { + t.Fatal("Expected peer to exist") } - if len(fpt.taggedPeers) != 1 { - t.Fatal("Peers was not tagged!") + existed = spm.RemovePeer(peers[1]) + if !existed { + t.Fatal("Expected peer to exist") + } + existed = spm.RemovePeer(peers[0]) + if existed { + t.Fatal("Expected peer not to have existed") } } -func TestOrderingPeers(t *testing.T) { - ctx := context.Background() - ctx, cancel := context.WithTimeout(ctx, 60*time.Millisecond) - defer cancel() - peerCount := 100 - peers := testutil.GeneratePeers(peerCount) - completed := make(chan struct{}) - fpt := &fakePeerTagger{} - fppf := &fakePeerProviderFinder{peers, completed} - c := testutil.GenerateCids(1) - id := testutil.GenerateSessionID() - sessionPeerManager := New(ctx, id, fpt, fppf) - - // add all peers to session - sessionPeerManager.FindMorePeers(ctx, c[0]) - select { - case <-completed: - case <-ctx.Done(): - t.Fatal("Did not finish finding providers") - } - time.Sleep(5 * time.Millisecond) - - // record broadcast - sessionPeerManager.RecordPeerRequests(nil, c) - - // record receives - randi := rand.Perm(peerCount) - peer1 := peers[randi[0]] - peer2 := peers[randi[1]] - peer3 := peers[randi[2]] - time.Sleep(5 * time.Millisecond) - sessionPeerManager.RecordPeerResponse(peer1, []cid.Cid{c[0]}) - time.Sleep(25 * time.Millisecond) - sessionPeerManager.RecordPeerResponse(peer2, []cid.Cid{c[0]}) - time.Sleep(5 * time.Millisecond) - sessionPeerManager.RecordPeerResponse(peer3, []cid.Cid{c[0]}) - - sessionPeers := sessionPeerManager.GetOptimizedPeers() - if len(sessionPeers) != maxOptimizedPeers { - t.Fatal(fmt.Sprintf("Should not return more (%d) than the max of optimized peers (%d)", len(sessionPeers), maxOptimizedPeers)) - } +func TestHasPeers(t *testing.T) { + peers := testutil.GeneratePeers(2) + spm := New(1, &fakePeerTagger{}) - // should prioritize peers which are fastest - // peer1: ~5ms - // peer2: 5 + 25 = ~30ms - // peer3: 5 + 25 + 5 = ~35ms - if (sessionPeers[0].Peer != peer1) || (sessionPeers[1].Peer != peer2) || (sessionPeers[2].Peer != peer3) { - t.Fatal("Did not prioritize peers that received blocks") + if spm.HasPeers() { + t.Fatal("Expected not to have peers yet") } - // should give first peer rating of 1 - if sessionPeers[0].OptimizationRating < 1.0 { - t.Fatal("Did not assign rating to best peer correctly") + spm.AddPeer(peers[0]) + if !spm.HasPeers() { + t.Fatal("Expected to have peers") } - // should give other optimized peers ratings between 0 & 1 - if (sessionPeers[1].OptimizationRating >= 1.0) || (sessionPeers[1].OptimizationRating <= 0.0) || - (sessionPeers[2].OptimizationRating >= 1.0) || (sessionPeers[2].OptimizationRating <= 0.0) { - t.Fatal("Did not assign rating to other optimized peers correctly") + spm.AddPeer(peers[1]) + if !spm.HasPeers() { + t.Fatal("Expected to have peers") 
} - // should give other non-optimized peers rating of zero - for i := 3; i < maxOptimizedPeers; i++ { - if sessionPeers[i].OptimizationRating != 0.0 { - t.Fatal("Did not assign rating to unoptimized peer correctly") - } + spm.RemovePeer(peers[0]) + if !spm.HasPeers() { + t.Fatal("Expected to have peers") } - c2 := testutil.GenerateCids(1) - - // Request again - sessionPeerManager.RecordPeerRequests(nil, c2) + spm.RemovePeer(peers[1]) + if spm.HasPeers() { + t.Fatal("Expected to no longer have peers") + } +} - // Receive a second time - sessionPeerManager.RecordPeerResponse(peer3, []cid.Cid{c2[0]}) +func TestHasPeer(t *testing.T) { + peers := testutil.GeneratePeers(2) + spm := New(1, &fakePeerTagger{}) - // call again - nextSessionPeers := sessionPeerManager.GetOptimizedPeers() - if len(nextSessionPeers) != maxOptimizedPeers { - t.Fatal(fmt.Sprintf("Should not return more (%d) than the max of optimized peers (%d)", len(nextSessionPeers), maxOptimizedPeers)) + if spm.HasPeer(peers[0]) { + t.Fatal("Expected not to have peer yet") } - // should sort by average latency - // peer1: ~5ms - // peer3: (~35ms + ~5ms) / 2 = ~20ms - // peer2: ~30ms - if (nextSessionPeers[0].Peer != peer1) || (nextSessionPeers[1].Peer != peer3) || - (nextSessionPeers[2].Peer != peer2) { - t.Fatal("Did not correctly update order of peers sorted by average latency") + spm.AddPeer(peers[0]) + if !spm.HasPeer(peers[0]) { + t.Fatal("Expected to have peer") } - // should randomize other peers - totalSame := 0 - for i := 3; i < maxOptimizedPeers; i++ { - if sessionPeers[i].Peer == nextSessionPeers[i].Peer { - totalSame++ - } - } - if totalSame >= maxOptimizedPeers-3 { - t.Fatal("should not return the same random peers each time") + spm.AddPeer(peers[1]) + if !spm.HasPeer(peers[1]) { + t.Fatal("Expected to have peer") } -} -func TestTimeoutsAndCancels(t *testing.T) { - ctx := context.Background() - ctx, cancel := context.WithTimeout(ctx, 2*time.Second) - defer cancel() - peers := testutil.GeneratePeers(3) - completed := make(chan struct{}) - fpt := &fakePeerTagger{} - fppf := &fakePeerProviderFinder{peers, completed} - c := testutil.GenerateCids(1) - id := testutil.GenerateSessionID() - sessionPeerManager := New(ctx, id, fpt, fppf) - - // add all peers to session - sessionPeerManager.FindMorePeers(ctx, c[0]) - select { - case <-completed: - case <-ctx.Done(): - t.Fatal("Did not finish finding providers") + spm.RemovePeer(peers[0]) + if spm.HasPeer(peers[0]) { + t.Fatal("Expected not to have peer") } - time.Sleep(2 * time.Millisecond) - - sessionPeerManager.SetTimeoutDuration(20 * time.Millisecond) - - // record broadcast - sessionPeerManager.RecordPeerRequests(nil, c) - // record receives - peer1 := peers[0] - peer2 := peers[1] - peer3 := peers[2] - time.Sleep(1 * time.Millisecond) - sessionPeerManager.RecordPeerResponse(peer1, []cid.Cid{c[0]}) - time.Sleep(2 * time.Millisecond) - sessionPeerManager.RecordPeerResponse(peer2, []cid.Cid{c[0]}) - time.Sleep(40 * time.Millisecond) - sessionPeerManager.RecordPeerResponse(peer3, []cid.Cid{c[0]}) + if !spm.HasPeer(peers[1]) { + t.Fatal("Expected to have peer") + } +} - sessionPeers := sessionPeerManager.GetOptimizedPeers() +func TestPeers(t *testing.T) { + peers := testutil.GeneratePeers(2) + spm := New(1, &fakePeerTagger{}) - // should prioritize peers which are fastest - if (sessionPeers[0].Peer != peer1) || (sessionPeers[1].Peer != peer2) || (sessionPeers[2].Peer != peer3) { - t.Fatal("Did not prioritize peers that received blocks") + if len(spm.Peers()) > 0 { + t.Fatal("Expected 
not to have peers yet") } - // should give first peer rating of 1 - if sessionPeers[0].OptimizationRating < 1.0 { - t.Fatal("Did not assign rating to best peer correctly") + spm.AddPeer(peers[0]) + if len(spm.Peers()) != 1 { + t.Fatal("Expected to have one peer") } - // should give other optimized peers ratings between 0 & 1 - if (sessionPeers[1].OptimizationRating >= 1.0) || (sessionPeers[1].OptimizationRating <= 0.0) { - t.Fatal("Did not assign rating to other optimized peers correctly") + spm.AddPeer(peers[1]) + if len(spm.Peers()) != 2 { + t.Fatal("Expected to have two peers") } - // should not record a response for a broadcast return that arrived AFTER the timeout period - // leaving peer unoptimized - if sessionPeers[2].OptimizationRating != 0 { - t.Fatal("should not have recorded broadcast response for peer that arrived after timeout period") + spm.RemovePeer(peers[0]) + if len(spm.Peers()) != 1 { + t.Fatal("Expected to have one peer") } +} - // now we make a targeted request, which SHOULD affect peer - // rating if it times out - c2 := testutil.GenerateCids(1) - - // Request again - sessionPeerManager.RecordPeerRequests([]peer.ID{peer2}, c2) - // wait for a timeout - time.Sleep(40 * time.Millisecond) +func TestPeersDiscovered(t *testing.T) { + peers := testutil.GeneratePeers(2) + spm := New(1, &fakePeerTagger{}) - // call again - nextSessionPeers := sessionPeerManager.GetOptimizedPeers() - if sessionPeers[1].OptimizationRating <= nextSessionPeers[1].OptimizationRating { - t.Fatal("Timeout should have affected optimization rating but did not") + if spm.PeersDiscovered() { + t.Fatal("Expected not to have discovered peers yet") } - // now we make a targeted request, but later cancel it - // timing out should not affect rating - c3 := testutil.GenerateCids(1) - - // Request again - sessionPeerManager.RecordPeerRequests([]peer.ID{peer2}, c3) - sessionPeerManager.RecordCancels([]cid.Cid{c3[0]}) - // wait for a timeout - time.Sleep(40 * time.Millisecond) + spm.AddPeer(peers[0]) + if !spm.PeersDiscovered() { + t.Fatal("Expected to have discovered peers") + } - // call again - thirdSessionPeers := sessionPeerManager.GetOptimizedPeers() - if nextSessionPeers[1].OptimizationRating != thirdSessionPeers[1].OptimizationRating { - t.Fatal("Timeout should not have affected optimization rating but did") + spm.RemovePeer(peers[0]) + if !spm.PeersDiscovered() { + t.Fatal("Expected to still have discovered peers") } +} - // if we make a targeted request that is then cancelled, but we still - // receive the block before the timeout, it's worth recording and affecting latency +func TestPeerTagging(t *testing.T) { + peers := testutil.GeneratePeers(2) + fpt := &fakePeerTagger{} + spm := New(1, fpt) - c4 := testutil.GenerateCids(1) + spm.AddPeer(peers[0]) + if len(fpt.taggedPeers) != 1 { + t.Fatal("Expected to have tagged one peer") + } - // Request again - sessionPeerManager.RecordPeerRequests([]peer.ID{peer2}, c4) - sessionPeerManager.RecordCancels([]cid.Cid{c4[0]}) - time.Sleep(2 * time.Millisecond) - sessionPeerManager.RecordPeerResponse(peer2, []cid.Cid{c4[0]}) - time.Sleep(2 * time.Millisecond) + spm.AddPeer(peers[0]) + if len(fpt.taggedPeers) != 1 { + t.Fatal("Expected to have tagged one peer") + } - // call again - fourthSessionPeers := sessionPeerManager.GetOptimizedPeers() - if thirdSessionPeers[1].OptimizationRating >= fourthSessionPeers[1].OptimizationRating { - t.Fatal("Timeout should have affected optimization rating but did not") + spm.AddPeer(peers[1]) + if len(fpt.taggedPeers) != 2 { + 
t.Fatal("Expected to have tagged two peers") } - // ensure all peer latency tracking has been cleaned up - if len(sessionPeerManager.activePeers[peer2].lt.requests) > 0 { - t.Fatal("Latency request tracking should have been cleaned up but was not") + spm.RemovePeer(peers[1]) + if len(fpt.taggedPeers) != 1 { + t.Fatal("Expected to have untagged peer") } } -func TestUntaggingPeers(t *testing.T) { - ctx := context.Background() - ctx, cancel := context.WithTimeout(ctx, 30*time.Millisecond) - defer cancel() - peers := testutil.GeneratePeers(5) - completed := make(chan struct{}) +func TestShutdown(t *testing.T) { + peers := testutil.GeneratePeers(2) fpt := &fakePeerTagger{} - fppf := &fakePeerProviderFinder{peers, completed} - c := testutil.GenerateCids(1)[0] - id := testutil.GenerateSessionID() - - sessionPeerManager := New(ctx, id, fpt, fppf) + spm := New(1, fpt) - sessionPeerManager.FindMorePeers(ctx, c) - select { - case <-completed: - case <-ctx.Done(): - t.Fatal("Did not finish finding providers") + spm.AddPeer(peers[0]) + spm.AddPeer(peers[1]) + if len(fpt.taggedPeers) != 2 { + t.Fatal("Expected to have tagged two peers") } - time.Sleep(15 * time.Millisecond) - if fpt.count() != len(peers) { - t.Fatal("Peers were not tagged!") - } - <-ctx.Done() - fpt.wait.Wait() + spm.Shutdown() - if fpt.count() != 0 { - t.Fatal("Peers were not untagged!") + if len(fpt.taggedPeers) != 0 { + t.Fatal("Expected to have untagged all peers") } }