Skip to content

Commit

Permalink
Record valuable peers will only take into account latency of seed pee…
Browse files Browse the repository at this point in the history
…rs (#536)

* record valuable peers algorithm only takes into account latency of seed peers
  • Loading branch information
aschmahmann authored Apr 3, 2020
1 parent b0b69cc commit 1506cc0
Showing 1 changed file with 29 additions and 14 deletions.
43 changes: 29 additions & 14 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math"
"sync"
"time"

Expand Down Expand Up @@ -43,6 +44,9 @@ type query struct {
// seedPeers is the set of peers that seed the query
seedPeers []peer.ID

// peerTimes contains the duration of each successful query to a peer
peerTimes map[peer.ID]time.Duration

// queryPeers is the set of peers known by this query and their respective states.
queryPeers *qpeerset.QueryPeerset

Expand Down Expand Up @@ -155,6 +159,7 @@ func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn
dht: dht,
queryPeers: qpeerset.NewQueryPeerset(target),
seedPeers: seedPeers,
peerTimes: make(map[peer.ID]time.Duration),
terminated: false,
queryFn: queryFn,
stopFn: stopFn,
Expand All @@ -176,15 +181,20 @@ func (q *query) recordPeerIsValuable(p peer.ID) {
}

func (q *query) recordValuablePeers() {
closePeers := q.queryPeers.GetClosestNotUnreachable(q.dht.beta)
for _, p := range closePeers {
referrer := p
for {
q.recordPeerIsValuable(referrer)
referrer = q.queryPeers.GetReferrer(referrer)
if referrer == q.dht.self {
break
}
// Valuable peers algorithm:
// Label the seed peer that responded to a query in the shortest amount of time as the "most valuable peer" (MVP)
// Each seed peer that responded to a query within some range (i.e. 2x) of the MVP's time is a valuable peer
// Mark the MVP and all the other valuable peers as valuable
mvpDuration := time.Duration(math.MaxInt64)
for _, p := range q.seedPeers {
if queryTime, ok := q.peerTimes[p]; ok && queryTime < mvpDuration {
mvpDuration = queryTime
}
}

for _, p := range q.seedPeers {
if queryTime, ok := q.peerTimes[p]; ok && queryTime < mvpDuration*2 {
q.recordPeerIsValuable(p)
}
}
}
Expand Down Expand Up @@ -232,10 +242,11 @@ func (q *query) constructLookupResult(target kb.ID) *lookupWithFollowupResult {
}

type queryUpdate struct {
cause peer.ID
heard []peer.ID
queried []peer.ID
unreachable []peer.ID
cause peer.ID
heard []peer.ID
queried []peer.ID
unreachable []peer.ID
queryDuration time.Duration
}

func (q *query) run() {
Expand Down Expand Up @@ -368,6 +379,7 @@ func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID
defer q.waitGroup.Done()
dialCtx, queryCtx := ctx, ctx

startQuery := time.Now()
// dial the peer
if err := q.dht.dialPeer(dialCtx, p); err != nil {
// remove the peer if there was a dial failure..but not because of a context cancellation
Expand All @@ -388,6 +400,8 @@ func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID
return
}

queryDuration := time.Since(startQuery)

// query successful, try to add to RT
q.dht.peerFound(q.dht.ctx, p, true)

Expand All @@ -410,7 +424,7 @@ func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID
}
}

ch <- &queryUpdate{cause: p, heard: saw, queried: []peer.ID{p}}
ch <- &queryUpdate{cause: p, heard: saw, queried: []peer.ID{p}, queryDuration: queryDuration}
}

func (q *query) updateState(ctx context.Context, up *queryUpdate) {
Expand Down Expand Up @@ -446,6 +460,7 @@ func (q *query) updateState(ctx context.Context, up *queryUpdate) {
}
if st := q.queryPeers.GetState(p); st == qpeerset.PeerWaiting {
q.queryPeers.SetState(p, qpeerset.PeerQueried)
q.peerTimes[p] = up.queryDuration
} else {
panic(fmt.Errorf("kademlia protocol error: tried to transition to the queried state from state %v", st))
}
Expand Down

0 comments on commit 1506cc0

Please sign in to comment.