diff --git a/benchmarks_test.go b/benchmarks_test.go
index 71e04629..9761a26c 100644
--- a/benchmarks_test.go
+++ b/benchmarks_test.go
@@ -130,7 +130,7 @@ var mixedBenches = []mixedBench{
 	mixedBench{bench{"3Nodes-Overlap3-OneAtATime", 3, 10, overlap2, oneAtATime}, 1, 2},
 	mixedBench{bench{"3Nodes-AllToAll-OneAtATime", 3, 10, allToAll, oneAtATime}, 1, 2},
 	mixedBench{bench{"3Nodes-Overlap3-AllConcurrent", 3, 10, overlap2, fetchAllConcurrent}, 1, 2},
-	mixedBench{bench{"3Nodes-Overlap3-UnixfsFetch", 3, 100, overlap2, unixfsFileFetch}, 1, 2},
+	// mixedBench{bench{"3Nodes-Overlap3-UnixfsFetch", 3, 100, overlap2, unixfsFileFetch}, 1, 2},
 }
 
 func BenchmarkFetchFromOldBitswap(b *testing.B) {
diff --git a/internal/decision/engine_test.go b/internal/decision/engine_test.go
index 0db51f88..892c3057 100644
--- a/internal/decision/engine_test.go
+++ b/internal/decision/engine_test.go
@@ -1007,9 +1007,9 @@ func TestTaggingPeers(t *testing.T) {
 }
 
 func TestTaggingUseful(t *testing.T) {
-	peerSampleInterval := 2 * time.Millisecond
+	peerSampleInterval := 5 * time.Millisecond
 
-	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
 	defer cancel()
 	me := newTestEngine(ctx, "engine", peerSampleInterval)
 	friend := peer.ID("friend")
@@ -1023,11 +1023,11 @@
 			t.Fatal("Peers should be untagged but weren't")
 		}
 		me.Engine.MessageSent(friend, msg)
-		time.Sleep(peerSampleInterval * 2)
+		time.Sleep(8 * time.Millisecond)
 		if me.PeerTagger.count(me.Engine.tagUseful) != 1 {
 			t.Fatal("Peers should be tagged but weren't")
 		}
-		time.Sleep(peerSampleInterval * 8)
+		time.Sleep(peerSampleInterval * 10)
 	}
 
 	if me.PeerTagger.count(me.Engine.tagUseful) == 0 {
diff --git a/internal/messagequeue/messagequeue.go b/internal/messagequeue/messagequeue.go
index be074000..8e251889 100644
--- a/internal/messagequeue/messagequeue.go
+++ b/internal/messagequeue/messagequeue.go
@@ -46,6 +46,7 @@ type MessageNetwork interface {
 	NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error)
 	Latency(peer.ID) time.Duration
 	Ping(context.Context, peer.ID) ping.Result
+	Self() peer.ID
 }
 
 // MessageQueue implements queue of want messages to send to peers.
diff --git a/internal/session/session.go b/internal/session/session.go
index b9231928..45cd825f 100644
--- a/internal/session/session.go
+++ b/internal/session/session.go
@@ -4,9 +4,9 @@ import (
 	"context"
 	"time"
 
-	// lu "github.com/ipfs/go-bitswap/internal/logutil"
 	bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
 	bsgetter "github.com/ipfs/go-bitswap/internal/getter"
+	lu "github.com/ipfs/go-bitswap/internal/logutil"
 	notifications "github.com/ipfs/go-bitswap/internal/notifications"
 	bspm "github.com/ipfs/go-bitswap/internal/peermanager"
 	bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager"
@@ -340,7 +340,7 @@ func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) {
 		// Search for providers who have the first want in the list.
 		// Typically if the provider has the first block they will have
 		// the rest of the blocks also.
- log.Warnf("Ses%d: FindMorePeers with want 0 of %d wants", s.id, len(wants)) + log.Warnf("Ses%d: FindMorePeers with want %s (1st of %d wants)", s.id, lu.C(wants[0]), len(wants)) s.findMorePeers(ctx, wants[0]) } s.resetIdleTick() diff --git a/internal/session/sessionwants.go b/internal/session/sessionwants.go index ad8dcd1b..60df0df2 100644 --- a/internal/session/sessionwants.go +++ b/internal/session/sessionwants.go @@ -56,7 +56,7 @@ func (sw *sessionWants) GetNextWants(limit int) []cid.Cid { func (sw *sessionWants) WantsSent(ks []cid.Cid) { now := time.Now() for _, c := range ks { - if _, ok := sw.liveWants[c]; !ok { + if _, ok := sw.liveWants[c]; !ok && sw.toFetch.Has(c) { sw.toFetch.Remove(c) sw.liveWants[c] = now } @@ -83,8 +83,7 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration) totalLatency += now.Sub(sentAt) } - // Remove the CID from the live wants / toFetch queue and add it - // to the past wants + // Remove the CID from the live wants / toFetch queue delete(sw.liveWants, c) sw.toFetch.Remove(c) } @@ -96,6 +95,9 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration) // PrepareBroadcast saves the current time for each live want and returns the // live want CIDs. func (sw *sessionWants) PrepareBroadcast() []cid.Cid { + // TODO: Change this to return wants in order so that the session will + // send out Find Providers request for the first want + // (Note that maps return keys in random order) now := time.Now() live := make([]cid.Cid, 0, len(sw.liveWants)) for c := range sw.liveWants { diff --git a/internal/session/sessionwantsender.go b/internal/session/sessionwantsender.go index cffb39bb..df963f9e 100644 --- a/internal/session/sessionwantsender.go +++ b/internal/session/sessionwantsender.go @@ -4,6 +4,7 @@ import ( "context" bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager" + lu "github.com/ipfs/go-bitswap/internal/logutil" cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" @@ -298,16 +299,41 @@ func (sws *sessionWantSender) trackWant(c cid.Cid) { // processUpdates processes incoming blocks and HAVE / DONT_HAVEs. // It returns all DONT_HAVEs. func (sws *sessionWantSender) processUpdates(updates []update) []cid.Cid { - prunePeers := make(map[peer.ID]struct{}) - dontHaves := cid.NewSet() + // Process received blocks keys + blkCids := cid.NewSet() for _, upd := range updates { - // TODO: If there is a timeout for the want from the peer, remove want.sentTo - // so the want can be sent to another peer (and blacklist the peer?) 
-		// TODO: If a peer is no longer available, check if all providers of
-		// each CID have been exhausted
+		for _, c := range upd.ks {
+			blkCids.Add(c)
+			log.Warnf("received block %s", lu.C(c))
+			// Remove the want
+			removed := sws.removeWant(c)
+			if removed != nil {
+				// Inform the peer tracker that this peer was the first to send
+				// us the block
+				sws.peerRspTrkr.receivedBlockFrom(upd.from)
+			}
+			delete(sws.peerConsecutiveDontHaves, upd.from)
+		}
+	}
 
-		// For each DONT_HAVE
+	// Process received DONT_HAVEs
+	dontHaves := cid.NewSet()
+	prunePeers := make(map[peer.ID]struct{})
+	for _, upd := range updates {
 		for _, c := range upd.dontHaves {
+			// Track the number of consecutive DONT_HAVEs each peer receives
+			if sws.peerConsecutiveDontHaves[upd.from] == peerDontHaveLimit {
+				prunePeers[upd.from] = struct{}{}
+			} else {
+				sws.peerConsecutiveDontHaves[upd.from]++
+			}
+
+			// If we already received a block for the want, there's no need to
+			// update block presence etc
+			if blkCids.Has(c) {
+				continue
+			}
+
 			dontHaves.Add(c)
 
 			// Update the block presence for the peer
@@ -322,40 +348,41 @@ func (sws *sessionWantSender) processUpdates(updates []update) []cid.Cid {
 				sws.setWantSentTo(c, "")
 			}
 		}
-
-		// Track the number of consecutive DONT_HAVEs each peer receives
-		if sws.peerConsecutiveDontHaves[upd.from] == peerDontHaveLimit {
-			prunePeers[upd.from] = struct{}{}
-		} else {
-			sws.peerConsecutiveDontHaves[upd.from]++
-		}
 		}
+	}
 
-		// For each HAVE
+	// Process received HAVEs
+	for _, upd := range updates {
 		for _, c := range upd.haves {
-			// Update the block presence for the peer
-			sws.updateWantBlockPresence(c, upd.from)
-			delete(sws.peerConsecutiveDontHaves, upd.from)
-		}
-
-		// For each received block
-		for _, c := range upd.ks {
-			// Remove the want
-			removed := sws.removeWant(c)
-			if removed != nil {
-				// Inform the peer tracker that this peer was the first to send
-				// us the block
-				sws.peerRspTrkr.receivedBlockFrom(upd.from)
+			// If we haven't already received a block for the want
+			if !blkCids.Has(c) {
+				// Update the block presence for the peer
+				sws.updateWantBlockPresence(c, upd.from)
 			}
+
+			// Clear the consecutive DONT_HAVE count for the peer
 			delete(sws.peerConsecutiveDontHaves, upd.from)
+			delete(prunePeers, upd.from)
 		}
 	}
 
 	// If any peers have sent us too many consecutive DONT_HAVEs, remove them
 	// from the session
+	for p := range prunePeers {
+		// Before removing the peer from the session, check if the peer
+		// sent us a HAVE for a block that we want
+		for c := range sws.wants {
+			if sws.bpm.PeerHasBlock(p, c) {
+				delete(prunePeers, p)
+				break
+			}
+		}
+	}
 	if len(prunePeers) > 0 {
 		go func() {
 			for p := range prunePeers {
+				// Peer doesn't have anything we want, so remove it
+				log.Infof("peer %s sent too many dont haves", lu.P(p))
 				sws.SignalAvailability(p, false)
 			}
 		}()
diff --git a/internal/session/sessionwantsender_test.go b/internal/session/sessionwantsender_test.go
index c6a3f72c..1a35c0ea 100644
--- a/internal/session/sessionwantsender_test.go
+++ b/internal/session/sessionwantsender_test.go
@@ -529,9 +529,8 @@ func TestConsecutiveDontHaveLimit(t *testing.T) {
 	// Add all cids as wants
 	spm.Add(cids)
 
-	// Receive a HAVE from peer (adds it to the session)
-	bpm.ReceiveFrom(p, cids[:1], []cid.Cid{})
-	spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{})
+	// Receive a block from peer (adds it to the session)
+	spm.Update(p, cids[:1], []cid.Cid{}, []cid.Cid{})
 
 	// Wait for processing to complete
 	time.Sleep(10 * time.Millisecond)
@@ -586,9 +585,8 @@ func TestConsecutiveDontHaveLimitInterrupted(t *testing.T) {
 	// Add all cids as wants
 	spm.Add(cids)
 
-	// Receive a HAVE from peer (adds it to the session)
-	bpm.ReceiveFrom(p, cids[:1], []cid.Cid{})
-	spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{})
+	// Receive a block from peer (adds it to the session)
+	spm.Update(p, cids[:1], []cid.Cid{}, []cid.Cid{})
 
 	// Wait for processing to complete
 	time.Sleep(5 * time.Millisecond)
@@ -642,9 +640,8 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
 	// Add all cids as wants
 	spm.Add(cids)
 
-	// Receive a HAVE from peer (adds it to the session)
-	bpm.ReceiveFrom(p, cids[:1], []cid.Cid{})
-	spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{})
+	// Receive a block from peer (adds it to the session)
+	spm.Update(p, cids[:1], []cid.Cid{}, []cid.Cid{})
 
 	// Wait for processing to complete
 	time.Sleep(5 * time.Millisecond)
@@ -661,7 +658,7 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
 	}
 	// Wait for processing to complete
-	time.Sleep(5 * time.Millisecond)
+	time.Sleep(10 * time.Millisecond)
 	// Session should remove peer
 	if has := fpm.HasPeer(p); has {
 		t.Fatal("Expected peer not to be available")
 	}
@@ -673,7 +670,7 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
 	spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{})
 
 	// Wait for processing to complete
-	time.Sleep(5 * time.Millisecond)
+	time.Sleep(10 * time.Millisecond)
 
 	// Peer should be available
 	if has := fpm.HasPeer(p); !has {
@@ -689,7 +686,7 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
 	}
 
 	// Wait for processing to complete
-	time.Sleep(5 * time.Millisecond)
+	time.Sleep(10 * time.Millisecond)
 
 	// Peer should be available
 	if has := fpm.HasPeer(p); !has {
@@ -703,10 +700,54 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
 	}
 
 	// Wait for processing to complete
-	time.Sleep(5 * time.Millisecond)
+	time.Sleep(10 * time.Millisecond)
 
 	// Session should remove peer
 	if has := fpm.HasPeer(p); has {
 		t.Fatal("Expected peer not to be available")
 	}
 }
+
+func TestConsecutiveDontHaveDontRemoveIfHasWantedBlock(t *testing.T) {
+	cids := testutil.GenerateCids(peerDontHaveLimit + 10)
+	p := testutil.GeneratePeers(1)[0]
+	sid := uint64(1)
+	pm := newMockPeerManager()
+	fpm := newFakeSessionPeerManager()
+	bpm := bsbpm.New()
+	onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
+	onPeersExhausted := func([]cid.Cid) {}
+	spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
+
+	go spm.Run()
+
+	// Add all cids as wants
+	spm.Add(cids)
+
+	// Receive a HAVE from peer (adds it to the session)
+	bpm.ReceiveFrom(p, cids[:1], []cid.Cid{})
+	spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{})
+
+	// Wait for processing to complete
+	time.Sleep(10 * time.Millisecond)
+
+	// Peer should be available
+	if has := fpm.HasPeer(p); !has {
+		t.Fatal("Expected peer to be available")
+	}
+
+	// Receive DONT_HAVEs from peer that exceed limit
+	for _, c := range cids[1 : peerDontHaveLimit+5] {
+		bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
+		spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c})
+	}
+
+	// Wait for processing to complete
+	time.Sleep(20 * time.Millisecond)
+
+	// Peer should still be available because it has a block that we want.
+	// (We received a HAVE for cid 0 but didn't yet receive the block)
+	if has := fpm.HasPeer(p); !has {
+		t.Fatal("Expected peer to be available")
+	}
+}
diff --git a/internal/sessionpeermanager/sessionpeermanager.go b/internal/sessionpeermanager/sessionpeermanager.go
index cc6e7110..90233c72 100644
--- a/internal/sessionpeermanager/sessionpeermanager.go
+++ b/internal/sessionpeermanager/sessionpeermanager.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"sync"
 
+	lu "github.com/ipfs/go-bitswap/internal/logutil"
 	logging "github.com/ipfs/go-log"
 	peer "github.com/libp2p/go-libp2p-core/peer"
 )
@@ -61,7 +62,7 @@ func (spm *SessionPeerManager) AddPeer(p peer.ID) bool {
 	// connection
 	spm.tagger.TagPeer(p, spm.tag, sessionPeerTagValue)
 
-	log.Infof("Added peer %s to session: %d peers\n", p, len(spm.peers))
+	log.Debugf("Added peer %s to session (%d peers)\n", p, len(spm.peers))
 	return true
 }
 
@@ -77,6 +78,8 @@ func (spm *SessionPeerManager) RemovePeer(p peer.ID) bool {
 
 	delete(spm.peers, p)
 	spm.tagger.UntagPeer(p, spm.tag)
+
+	log.Debugf("Removed peer %s from session (%d peers)", lu.P(p), len(spm.peers))
 	return true
 }
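The behavioural core of this patch is the pruning rule in sessionWantSender.processUpdates: a peer that keeps answering with consecutive DONT_HAVEs beyond peerDontHaveLimit becomes a candidate for removal from the session, but it is kept if it has already sent a HAVE for a block the session still wants, and any received block or HAVE resets its count. The following is a minimal standalone Go sketch of that rule, under simplified assumptions; dontHaveTracker, peerID and blockCID are hypothetical names for illustration only and are not part of go-bitswap.

// consecutive_dont_have_sketch.go — illustrative only, not part of go-bitswap.
package main

import "fmt"

type peerID string
type blockCID string

// dontHaveTracker is a hypothetical, simplified stand-in for the per-session
// state that sessionWantSender keeps.
type dontHaveTracker struct {
	limit        int                          // analogous to peerDontHaveLimit
	consecutive  map[peerID]int               // consecutive DONT_HAVEs per peer
	wants        map[blockCID]bool            // CIDs the session still wants
	peerHasBlock map[peerID]map[blockCID]bool // HAVEs recorded per peer
}

// onDontHave records one DONT_HAVE and reports whether the peer has now
// exceeded the limit (making it a candidate for removal).
func (t *dontHaveTracker) onDontHave(p peerID) bool {
	if t.consecutive[p] == t.limit {
		return true
	}
	t.consecutive[p]++
	return false
}

// onHaveOrBlock resets the consecutive count when the peer sends a HAVE or a block.
func (t *dontHaveTracker) onHaveOrBlock(p peerID) {
	delete(t.consecutive, p)
}

// shouldPrune applies the exception this patch adds: keep the peer while it
// still claims to have any block the session wants.
func (t *dontHaveTracker) shouldPrune(p peerID) bool {
	for c := range t.wants {
		if t.peerHasBlock[p][c] {
			return false
		}
	}
	return true
}

func main() {
	t := &dontHaveTracker{
		limit:        10,
		consecutive:  map[peerID]int{},
		wants:        map[blockCID]bool{"cid0": true},
		peerHasBlock: map[peerID]map[blockCID]bool{"peerA": {"cid0": true}},
	}
	overLimit := false
	for i := 0; i < 15; i++ {
		if t.onDontHave("peerA") {
			overLimit = true
		}
	}
	// The peer exceeded the limit, but it sent a HAVE for cid0, which we still
	// want, so it is not pruned.
	fmt.Println(overLimit, t.shouldPrune("peerA")) // true false
}

With a limit of 10, the example peer goes over the limit yet is kept because it still claims to have a wanted block, which is the same situation exercised by the new TestConsecutiveDontHaveDontRemoveIfHasWantedBlock test above.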