From 04771f103eb5d7f8009d59655fa9d129cb144715 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Thu, 2 Apr 2020 21:38:40 -0400 Subject: [PATCH 1/2] record valuable peers algorithm only takes into account latency of seed peers --- query.go | 67 +++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 44 insertions(+), 23 deletions(-) diff --git a/query.go b/query.go index d51d057cf..b95b11feb 100644 --- a/query.go +++ b/query.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "math" "sync" "time" @@ -43,6 +44,9 @@ type query struct { // seedPeers is the set of peers that seed the query seedPeers []peer.ID + // seedPeerTimes contains the duration of each successful query to a seed peer + seedPeerTimes map[peer.ID]time.Duration + // queryPeers is the set of peers known by this query and their respective states. queryPeers *qpeerset.QueryPeerset @@ -149,15 +153,16 @@ func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn } q := &query{ - id: uuid.New(), - key: target, - ctx: ctx, - dht: dht, - queryPeers: qpeerset.NewQueryPeerset(target), - seedPeers: seedPeers, - terminated: false, - queryFn: queryFn, - stopFn: stopFn, + id: uuid.New(), + key: target, + ctx: ctx, + dht: dht, + queryPeers: qpeerset.NewQueryPeerset(target), + seedPeers: seedPeers, + seedPeerTimes: make(map[peer.ID]time.Duration), + terminated: false, + queryFn: queryFn, + stopFn: stopFn, } // run the query @@ -176,17 +181,23 @@ func (q *query) recordPeerIsValuable(p peer.ID) { } func (q *query) recordValuablePeers() { - closePeers := q.queryPeers.GetClosestNotUnreachable(q.dht.beta) - for _, p := range closePeers { - referrer := p - for { - q.recordPeerIsValuable(referrer) - referrer = q.queryPeers.GetReferrer(referrer) - if referrer == q.dht.self { - break - } + // Valuable peers algorithm: + // Label the seed peer that responded to a query in the shortest amount of time as the "most valuable peer" (MVP) + // Each seed peer that responded to a query within some range (i.e. 2x) of the MVP's time is a valuable peer + // Mark the MVP and all the other valuable peers as valuable + mvpDuration := time.Duration(math.MaxInt64) + for _, dur := range q.seedPeerTimes { + if dur < mvpDuration { + mvpDuration = dur + } + } + + for p, dur := range q.seedPeerTimes { + if dur < mvpDuration*2 { + q.recordPeerIsValuable(p) } } + } // constructLookupResult takes the query information and uses it to construct the lookup result @@ -232,10 +243,11 @@ func (q *query) constructLookupResult(target kb.ID) *lookupWithFollowupResult { } type queryUpdate struct { - cause peer.ID - heard []peer.ID - queried []peer.ID - unreachable []peer.ID + cause peer.ID + heard []peer.ID + queried []peer.ID + unreachable []peer.ID + queryDuration time.Duration } func (q *query) run() { @@ -368,6 +380,7 @@ func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID defer q.waitGroup.Done() dialCtx, queryCtx := ctx, ctx + startQuery := time.Now() // dial the peer if err := q.dht.dialPeer(dialCtx, p); err != nil { // remove the peer if there was a dial failure..but not because of a context cancellation @@ -388,6 +401,8 @@ func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID return } + queryDuration := time.Since(startQuery) + // query successful, try to add to RT q.dht.peerFound(q.dht.ctx, p, true) @@ -410,7 +425,7 @@ func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID } } - ch <- &queryUpdate{cause: p, heard: saw, queried: []peer.ID{p}} + ch <- &queryUpdate{cause: p, heard: saw, queried: []peer.ID{p}, queryDuration: queryDuration} } func (q *query) updateState(ctx context.Context, up *queryUpdate) { @@ -446,6 +461,12 @@ func (q *query) updateState(ctx context.Context, up *queryUpdate) { } if st := q.queryPeers.GetState(p); st == qpeerset.PeerWaiting { q.queryPeers.SetState(p, qpeerset.PeerQueried) + for _, seed := range q.seedPeers { + if p == seed { + q.seedPeerTimes[p] = up.queryDuration + } + } + } else { panic(fmt.Errorf("kademlia protocol error: tried to transition to the queried state from state %v", st)) } From d45f88d6ae51086aff9c97790c5859b9861e218f Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Thu, 2 Apr 2020 23:04:31 -0400 Subject: [PATCH 2/2] track all peer query times --- query.go | 42 ++++++++++++++++++------------------------ 1 file changed, 18 insertions(+), 24 deletions(-) diff --git a/query.go b/query.go index b95b11feb..00eddd95c 100644 --- a/query.go +++ b/query.go @@ -44,8 +44,8 @@ type query struct { // seedPeers is the set of peers that seed the query seedPeers []peer.ID - // seedPeerTimes contains the duration of each successful query to a seed peer - seedPeerTimes map[peer.ID]time.Duration + // peerTimes contains the duration of each successful query to a peer + peerTimes map[peer.ID]time.Duration // queryPeers is the set of peers known by this query and their respective states. queryPeers *qpeerset.QueryPeerset @@ -153,16 +153,16 @@ func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn } q := &query{ - id: uuid.New(), - key: target, - ctx: ctx, - dht: dht, - queryPeers: qpeerset.NewQueryPeerset(target), - seedPeers: seedPeers, - seedPeerTimes: make(map[peer.ID]time.Duration), - terminated: false, - queryFn: queryFn, - stopFn: stopFn, + id: uuid.New(), + key: target, + ctx: ctx, + dht: dht, + queryPeers: qpeerset.NewQueryPeerset(target), + seedPeers: seedPeers, + peerTimes: make(map[peer.ID]time.Duration), + terminated: false, + queryFn: queryFn, + stopFn: stopFn, } // run the query @@ -186,18 +186,17 @@ func (q *query) recordValuablePeers() { // Each seed peer that responded to a query within some range (i.e. 2x) of the MVP's time is a valuable peer // Mark the MVP and all the other valuable peers as valuable mvpDuration := time.Duration(math.MaxInt64) - for _, dur := range q.seedPeerTimes { - if dur < mvpDuration { - mvpDuration = dur + for _, p := range q.seedPeers { + if queryTime, ok := q.peerTimes[p]; ok && queryTime < mvpDuration { + mvpDuration = queryTime } } - for p, dur := range q.seedPeerTimes { - if dur < mvpDuration*2 { + for _, p := range q.seedPeers { + if queryTime, ok := q.peerTimes[p]; ok && queryTime < mvpDuration*2 { q.recordPeerIsValuable(p) } } - } // constructLookupResult takes the query information and uses it to construct the lookup result @@ -461,12 +460,7 @@ func (q *query) updateState(ctx context.Context, up *queryUpdate) { } if st := q.queryPeers.GetState(p); st == qpeerset.PeerWaiting { q.queryPeers.SetState(p, qpeerset.PeerQueried) - for _, seed := range q.seedPeers { - if p == seed { - q.seedPeerTimes[p] = up.queryDuration - } - } - + q.peerTimes[p] = up.queryDuration } else { panic(fmt.Errorf("kademlia protocol error: tried to transition to the queried state from state %v", st)) }