diff --git a/sessionpeermanager/sessionpeermanager.go b/sessionpeermanager/sessionpeermanager.go index 93723c9e..3c4e1374 100644 --- a/sessionpeermanager/sessionpeermanager.go +++ b/sessionpeermanager/sessionpeermanager.go @@ -277,20 +277,30 @@ type getPeersMessage struct { resp chan<- []bssd.OptimizedPeer } +// Get all optimized peers in order followed by randomly ordered unoptimized +// peers, with a limit of maxOptimizedPeers func (prm *getPeersMessage) handle(spm *SessionPeerManager) { randomOrder := rand.Perm(len(spm.unoptimizedPeersArr)) + + // Number of peers to get in total: unoptimized + optimized + // limited by maxOptimizedPeers maxPeers := len(spm.unoptimizedPeersArr) + len(spm.optimizedPeersArr) if maxPeers > maxOptimizedPeers { maxPeers = maxOptimizedPeers } + + // The best peer latency is the first optimized peer's latency. + // If we haven't recorded any peer's latency, use 0. var bestPeerLatency float64 if len(spm.optimizedPeersArr) > 0 { bestPeerLatency = float64(spm.activePeers[spm.optimizedPeersArr[0]].latency) } else { bestPeerLatency = 0 } + optimizedPeers := make([]bssd.OptimizedPeer, 0, maxPeers) for i := 0; i < maxPeers; i++ { + // First add optimized peers in order if i < len(spm.optimizedPeersArr) { p := spm.optimizedPeersArr[i] optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{ @@ -298,6 +308,7 @@ func (prm *getPeersMessage) handle(spm *SessionPeerManager) { OptimizationRating: bestPeerLatency / float64(spm.activePeers[p].latency), }) } else { + // Then add unoptimized peers in random order p := spm.unoptimizedPeersArr[randomOrder[i-len(spm.optimizedPeersArr)]] optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{Peer: p, OptimizationRating: 0.0}) } diff --git a/sessionpeermanager/sessionpeermanager_test.go b/sessionpeermanager/sessionpeermanager_test.go index e6808307..87262b69 100644 --- a/sessionpeermanager/sessionpeermanager_test.go +++ b/sessionpeermanager/sessionpeermanager_test.go @@ -2,6 +2,7 @@ package sessionpeermanager import ( "context" + "fmt" "math/rand" "sync" "testing" @@ -148,9 +149,10 @@ func TestRecordingReceivedBlocks(t *testing.T) { func TestOrderingPeers(t *testing.T) { ctx := context.Background() - ctx, cancel := context.WithTimeout(ctx, 30*time.Millisecond) + ctx, cancel := context.WithTimeout(ctx, 60*time.Millisecond) defer cancel() - peers := testutil.GeneratePeers(100) + peerCount := 100 + peers := testutil.GeneratePeers(peerCount) completed := make(chan struct{}) fpt := &fakePeerTagger{} fppf := &fakePeerProviderFinder{peers, completed} @@ -165,28 +167,32 @@ func TestOrderingPeers(t *testing.T) { case <-ctx.Done(): t.Fatal("Did not finish finding providers") } - time.Sleep(2 * time.Millisecond) + time.Sleep(5 * time.Millisecond) // record broadcast sessionPeerManager.RecordPeerRequests(nil, c) // record receives - peer1 := peers[rand.Intn(100)] - peer2 := peers[rand.Intn(100)] - peer3 := peers[rand.Intn(100)] - time.Sleep(1 * time.Millisecond) - sessionPeerManager.RecordPeerResponse(peer1, []cid.Cid{c[0]}) + randi := rand.Perm(peerCount) + peer1 := peers[randi[0]] + peer2 := peers[randi[1]] + peer3 := peers[randi[2]] time.Sleep(5 * time.Millisecond) + sessionPeerManager.RecordPeerResponse(peer1, []cid.Cid{c[0]}) + time.Sleep(25 * time.Millisecond) sessionPeerManager.RecordPeerResponse(peer2, []cid.Cid{c[0]}) - time.Sleep(1 * time.Millisecond) + time.Sleep(5 * time.Millisecond) sessionPeerManager.RecordPeerResponse(peer3, []cid.Cid{c[0]}) sessionPeers := sessionPeerManager.GetOptimizedPeers() if len(sessionPeers) != maxOptimizedPeers { - t.Fatal("Should not return more than the max of optimized peers") + t.Fatal(fmt.Sprintf("Should not return more (%d) than the max of optimized peers (%d)", len(sessionPeers), maxOptimizedPeers)) } // should prioritize peers which are fastest + // peer1: ~5ms + // peer2: 5 + 25 = ~30ms + // peer3: 5 + 25 + 5 = ~35ms if (sessionPeers[0].Peer != peer1) || (sessionPeers[1].Peer != peer2) || (sessionPeers[2].Peer != peer3) { t.Fatal("Did not prioritize peers that received blocks") } @@ -202,7 +208,7 @@ func TestOrderingPeers(t *testing.T) { t.Fatal("Did not assign rating to other optimized peers correctly") } - // should other peers rating of zero + // should give other non-optimized peers rating of zero for i := 3; i < maxOptimizedPeers; i++ { if sessionPeers[i].OptimizationRating != 0.0 { t.Fatal("Did not assign rating to unoptimized peer correctly") @@ -220,13 +226,16 @@ func TestOrderingPeers(t *testing.T) { // call again nextSessionPeers := sessionPeerManager.GetOptimizedPeers() if len(nextSessionPeers) != maxOptimizedPeers { - t.Fatal("Should not return more than the max of optimized peers") + t.Fatal(fmt.Sprintf("Should not return more (%d) than the max of optimized peers (%d)", len(nextSessionPeers), maxOptimizedPeers)) } // should sort by average latency + // peer1: ~5ms + // peer3: (~35ms + ~5ms) / 2 = ~20ms + // peer2: ~30ms if (nextSessionPeers[0].Peer != peer1) || (nextSessionPeers[1].Peer != peer3) || (nextSessionPeers[2].Peer != peer2) { - t.Fatal("Did not dedup peers which received multiple blocks") + t.Fatal("Did not correctly update order of peers sorted by average latency") } // should randomize other peers @@ -358,7 +367,7 @@ func TestTimeoutsAndCancels(t *testing.T) { func TestUntaggingPeers(t *testing.T) { ctx := context.Background() - ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) + ctx, cancel := context.WithTimeout(ctx, 30*time.Millisecond) defer cancel() peers := testutil.GeneratePeers(5) completed := make(chan struct{}) @@ -375,7 +384,7 @@ func TestUntaggingPeers(t *testing.T) { case <-ctx.Done(): t.Fatal("Did not finish finding providers") } - time.Sleep(2 * time.Millisecond) + time.Sleep(15 * time.Millisecond) if fpt.count() != len(peers) { t.Fatal("Peers were not tagged!")