Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Test: fix flakey session peer manager tests #185

Merged
merged 6 commits into from
Aug 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions sessionpeermanager/sessionpeermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,27 +277,38 @@ type getPeersMessage struct {
resp chan<- []bssd.OptimizedPeer
}

// Get all optimized peers in order followed by randomly ordered unoptimized
// peers, with a limit of maxOptimizedPeers
func (prm *getPeersMessage) handle(spm *SessionPeerManager) {
randomOrder := rand.Perm(len(spm.unoptimizedPeersArr))

// Number of peers to get in total: unoptimized + optimized
// limited by maxOptimizedPeers
maxPeers := len(spm.unoptimizedPeersArr) + len(spm.optimizedPeersArr)
if maxPeers > maxOptimizedPeers {
maxPeers = maxOptimizedPeers
}

// The best peer latency is the first optimized peer's latency.
// If we haven't recorded any peer's latency, use 0.
var bestPeerLatency float64
if len(spm.optimizedPeersArr) > 0 {
bestPeerLatency = float64(spm.activePeers[spm.optimizedPeersArr[0]].latency)
} else {
bestPeerLatency = 0
}

optimizedPeers := make([]bssd.OptimizedPeer, 0, maxPeers)
for i := 0; i < maxPeers; i++ {
// First add optimized peers in order
if i < len(spm.optimizedPeersArr) {
p := spm.optimizedPeersArr[i]
optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{
Peer: p,
OptimizationRating: bestPeerLatency / float64(spm.activePeers[p].latency),
})
} else {
// Then add unoptimized peers in random order
p := spm.unoptimizedPeersArr[randomOrder[i-len(spm.optimizedPeersArr)]]
optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{Peer: p, OptimizationRating: 0.0})
}
Expand Down
39 changes: 24 additions & 15 deletions sessionpeermanager/sessionpeermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sessionpeermanager

import (
"context"
"fmt"
"math/rand"
"sync"
"testing"
Expand Down Expand Up @@ -148,9 +149,10 @@ func TestRecordingReceivedBlocks(t *testing.T) {

func TestOrderingPeers(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 30*time.Millisecond)
ctx, cancel := context.WithTimeout(ctx, 60*time.Millisecond)
defer cancel()
peers := testutil.GeneratePeers(100)
peerCount := 100
peers := testutil.GeneratePeers(peerCount)
completed := make(chan struct{})
fpt := &fakePeerTagger{}
fppf := &fakePeerProviderFinder{peers, completed}
Expand All @@ -165,28 +167,32 @@ func TestOrderingPeers(t *testing.T) {
case <-ctx.Done():
t.Fatal("Did not finish finding providers")
}
time.Sleep(2 * time.Millisecond)
time.Sleep(5 * time.Millisecond)

// record broadcast
sessionPeerManager.RecordPeerRequests(nil, c)

// record receives
peer1 := peers[rand.Intn(100)]
peer2 := peers[rand.Intn(100)]
peer3 := peers[rand.Intn(100)]
time.Sleep(1 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer1, []cid.Cid{c[0]})
randi := rand.Perm(peerCount)
peer1 := peers[randi[0]]
peer2 := peers[randi[1]]
peer3 := peers[randi[2]]
time.Sleep(5 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer1, []cid.Cid{c[0]})
time.Sleep(25 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer2, []cid.Cid{c[0]})
time.Sleep(1 * time.Millisecond)
time.Sleep(5 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer3, []cid.Cid{c[0]})

sessionPeers := sessionPeerManager.GetOptimizedPeers()
if len(sessionPeers) != maxOptimizedPeers {
t.Fatal("Should not return more than the max of optimized peers")
t.Fatal(fmt.Sprintf("Should not return more (%d) than the max of optimized peers (%d)", len(sessionPeers), maxOptimizedPeers))
}

// should prioritize peers which are fastest
// peer1: ~5ms
// peer2: 5 + 25 = ~30ms
// peer3: 5 + 25 + 5 = ~35ms
if (sessionPeers[0].Peer != peer1) || (sessionPeers[1].Peer != peer2) || (sessionPeers[2].Peer != peer3) {
t.Fatal("Did not prioritize peers that received blocks")
}
Expand All @@ -202,7 +208,7 @@ func TestOrderingPeers(t *testing.T) {
t.Fatal("Did not assign rating to other optimized peers correctly")
}

// should other peers rating of zero
// should give other non-optimized peers rating of zero
for i := 3; i < maxOptimizedPeers; i++ {
if sessionPeers[i].OptimizationRating != 0.0 {
t.Fatal("Did not assign rating to unoptimized peer correctly")
Expand All @@ -220,13 +226,16 @@ func TestOrderingPeers(t *testing.T) {
// call again
nextSessionPeers := sessionPeerManager.GetOptimizedPeers()
if len(nextSessionPeers) != maxOptimizedPeers {
t.Fatal("Should not return more than the max of optimized peers")
t.Fatal(fmt.Sprintf("Should not return more (%d) than the max of optimized peers (%d)", len(nextSessionPeers), maxOptimizedPeers))
}

// should sort by average latency
// peer1: ~5ms
// peer3: (~35ms + ~5ms) / 2 = ~20ms
// peer2: ~30ms
if (nextSessionPeers[0].Peer != peer1) || (nextSessionPeers[1].Peer != peer3) ||
(nextSessionPeers[2].Peer != peer2) {
t.Fatal("Did not dedup peers which received multiple blocks")
t.Fatal("Did not correctly update order of peers sorted by average latency")
}

// should randomize other peers
Expand Down Expand Up @@ -358,7 +367,7 @@ func TestTimeoutsAndCancels(t *testing.T) {

func TestUntaggingPeers(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
ctx, cancel := context.WithTimeout(ctx, 30*time.Millisecond)
defer cancel()
peers := testutil.GeneratePeers(5)
completed := make(chan struct{})
Expand All @@ -375,7 +384,7 @@ func TestUntaggingPeers(t *testing.T) {
case <-ctx.Done():
t.Fatal("Did not finish finding providers")
}
time.Sleep(2 * time.Millisecond)
time.Sleep(15 * time.Millisecond)

if fpt.count() != len(peers) {
t.Fatal("Peers were not tagged!")
Expand Down