diff --git a/kpeerset/kpeerset.go b/kpeerset/kpeerset.go
deleted file mode 100644
index 2e19d7e2a..000000000
--- a/kpeerset/kpeerset.go
+++ /dev/null
@@ -1,70 +0,0 @@
-package kpeerset
-
-import (
-	"github.com/libp2p/go-libp2p-core/peer"
-	"math/big"
-)
-
-type IPeerMetric interface {
-	Peer() peer.ID
-	Metric() *big.Int
-}
-
-// peerMetric tracks a peer and its distance to something else.
-type peerMetric struct {
-	// the peer
-	peer peer.ID
-
-	// big.Int for XOR metric
-	metric *big.Int
-}
-
-func (pm peerMetric) Peer() peer.ID    { return pm.peer }
-func (pm peerMetric) Metric() *big.Int { return pm.metric }
-
-type peerMetricHeapItem struct {
-	IPeerMetric
-
-	// The index of the item in the heap
-	index int
-}
-
-// peerMetricHeap implements a heap of peerDistances.
-// The heap sorts by furthest if direction = 1 and closest if direction = -1
-type peerMetricHeap struct {
-	data      []*peerMetricHeapItem
-	direction int
-}
-
-func (ph *peerMetricHeap) Len() int {
-	return len(ph.data)
-}
-
-func (ph *peerMetricHeap) Less(i, j int) bool {
-	h := ph.data
-	return ph.direction == h[i].Metric().Cmp(h[j].Metric())
-}
-
-func (ph *peerMetricHeap) Swap(i, j int) {
-	h := ph.data
-	h[i], h[j] = h[j], h[i]
-	h[i].index = i
-	h[j].index = j
-}
-
-func (ph *peerMetricHeap) Push(x interface{}) {
-	n := len(ph.data)
-	item := x.(*peerMetricHeapItem)
-	item.index = n
-	ph.data = append(ph.data, item)
-}
-
-func (ph *peerMetricHeap) Pop() interface{} {
-	old := ph.data
-	n := len(old)
-	item := old[n-1]
-	old[n-1] = nil  // avoid memory leak
-	item.index = -1 // for safety
-	ph.data = old[0 : n-1]
-	return item
-}
diff --git a/kpeerset/metrics.go b/kpeerset/metrics.go
deleted file mode 100644
index 68e2721b1..000000000
--- a/kpeerset/metrics.go
+++ /dev/null
@@ -1,73 +0,0 @@
-package kpeerset
-
-import (
-	"math/big"
-	"sort"
-	"time"
-
-	"github.com/libp2p/go-libp2p-core/network"
-	"github.com/libp2p/go-libp2p-core/peer"
-	"github.com/libp2p/go-libp2p-core/peerstore"
-)
-
-type peerLatencyMetric struct {
-	peerMetric
-	connectedness network.Connectedness
-	latency       time.Duration
-}
-
-type peerLatencyMetricList []peerLatencyMetric
-
-func (p peerLatencyMetricList) Len() int { return len(p) }
-func (p peerLatencyMetricList) Less(i, j int) bool {
-	pm1, pm2 := p[i], p[j]
-	return calculationLess(pm1, pm2)
-}
-func (p peerLatencyMetricList) Swap(i, j int)           { p[i], p[j] = p[j], p[i] }
-func (p peerLatencyMetricList) GetPeerID(i int) peer.ID { return p[i].peer }
-
-func calculationLess(pm1, pm2 peerLatencyMetric) bool {
-	return calc(pm1).Cmp(calc(pm2)) == -1
-}
-
-func calc(pm peerLatencyMetric) *big.Int {
-	var c int64
-	switch pm.connectedness {
-	case network.Connected:
-		c = 1
-	case network.CanConnect:
-		c = 5
-	case network.CannotConnect:
-		c = 10000
-	default:
-		c = 20
-	}
-
-	l := int64(pm.latency)
-	if l <= 0 {
-		l = int64(time.Second) * 10
-	}
-
-	res := big.NewInt(c)
-	tmp := big.NewInt(l)
-	res.Mul(res, tmp)
-	res.Mul(res, pm.metric)
-
-	return res
-}
-
-var _ SortablePeers = (*peerLatencyMetricList)(nil)
-
-func PeersSortedByLatency(peers []IPeerMetric, net network.Network, metrics peerstore.Metrics) SortablePeers {
-	lst := make(peerLatencyMetricList, len(peers))
-	for i := range lst {
-		p := peers[i].Peer()
-		lst[i] = peerLatencyMetric{
-			peerMetric:    peerMetric{peer: p, metric: peers[i].Metric()},
-			connectedness: net.Connectedness(p),
-			latency:       metrics.LatencyEWMA(p),
-		}
-	}
-	sort.Sort(lst)
-	return lst
-}
diff --git a/kpeerset/peerheap/heap.go b/kpeerset/peerheap/heap.go
new file mode 100644
index 000000000..e42abb405
--- /dev/null
+++ b/kpeerset/peerheap/heap.go
@@ -0,0 +1,110 @@
+package peerheap
+
+import (
+	"github.com/libp2p/go-libp2p-core/peer"
+)
+
+// Comparator is the type of a function that compares two peer Heap items to determine the ordering between them.
+// It returns true if i1 is "less" than i2 and false otherwise.
+type Comparator func(i1 Item, i2 Item) bool
+
+// Item is one "item" in the Heap.
+// It contains the ID of the peer, an arbitrary value associated with the peer
+// and the index of the "item" in the Heap.
+type Item struct {
+	Peer  peer.ID
+	Value interface{}
+	Index int
+}
+
+// Heap implements a heap of peer Items.
+// It uses the "compare" member function to compare two peers to determine the order between them.
+// If isMaxHeap is set to true, this Heap is a maxHeap, otherwise it's a minHeap.
+//
+// Note: It is the responsibility of the caller to enforce locking & synchronization.
+type Heap struct {
+	items     []*Item
+	compare   Comparator
+	isMaxHeap bool
+}
+
+// New creates & returns a peer Heap.
+func New(isMaxHeap bool, compare Comparator) *Heap {
+	return &Heap{isMaxHeap: isMaxHeap, compare: compare}
+}
+
+// PeekTop returns a copy of the top/first Item in the heap.
+// This would be the "maximum" or the "minimum" peer depending on whether
+// the heap is a maxHeap or a minHeap.
+//
+// A call to PeekTop will panic if the Heap is empty.
+func (ph *Heap) PeekTop() Item {
+	return *ph.items[0]
+}
+
+// FilterItems returns copies of ALL the Items in the Heap that satisfy the given predicate.
+func (ph *Heap) FilterItems(p func(i Item) bool) []Item {
+	var items []Item
+
+	for _, i := range ph.items {
+		ih := *i
+		if p(ih) {
+			items = append(items, ih)
+		}
+	}
+	return items
+}
+
+// Peers returns all the peers currently in the heap.
+func (ph *Heap) Peers() []peer.ID {
+	peers := make([]peer.ID, 0, len(ph.items))
+
+	for _, i := range ph.items {
+		peers = append(peers, i.Peer)
+	}
+	return peers
+}
+
+// Note: The functions below make the Heap satisfy the "heap.Interface" as required by the "heap" package in the
+// standard library. Please refer to the docs for "heap.Interface" in the standard library for more details.
+
+func (ph *Heap) Len() int {
+	return len(ph.items)
+}
+
+func (ph *Heap) Less(i, j int) bool {
+	h := ph.items
+
+	isLess := ph.compare(*h[i], *h[j])
+
+	// because the "compare" function returns true if item1 is less than item2,
+	// we need to reverse its result if the Heap is a maxHeap.
+	if ph.isMaxHeap {
+		return !isLess
+	}
+	return isLess
+}
+
+func (ph *Heap) Swap(i, j int) {
+	h := ph.items
+	h[i], h[j] = h[j], h[i]
+	h[i].Index = i
+	h[j].Index = j
+}
+
+func (ph *Heap) Push(x interface{}) {
+	n := len(ph.items)
+	item := x.(*Item)
+	item.Index = n
+	ph.items = append(ph.items, item)
+}
+
+func (ph *Heap) Pop() interface{} {
+	old := ph.items
+	n := len(old)
+	item := old[n-1]
+	old[n-1] = nil  // avoid memory leak
+	item.Index = -1 // for safety
+	ph.items = old[0 : n-1]
+	return item
+}
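Since Heap only satisfies heap.Interface, callers are expected to drive all mutations through the standard library's container/heap package. A minimal usage sketch (the length-based comparator is illustrative and mirrors the one used in the tests below):

```go
package main

import (
	"container/heap"
	"fmt"

	"github.com/libp2p/go-libp2p-core/peer"

	"github.com/libp2p/go-libp2p-kad-dht/kpeerset/peerheap"
)

func main() {
	// illustrative comparator: order Items by the length of their peer ID
	cmp := func(i1 peerheap.Item, i2 peerheap.Item) bool {
		return len(i1.Peer) < len(i2.Peer)
	}

	// isMaxHeap = false, so the "smallest" peer surfaces at the top
	ph := peerheap.New(false, cmp)

	// mutations go through container/heap so the heap invariant is maintained
	heap.Push(ph, &peerheap.Item{Peer: peer.ID("22")})
	heap.Push(ph, &peerheap.Item{Peer: peer.ID("1")})
	heap.Push(ph, &peerheap.Item{Peer: peer.ID("333")})

	fmt.Println(ph.PeekTop().Peer)                  // "1": peek without removing
	fmt.Println(heap.Pop(ph).(*peerheap.Item).Peer) // "1": pop in sorted order
}
```

Passing true instead of false flips the ordering via the Less inversion above, without touching the comparator itself.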
diff --git a/kpeerset/peerheap/heap_test.go b/kpeerset/peerheap/heap_test.go
new file mode 100644
index 000000000..70a622eed
--- /dev/null
+++ b/kpeerset/peerheap/heap_test.go
@@ -0,0 +1,91 @@
+package peerheap
+
+import (
+	"container/heap"
+	"testing"
+
+	"github.com/libp2p/go-libp2p-core/peer"
+
+	"github.com/stretchr/testify/require"
+)
+
+// a comparator that compares peer IDs based on their length
+var cmp = func(i1 Item, i2 Item) bool {
+	return len(i1.Peer) < len(i2.Peer)
+}
+
+var (
+	peer1 = peer.ID("22")
+	peer2 = peer.ID("1")
+	peer3 = peer.ID("333")
+)
+
+func TestMinHeap(t *testing.T) {
+	// create new
+	ph := New(false, cmp)
+	require.Zero(t, ph.Len())
+
+	// push the element
+	heap.Push(ph, &Item{Peer: peer1})
+	// assertions
+	require.True(t, ph.Len() == 1)
+	require.Equal(t, peer1, ph.PeekTop().Peer)
+
+	// push another element
+	heap.Push(ph, &Item{Peer: peer2})
+	// assertions
+	require.True(t, ph.Len() == 2)
+	require.Equal(t, peer2, ph.PeekTop().Peer)
+
+	// push another element
+	heap.Push(ph, &Item{Peer: peer3})
+	// assertions
+	require.True(t, ph.Len() == 3)
+	require.Equal(t, peer2, ph.PeekTop().Peer)
+
+	// remove & add again
+	heap.Remove(ph, 1)
+	require.True(t, ph.Len() == 2)
+	heap.Remove(ph, 0)
+	require.True(t, ph.Len() == 1)
+
+	heap.Push(ph, &Item{Peer: peer1})
+	heap.Push(ph, &Item{Peer: peer2})
+
+	// test filter peers
+	filtered := ph.FilterItems(func(i Item) bool {
+		return len(i.Peer) != 2
+	})
+	require.Len(t, filtered, 2)
+	require.Contains(t, itemsToPeers(filtered), peer2)
+	require.Contains(t, itemsToPeers(filtered), peer3)
+
+	// Assert Min Heap Order
+	require.Equal(t, peer2, heap.Pop(ph).(*Item).Peer)
+	require.Equal(t, peer1, heap.Pop(ph).(*Item).Peer)
+	require.Equal(t, peer3, heap.Pop(ph).(*Item).Peer)
+}
+
+func itemsToPeers(is []Item) []peer.ID {
+	peers := make([]peer.ID, 0, len(is))
+	for _, i := range is {
+		peers = append(peers, i.Peer)
+	}
+	return peers
+}
+
+func TestMaxHeap(t *testing.T) {
+	// create new
+	ph := New(true, cmp)
+	require.Zero(t, ph.Len())
+
+	// push all three peers
+	heap.Push(ph, &Item{Peer: peer1})
+	heap.Push(ph, &Item{Peer: peer3})
+	heap.Push(ph, &Item{Peer: peer2})
+
+	// Assert Max Heap Order
+	require.Equal(t, peer3, heap.Pop(ph).(*Item).Peer)
+	require.Equal(t, peer1, heap.Pop(ph).(*Item).Peer)
+	require.Equal(t, peer2, heap.Pop(ph).(*Item).Peer)
+}
diff --git a/kpeerset/sorted_peerset.go b/kpeerset/sorted_peerset.go
index fe6ec6611..f4eac9b60 100644
--- a/kpeerset/sorted_peerset.go
+++ b/kpeerset/sorted_peerset.go
@@ -2,128 +2,183 @@ package kpeerset
 
 import (
 	"container/heap"
-	"sort"
+	"math/big"
 	"sync"
 
 	"github.com/libp2p/go-libp2p-core/peer"
+
+	"github.com/libp2p/go-libp2p-kad-dht/kpeerset/peerheap"
+
 	ks "github.com/whyrusleeping/go-keyspace"
 )
 
-type SortablePeers interface {
-	sort.Interface
-	GetPeerID(i int) peer.ID
-}
+// SortedPeerset is a data structure that maintains the queried & unqueried peers for a query
+// based on their distance from the key.
+// Its main use is to allow peer addition, removal & retrieval for the query as per the
+// semantics described in the Kad DHT paper.
+type SortedPeerset struct {
+	// the key being searched for
+	key ks.Key
 
-func NewSortedPeerset(kvalue int, from string, sortPeers func([]IPeerMetric) SortablePeers) *SortedPeerset {
-	fromKey := ks.XORKeySpace.Key([]byte(from))
+	// the K parameter in the Kad DHT paper
+	kvalue int
 
-	return &SortedPeerset{
-		kvalue:          kvalue,
-		from:            fromKey,
-		heapTopKPeers:   peerMetricHeap{direction: 1},
-		heapRestOfPeers: peerMetricHeap{direction: -1},
-		topKPeers:       make(map[peer.ID]*peerMetricHeapItem),
-		restOfPeers:     make(map[peer.ID]*peerMetricHeapItem),
-		queried:         make(map[peer.ID]struct{}),
-		sortPeers:       sortPeers,
-	}
-}
+	// a maxHeap maintaining the K closest (Kademlia XOR distance) peers to the key.
+	// the topmost peer will be the peer furthest from the key in this heap.
+	heapKClosestPeers *peerheap.Heap
 
-type SortedPeerset struct {
-	kvalue int
+	// a minHeap for the rest of the peers, ordered by their distance from the key.
+	// the topmost peer will be the peer closest to the key in this heap.
+	heapRestOfPeers *peerheap.Heap
 
-	// from is the Key this PQ measures against
-	from ks.Key
+	// maps a peer to its item in the heap of the K closest peers.
+	kClosestPeers map[peer.ID]*peerheap.Item
 
-	heapTopKPeers, heapRestOfPeers peerMetricHeap
+	// maps a peer to its item in the heap of the rest of the peers.
+	restOfPeers map[peer.ID]*peerheap.Item
 
-	topKPeers, restOfPeers map[peer.ID]*peerMetricHeapItem
-	queried                map[peer.ID]struct{}
+	// peers that have already been queried.
+	queried map[peer.ID]struct{}
 
-	sortPeers func([]IPeerMetric) SortablePeers
+	// the closest peer to the key that we have heard about
+	closestKnownPeer peer.ID
+	// the distance of the closest known peer from the key
+	dClosestKnownPeer *big.Int
 
 	lock sync.Mutex
 }
 
-// Add adds the peer to the set. It returns true if the peer was newly added to the topK peers.
+// NewSortedPeerset creates and returns a new SortedPeerset.
+func NewSortedPeerset(kvalue int, key string) *SortedPeerset {
+	compare := func(i1 peerheap.Item, i2 peerheap.Item) bool {
+		// distance of the first peer from the key
+		d1 := i1.Value.(*big.Int)
+		// distance of the second peer from the key
+		d2 := i2.Value.(*big.Int)
+
+		// is the first peer closer to the key than the second peer?
+		return d1.Cmp(d2) == -1
+	}
+
+	return &SortedPeerset{
+		key:               ks.XORKeySpace.Key([]byte(key)),
+		kvalue:            kvalue,
+		heapKClosestPeers: peerheap.New(true, compare),
+		heapRestOfPeers:   peerheap.New(false, compare),
+		kClosestPeers:     make(map[peer.ID]*peerheap.Item),
+		restOfPeers:       make(map[peer.ID]*peerheap.Item),
+		queried:           make(map[peer.ID]struct{}),
+	}
+}
+
+// Add adds the peer to the SortedPeerset.
+//
+// If there are fewer than K peers in the K closest peers, we add the peer to
+// the K closest peers.
+//
+// Otherwise, we do one of the following:
+// 1. If this peer is closer to the key than the peer furthest from the key in the
+//    K closest peers, we move that furthest peer to the rest of peers and then
+//    add this peer to the K closest peers.
+// 2. If this peer is further from the key than the peer furthest from the key in the
+//    K closest peers, we add it to the rest of peers.
+//
+// Returns true if the peer is closer to the key than the closest peer we've heard about.
 func (ps *SortedPeerset) Add(p peer.ID) bool {
 	ps.lock.Lock()
 	defer ps.lock.Unlock()
 
-	if _, ok := ps.topKPeers[p]; ok {
-		return false
-	}
-	if _, ok := ps.restOfPeers[p]; ok {
+	// we've already added the peer
+	if ps.kClosestPeers[p] != nil || ps.restOfPeers[p] != nil {
 		return false
 	}
 
-	distance := ks.XORKeySpace.Key([]byte(p)).Distance(ps.from)
-	pm := &peerMetricHeapItem{
-		IPeerMetric: peerMetric{
-			peer:   p,
-			metric: distance,
-		},
+	// calculate the distance of the given peer from the key
+	distancePeer := ks.XORKeySpace.Key([]byte(p)).Distance(ps.key)
+	item := &peerheap.Item{Peer: p, Value: distancePeer}
+
+	if ps.heapKClosestPeers.Len() < ps.kvalue {
+		// add the peer to the K closest peers if we have space
+		heap.Push(ps.heapKClosestPeers, item)
+		ps.kClosestPeers[p] = item
+	} else if top := ps.heapKClosestPeers.PeekTop(); distancePeer.Cmp(top.Value.(*big.Int)) == -1 {
+		// peer is closer to the key than the top peer in the heap of K closest peers,
+		// which is the peer furthest from the key because the K closest peers
+		// are stored in a maxHeap ordered by the distance from the key.
+
+		// remove the top peer from the K closest peers & add it to the rest of peers.
+		bumpedPeer := heap.Pop(ps.heapKClosestPeers).(*peerheap.Item)
+		delete(ps.kClosestPeers, bumpedPeer.Peer)
+		heap.Push(ps.heapRestOfPeers, bumpedPeer)
+		ps.restOfPeers[bumpedPeer.Peer] = bumpedPeer
+
+		// add the peer p to the K closest peers
+		heap.Push(ps.heapKClosestPeers, item)
+		ps.kClosestPeers[p] = item
+	} else {
+		// add the peer to the rest of peers.
+		heap.Push(ps.heapRestOfPeers, item)
+		ps.restOfPeers[p] = item
 	}
 
-	if ps.heapTopKPeers.Len() < ps.kvalue {
-		heap.Push(&ps.heapTopKPeers, pm)
-		ps.topKPeers[p] = pm
+	if ps.closestKnownPeer == "" || (distancePeer.Cmp(ps.dClosestKnownPeer) == -1) {
+		// the given peer is closer to the key than the current closest known peer,
+		// so let's update the closest known peer
+		ps.closestKnownPeer = p
+		ps.dClosestKnownPeer = distancePeer
 		return true
 	}
 
-	switch ps.heapTopKPeers.data[0].Metric().Cmp(distance) {
-	case -1:
-		heap.Push(&ps.heapRestOfPeers, pm)
-		ps.restOfPeers[p] = pm
-		return false
-	case 1:
-		bumpedPeer := heap.Pop(&ps.heapTopKPeers).(*peerMetricHeapItem)
-		delete(ps.topKPeers, bumpedPeer.Peer())
-
-		heap.Push(&ps.heapRestOfPeers, bumpedPeer)
-		ps.restOfPeers[bumpedPeer.Peer()] = bumpedPeer
-
-		heap.Push(&ps.heapTopKPeers, pm)
-		ps.topKPeers[p] = pm
-		return true
-	default:
-		return false
-	}
+	return false
 }
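Note that the return value reports on the closest known peer, not on membership of the K closest set. A sketch of these semantics in the style of the package's tests (the test name and key are illustrative):

```go
package kpeerset

import (
	"testing"

	"github.com/libp2p/go-libp2p-core/test"

	kb "github.com/libp2p/go-libp2p-kbucket"

	"github.com/stretchr/testify/require"
)

// Illustrative sketch: Add reports whether the added peer became the
// closest peer heard about so far.
func TestAddReturnValueSketch(t *testing.T) {
	key := "some-key"
	sp := NewSortedPeerset(2, key)

	far := test.RandPeerIDFatal(t)
	// the first peer heard about is trivially the closest known peer
	require.True(t, sp.Add(far))

	// draw random peers until we find one closer to the key than `far`
	closer := test.RandPeerIDFatal(t)
	for !kb.Closer(closer, far, key) {
		closer = test.RandPeerIDFatal(t)
	}
	// a strictly closer peer updates the closest known peer => true
	require.True(t, sp.Add(closer))

	// re-adding a peer that is already tracked always returns false
	require.False(t, sp.Add(far))
}
```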
 
-func (ps *SortedPeerset) TopK() []peer.ID {
+// UnqueriedFromKClosest returns the unqueried peers among the K closest peers AFTER
+// sorting them in ascending order with the given comparator.
+// It uses the `getValue` function to get the value with which to compare the peers for sorting
+// and the `sortWith` function to compare two peerheap Items to determine the ordering between them.
+func (ps *SortedPeerset) UnqueriedFromKClosest(getValue func(id peer.ID, distance *big.Int) interface{},
+	sortWith peerheap.Comparator) []peer.ID {
 	ps.lock.Lock()
 	defer ps.lock.Unlock()
 
-	topK := make([]peer.ID, 0, len(ps.heapTopKPeers.data))
-	for _, pm := range ps.heapTopKPeers.data {
-		topK = append(topK, pm.Peer())
+	unqueriedPeerItems := ps.heapKClosestPeers.FilterItems(ps.isPeerItemQueried)
+
+	// create a min-heap to sort the unqueried peer Items using the given comparator
+	ph := peerheap.New(false, sortWith)
+	for _, i := range unqueriedPeerItems {
+		p := i.Peer
+		d := i.Value.(*big.Int)
+		heap.Push(ph, &peerheap.Item{Peer: p, Value: getValue(p, d)})
+	}
+	// now pop so we get them in sorted order
+	peers := make([]peer.ID, 0, ph.Len())
+	for ph.Len() != 0 {
+		popped := heap.Pop(ph).(*peerheap.Item)
+		peers = append(peers, popped.Peer)
 	}
 
-	return topK
+	return peers
 }
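Both the value used for sorting and the ordering itself are supplied by the caller, so the same set can be drained in different orders. A sketch that sorts purely by XOR distance (the helper name is illustrative):

```go
package kpeerset

import (
	"math/big"

	"github.com/libp2p/go-libp2p-core/peer"

	"github.com/libp2p/go-libp2p-kad-dht/kpeerset/peerheap"
)

// nextPeersToQuery is an illustrative helper: it returns the unqueried peers
// among the K closest, ordered by raw XOR distance (closest first).
func nextPeersToQuery(sp *SortedPeerset) []peer.ID {
	// pass the distance straight through as the sort value
	keepDistance := func(p peer.ID, d *big.Int) interface{} { return d }

	// ascending order on the distance => closest peers come first
	byDistance := func(i1 peerheap.Item, i2 peerheap.Item) bool {
		return i1.Value.(*big.Int).Cmp(i2.Value.(*big.Int)) == -1
	}

	return sp.UnqueriedFromKClosest(keepDistance, byDistance)
}
```

query.go below plugs in scorePeerByDistanceAndLatency as the getValue function instead, so the α peers picked each round are biased towards connected, low-latency peers.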
 
-func (ps *SortedPeerset) KUnqueried() []peer.ID {
+// LenUnqueriedFromKClosest returns the number of unqueried peers among
+// the K closest peers.
+func (ps *SortedPeerset) LenUnqueriedFromKClosest() int {
 	ps.lock.Lock()
 	defer ps.lock.Unlock()
 
-	topK := make([]IPeerMetric, 0, len(ps.heapTopKPeers.data))
-	for _, pm := range ps.heapTopKPeers.data {
-		if _, ok := ps.queried[pm.Peer()]; !ok {
-			topK = append(topK, pm.IPeerMetric)
-		}
-	}
+	unqueriedPeerItems := ps.heapKClosestPeers.FilterItems(ps.isPeerItemQueried)
 
-	sortedPeers := ps.sortPeers(topK)
-	peers := make([]peer.ID, 0, sortedPeers.Len())
-	for i := range topK {
-		p := sortedPeers.GetPeerID(i)
-		peers = append(peers, p)
-	}
+	return len(unqueriedPeerItems)
+}
 
-	return peers
+// isPeerItemQueried is the filter predicate passed to FilterItems above: it
+// returns true if the peer has NOT been queried yet.
+// The caller is responsible for the locking.
+func (ps *SortedPeerset) isPeerItemQueried(i peerheap.Item) bool {
+	_, ok := ps.queried[i.Peer]
+	return !ok
 }
 
+// MarkQueried marks the peer as queried.
+// It should be called when we have successfully dialed to and gotten a response from the peer.
 func (ps *SortedPeerset) MarkQueried(p peer.ID) {
 	ps.lock.Lock()
 	defer ps.lock.Unlock()
@@ -131,25 +186,51 @@ func (ps *SortedPeerset) MarkQueried(p peer.ID) {
 	ps.queried[p] = struct{}{}
 }
 
+// Remove removes the peer from the SortedPeerset.
+//
+// If the removed peer was among the K closest peers, we pop a peer from the heap of rest of peers
+// and add it to the K closest peers to replace the removed peer. The peer added to the K closest peers
+// in this way is the peer that was closest to the key among the rest of peers, since the rest of peers
+// are in a minHeap ordered on the distance from the key.
 func (ps *SortedPeerset) Remove(p peer.ID) {
 	ps.lock.Lock()
 	defer ps.lock.Unlock()
 
 	delete(ps.queried, p)
 
-	if item, ok := ps.topKPeers[p]; ok {
-		heap.Remove(&ps.heapTopKPeers, item.index)
-		delete(ps.topKPeers, p)
-
-		if len(ps.heapRestOfPeers.data) > 0 {
-			upgrade := heap.Pop(&ps.heapRestOfPeers).(*peerMetricHeapItem)
-			delete(ps.restOfPeers, upgrade.Peer())
+	if item, ok := ps.kClosestPeers[p]; ok {
+		// peer is among the K closest peers
+
+		// remove it from the K closest peers
+		heap.Remove(ps.heapKClosestPeers, item.Index)
+		delete(ps.kClosestPeers, p)
+		// if this peer was the closest peer we knew, we need to find the new closest peer.
+		if ps.closestKnownPeer == p {
+			var minDistance *big.Int
+			var closest peer.ID
+			for _, i := range ps.kClosestPeers {
+				d := i.Value.(*big.Int)
+				if minDistance == nil || (d.Cmp(minDistance) == -1) {
+					minDistance = d
+					closest = i.Peer
+				}
+			}
+			ps.closestKnownPeer = closest
+			ps.dClosestKnownPeer = minDistance
+		}
 
-			heap.Push(&ps.heapTopKPeers, upgrade)
-			ps.topKPeers[upgrade.Peer()] = upgrade
+		// we now need to add a peer to the K closest peers from the rest of peers
+		// to make up for the peer that was just removed
+		if ps.heapRestOfPeers.Len() > 0 {
+			// pop a peer from the rest of peers & add it to the K closest peers
+			upgrade := heap.Pop(ps.heapRestOfPeers).(*peerheap.Item)
+			delete(ps.restOfPeers, upgrade.Peer)
+			heap.Push(ps.heapKClosestPeers, upgrade)
+			ps.kClosestPeers[upgrade.Peer] = upgrade
		}
 	} else if item, ok := ps.restOfPeers[p]; ok {
-		heap.Remove(&ps.heapRestOfPeers, item.index)
+		// peer is not among the K closest, so remove it from the rest of peers.
+		heap.Remove(ps.heapRestOfPeers, item.Index)
 		delete(ps.restOfPeers, p)
 	}
 }
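A sketch of the promotion behaviour in the style of the package's tests (K = 1 keeps the example small; names are illustrative):

```go
package kpeerset

import (
	"testing"

	"github.com/libp2p/go-libp2p-core/test"

	kb "github.com/libp2p/go-libp2p-kbucket"

	"github.com/stretchr/testify/require"
)

func TestRemovePromotesFromRestSketch(t *testing.T) {
	key := "some-key"
	sp := NewSortedPeerset(1, key)

	a := test.RandPeerIDFatal(t)
	sp.Add(a) // fills the K closest peers (K = 1)

	// find a peer strictly further from the key than `a`;
	// with the K closest peers full, it lands in the rest of peers
	b := test.RandPeerIDFatal(t)
	for !kb.Closer(a, b, key) {
		b = test.RandPeerIDFatal(t)
	}
	sp.Add(b)
	require.Equal(t, 1, sp.LenUnqueriedFromKClosest())

	// removing `a` promotes `b` out of the rest of peers,
	// so the K closest peers stay as full as possible
	sp.Remove(a)
	require.Equal(t, 1, sp.LenUnqueriedFromKClosest())
	sp.lock.Lock()
	require.Contains(t, sp.heapKClosestPeers.Peers(), b)
	sp.lock.Unlock()
}
```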
break + } + } + require.True(t, sp.Add(peer3)) + require.Len(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), 2) + require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer1) + require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer3) + assertClosestKnownPeer(t, sp, peer3) + + // removing peer1 moves peer4 to the KClosest + sp.Remove(peer1) + require.Len(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), 2) + require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer3) + require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer4) + sp.lock.Lock() + require.True(t, sp.heapRestOfPeers.Len() == 1) + require.Contains(t, sp.heapRestOfPeers.Peers(), peer2) + sp.lock.Unlock() + + // mark a peer as queried so it's not returned as unqueried + sp.MarkQueried(peer4) + require.Len(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), 1) + require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer3) + + // removing peer3 moves peer2 to the kClosest & updates the closest known peer to peer4 + sp.Remove(peer3) + require.Len(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), 1) + require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer2) + sp.lock.Lock() + require.Empty(t, sp.heapRestOfPeers.Peers()) + sp.lock.Unlock() + assertClosestKnownPeer(t, sp, peer4) + + // adding peer5 does not change the closest known peer + var peer5 peer.ID + for { + peer5 = test.RandPeerIDFatal(t) + if kb.Closer(peer2, peer5, key) { + break + } + } + require.False(t, sp.Add(peer5)) + assertClosestKnownPeer(t, sp, peer4) + + // adding peer0 changes the closest known peer + var peer0 peer.ID + for { + peer0 = test.RandPeerIDFatal(t) + if kb.Closer(peer0, peer3, key) { + break + } + } + require.True(t, sp.Add(peer0)) + assertClosestKnownPeer(t, sp, peer0) +} + +func TestSortingUnqueriedFromKClosest(t *testing.T) { + p1 := peer.ID("1") + p2 := peer.ID("22") + p3 := peer.ID("333") + + key := "test" + sp := NewSortedPeerset(3, key) + sp.Add(p1) + sp.Add(p3) + sp.Add(p2) + + ps := sp.UnqueriedFromKClosest(noopGetValue, func(i1 peerheap.Item, i2 peerheap.Item) bool { + return len(i1.Peer) > len(i2.Peer) + }) + require.Len(t, ps, 3) + require.Equal(t, p3, ps[0]) + require.Equal(t, p2, ps[1]) + require.Equal(t, p1, ps[2]) + + // mark one as queried + scoref := func(p peer.ID, d *big.Int) interface{} { + return len(p) + } + + sp.MarkQueried(p3) + ps = sp.UnqueriedFromKClosest(scoref, func(i1 peerheap.Item, i2 peerheap.Item) bool { + return i1.Value.(int) > i2.Value.(int) + }) + require.Len(t, ps, 2) + require.Equal(t, p2, ps[0]) + require.Equal(t, p1, ps[1]) +} + +func assertClosestKnownPeer(t *testing.T, sp *SortedPeerset, p peer.ID) { + sp.lock.Lock() + defer sp.lock.Unlock() + + require.Equal(t, sp.closestKnownPeer, p) +} diff --git a/query.go b/query.go index 2b4b21507..ae87ed197 100644 --- a/query.go +++ b/query.go @@ -5,11 +5,14 @@ import ( "errors" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" + pstore "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/routing" + "github.com/libp2p/go-libp2p-kad-dht/kpeerset/peerheap" + "math/big" + "time" + "github.com/libp2p/go-libp2p-kad-dht/kpeerset" kb "github.com/libp2p/go-libp2p-kbucket" - - pstore "github.com/libp2p/go-libp2p-core/peerstore" ) // ErrNoPeersQueried is returned when we failed to connect to any peers. 
diff --git a/query.go b/query.go
index 2b4b21507..ae87ed197 100644
--- a/query.go
+++ b/query.go
@@ -5,11 +5,14 @@ import (
 	"errors"
 	"github.com/libp2p/go-libp2p-core/network"
 	"github.com/libp2p/go-libp2p-core/peer"
+	pstore "github.com/libp2p/go-libp2p-core/peerstore"
 	"github.com/libp2p/go-libp2p-core/routing"
+	"github.com/libp2p/go-libp2p-kad-dht/kpeerset/peerheap"
+	"math/big"
+	"time"
+
 	"github.com/libp2p/go-libp2p-kad-dht/kpeerset"
 	kb "github.com/libp2p/go-libp2p-kbucket"
-
-	pstore "github.com/libp2p/go-libp2p-core/peerstore"
 )
 
 // ErrNoPeersQueried is returned when we failed to connect to any peers.
@@ -18,16 +21,26 @@ var ErrNoPeersQueried = errors.New("failed to query any peers")
 type queryFn func(context.Context, peer.ID) ([]*peer.AddrInfo, error)
 type stopFn func(*kpeerset.SortedPeerset) bool
 
+// query represents a single disjoint query.
 type query struct {
-	ctx    context.Context
+	// the query context.
+	ctx context.Context
+	// the cancellation function for the query context.
 	cancel context.CancelFunc
 
 	dht *IpfsDHT
 
-	localPeers           *kpeerset.SortedPeerset
+	// localPeers is the set of peers that need to be queried or have already been queried for this query.
+	localPeers *kpeerset.SortedPeerset
+
+	// globallyQueriedPeers is the combined set of peers queried across ALL the disjoint queries.
 	globallyQueriedPeers *peer.Set
-	queryFn              queryFn
-	stopFn               stopFn
+
+	// the function that will be used to query a single peer.
+	queryFn queryFn
+
+	// stopFn is used to determine if we should stop the WHOLE disjoint query.
+	stopFn stopFn
 }
 
 func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string, queryFn queryFn, stopFn stopFn) ([]*query, error) {
@@ -36,6 +49,7 @@ func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string
 	numQueriesComplete := 0
 	queryDone := make(chan struct{}, d)
 
+	// pick the K closest peers to the key in our Routing table and shuffle them.
 	seedPeers := dht.routingTable.NearestPeers(kb.ConvertKey(target), dht.bucketSize)
 	if len(seedPeers) == 0 {
 		routing.PublishQueryEvent(ctx, &routing.QueryEvent{
@@ -49,15 +63,15 @@ func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string
 		seedPeers[i], seedPeers[j] = seedPeers[j], seedPeers[i]
 	})
 
+	// create "d" disjoint queries
 	queries := make([]*query, d)
-
 	peersQueried := peer.NewSet()
 	for i := 0; i < d; i++ {
 		query := &query{
 			ctx:                  queryCtx,
 			cancel:               cancelQuery,
 			dht:                  dht,
-			localPeers:           kpeerset.NewSortedPeerset(dht.bucketSize, target, dht.sortPeers),
+			localPeers:           kpeerset.NewSortedPeerset(dht.bucketSize, target),
 			globallyQueriedPeers: peersQueried,
 			queryFn:              queryFn,
 			stopFn:               stopFn,
@@ -66,10 +80,12 @@ func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string
 		queries[i] = query
 	}
 
+	// distribute the shuffled K closest peers as seeds among the "d" disjoint queries
 	for i := 0; i < len(seedPeers); i++ {
 		queries[i%d].localPeers.Add(seedPeers[i])
 	}
 
+	// start the "d" disjoint queries
 	for i := 0; i < d; i++ {
 		query := queries[i]
 		go func() {
@@ -79,6 +95,7 @@ func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string
 	}
 
 loop:
+	// wait for all the "d" disjoint queries to complete before we return
 	for {
 		select {
 		case <-queryDone:
@@ -94,47 +111,86 @@ loop:
 	return queries, nil
 }
 
-func (dht *IpfsDHT) sortPeers(peers []kpeerset.IPeerMetric) kpeerset.SortablePeers {
-	return kpeerset.PeersSortedByLatency(peers, dht.host.Network(), dht.peerstore)
+// TODO: This function should be owned by the DHT as it doesn't really belong to "a query".
+// scorePeerByDistanceAndLatency scores a peer using metrics such as connectedness of the peer, its distance from the key
+// and its current known latency.
+func (q query) scorePeerByDistanceAndLatency(p peer.ID, distanceFromKey *big.Int) interface{} {
+	connectedness := q.dht.host.Network().Connectedness(p)
+	latency := q.dht.host.Peerstore().LatencyEWMA(p)
+
+	var c int64
+	switch connectedness {
+	case network.Connected:
+		c = 1
+	case network.CanConnect:
+		c = 5
+	case network.CannotConnect:
+		c = 10000
+	default:
+		c = 20
+	}
+
+	l := int64(latency)
+	if l <= 0 {
+		l = int64(time.Second) * 10
+	}
+
+	res := big.NewInt(c)
+	res.Mul(res, big.NewInt(l))
+	res.Mul(res, distanceFromKey)
+
+	return res
 }
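The score is the product of a connectedness weight, the latency estimate in nanoseconds (with a pessimistic 10s default when unknown), and the XOR distance; lower is better. A worked sketch of the arithmetic with made-up numbers:

```go
package main

import (
	"fmt"
	"math/big"
	"time"
)

// score mirrors the arithmetic above: weight * latency(ns) * distance.
func score(connWeight int64, latency time.Duration, distance *big.Int) *big.Int {
	l := int64(latency)
	if l <= 0 {
		l = int64(time.Second) * 10 // unknown latency: assume a poor 10s
	}
	res := big.NewInt(connWeight)
	res.Mul(res, big.NewInt(l))
	res.Mul(res, distance)
	return res
}

func main() {
	d := big.NewInt(1 << 20) // the same XOR distance for both peers

	connected := score(1, 50*time.Millisecond, d)  // weight 1: network.Connected
	canConnect := score(5, 20*time.Millisecond, d) // weight 5: network.CanConnect

	// the already-connected peer wins (5e7*d vs 1e8*d) even though its
	// latency is worse, because the connectedness weight dominates
	fmt.Println(connected.Cmp(canConnect) == -1) // true
}
```

Because the factors are multiplied, a CannotConnect peer (weight 10000) is effectively pushed to the back of every round.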
 
+// strictParallelismQuery concurrently sends the query RPC to all eligible peers
+// and waits for ALL the RPCs to complete before starting the next round of RPCs.
 func strictParallelismQuery(q *query) {
-	/*
-		start with K closest peers (some queried already some not)
-		take best alpha (sorted by some metric)
-		query those alpha
-		once they complete:
-			if the alpha requests did not add any new peers to top K, repeat with unqueried top K
-			else repeat
-	*/
-
 	foundCloser := false
 	for {
-		peersToQuery := q.localPeers.KUnqueried()
-
+		// get the unqueried peers from among the K closest peers to the key, sorted in ascending order
+		// of their 'distance-latency' score.
+		// We sort peers like this so that "better" peers are chosen to be in the α peers
+		// which get queried from among the unqueried K closest.
+		peersToQuery := q.localPeers.UnqueriedFromKClosest(q.scorePeerByDistanceAndLatency,
+			func(i1 peerheap.Item, i2 peerheap.Item) bool {
+				return i1.Value.(*big.Int).Cmp(i2.Value.(*big.Int)) == -1
+			})
+
+		// The lookup terminates when the initiator has queried and gotten responses from the k
+		// closest nodes it has heard about.
 		if len(peersToQuery) == 0 {
 			return
 		}
 
-		// TODO: Is it finding a closer peer if it's closer than one we know about or one we have queried?
+		// Of the k nodes the initiator has heard of closest to the target,
+		// it picks α that it has not yet queried and resends the FIND NODE RPC to them.
 		numQuery := q.dht.alpha
-		if foundCloser {
+
+		// However, if a round of RPCs fails to return a node any closer than the closest already heard about,
+		// the initiator resends the RPCs to all of the k closest nodes it has
+		// not already queried.
+		if !foundCloser {
 			numQuery = len(peersToQuery)
 		} else if pqLen := len(peersToQuery); pqLen < numQuery {
+			// if we don't have α peers, pick whatever number we have.
 			numQuery = pqLen
 		}
+
+		// reset foundCloser to false for the next round of RPCs
 		foundCloser = false
 
 		queryResCh := make(chan *queryResult, numQuery)
 		resultsReceived := 0
 
+		// send RPCs to all the chosen peers concurrently
 		for _, p := range peersToQuery[:numQuery] {
 			go func(p peer.ID) {
-				queryResCh <- q.queryPeer(q.ctx, p)
+				queryResCh <- q.queryPeer(p)
 			}(p)
 		}
 
	loop:
+		// wait for all outstanding RPCs to complete before we start the next round.
 		for {
 			select {
 			case res := <-queryResCh:
@@ -150,153 +206,50 @@
 		}
 	}
 }
 
-func simpleQuery(q *query) {
-	/*
-		start with K closest peers (some queried already some not)
-		take best alpha (sorted by some metric)
-		query those alpha
-
-		if a query fails then take the next one
-		once they complete:
-			if the alpha requests did not add any new peers to top K, repeat with unqueried top K
-			else repeat
-	*/
-
-	var lastPeers []peer.ID
-	for {
-		peersToQuery := q.localPeers.KUnqueried()
-
-		if len(peersToQuery) == 0 {
-			return
-		}
-
-		numQuery := q.dht.alpha
-		if lastPeers != nil && peerSlicesEqual(lastPeers, peersToQuery) {
-			numQuery = len(peersToQuery)
-		} else if pqLen := len(peersToQuery); pqLen < numQuery {
-			numQuery = pqLen
-		}
-
-		peersToQueryCh := make(chan peer.ID, numQuery)
-		for _, p := range peersToQuery[:numQuery] {
-			peersToQueryCh <- p
-		}
-		queryResCh := make(chan *queryResult, numQuery)
-		queriesSucceeded, queriesSent := 0, numQuery
-
-	dialPeers:
-		for {
-			select {
-			case p := <-peersToQueryCh:
-				go func() {
-					queryResCh <- q.queryPeer(q.ctx, p)
-				}()
-			case res := <-queryResCh:
-				if res.success {
-					queriesSucceeded++
-					if queriesSucceeded == numQuery {
-						break dialPeers
-					}
-				} else {
-					queriesSent++
-					if queriesSent >= len(peersToQuery) {
-						break dialPeers
-					}
-					peersToQueryCh <- peersToQuery[queriesSent]
-				}
-			case <-q.ctx.Done():
-				return
-			}
-		}
-	}
-}
-
-func boundedDialQuery(q *query) {
-	/*
-		start with K closest peers (some queried already some not)
-		take best alpha (sorted by some metric)
-		query those alpha
-		-- if queried peer falls out of top K we've heard of + top alpha we've received responses from
-		+ others like percentage of way through the timeout, their reputation, etc.
-		1) Cancel dial 2) Cancel query but not dial 3) Continue with query
-	*/
-
-	var lastPeers []peer.ID
-	for {
-		peersToQuery := q.localPeers.KUnqueried()
-
-		if len(peersToQuery) == 0 {
-			return
-		}
-
-		numQuery := q.dht.alpha
-		if lastPeers != nil && peerSlicesEqual(lastPeers, peersToQuery) {
-			numQuery = len(peersToQuery)
-		}
-
-		peersToQueryCh := make(chan peer.ID, numQuery)
-		for _, p := range peersToQuery[:numQuery] {
-			peersToQueryCh <- p
-		}
-		queryResCh := make(chan *queryResult, numQuery)
-		queriesSucceeded, queriesSent := 0, 0
-
-		for {
-			select {
-			case p := <-peersToQueryCh:
-				go func() {
-					queryResCh <- q.queryPeer(q.ctx, p)
-				}()
-			case res := <-queryResCh:
-				if res.success {
-					queriesSucceeded++
-				} else {
-					queriesSent++
-					if queriesSent >= len(peersToQuery) {
-						return
-					}
-					peersToQueryCh <- peersToQuery[queriesSent]
-				}
-			case <-q.ctx.Done():
-				return
-			}
-		}
-	}
-}
-
 type queryResult struct {
-	success bool
+	// foundCloserPeer is true if the peer we're querying returns a peer
+	// closer than the closest we've already heard about
 	foundCloserPeer bool
 }
 
-func (q *query) queryPeer(ctx context.Context, p peer.ID) *queryResult {
-	dialCtx, queryCtx := ctx, ctx
+// queryPeer queries a single peer.
+func (q *query) queryPeer(p peer.ID) *queryResult {
+	dialCtx, queryCtx := q.ctx, q.ctx
 
+	// dial the peer
 	if err := q.dht.dialPeer(dialCtx, p); err != nil {
 		q.localPeers.Remove(p)
 		return &queryResult{}
 	}
+
+	// add the peer to the global set of queried peers since the dial was successful,
+	// so that no other disjoint query tries sending an RPC to the same peer
 	if !q.globallyQueriedPeers.TryAdd(p) {
 		q.localPeers.Remove(p)
 		return &queryResult{}
 	}
 
+	// did the dial fulfill the stop condition?
 	if q.stopFn(q.localPeers) {
 		q.cancel()
 		return &queryResult{}
 	}
 
+	// send the query RPC to the remote peer
 	newPeers, err := q.queryFn(queryCtx, p)
 	if err != nil {
 		q.localPeers.Remove(p)
 		return &queryResult{}
 	}
 
+	// mark the peer as queried.
 	q.localPeers.MarkQueried(p)
 
 	if len(newPeers) == 0 {
 		logger.Debugf("QUERY worker for: %v - not found, and no closer peers.", p)
 	}
 
+	foundCloserPeer := false
 	for _, next := range newPeers {
 		if next.ID == q.dht.self { // don't add self.
 			logger.Debugf("PEERS CLOSER -- worker for: %v found self", p)
@@ -305,19 +258,16 @@ func (q *query) queryPeer(ctx context.Context, p peer.ID) *queryResult {
 
 		// add their addresses to the dialer's peerstore
 		q.dht.peerstore.AddAddrs(next.ID, next.Addrs, pstore.TempAddrTTL)
-	}
-
-	foundCloserPeer := false
-	for _, np := range newPeers {
-		closer := q.localPeers.Add(np.ID)
+		closer := q.localPeers.Add(next.ID)
 		foundCloserPeer = foundCloserPeer || closer
 	}
 
+	// did the successful query RPC fulfill the query stop condition?
 	if q.stopFn(q.localPeers) {
 		q.cancel()
 	}
+
 	return &queryResult{
-		success:         true,
 		foundCloserPeer: foundCloserPeer,
 	}
 }
@@ -348,17 +298,3 @@ func (dht *IpfsDHT) dialPeer(ctx context.Context, p peer.ID) error {
 	logger.Debugf("connected. dial success.")
 	return nil
 }
-
-// Equal tells whether a and b contain the same elements.
-// A nil argument is equivalent to an empty slice.
-func peerSlicesEqual(a, b []peer.ID) bool {
-	if len(a) != len(b) {
-		return false
-	}
-	for i, v := range a {
-		if v != b[i] {
-			return false
-		}
-	}
-	return true
-}
diff --git a/routing.go b/routing.go
index ac2361d54..10f27499b 100644
--- a/routing.go
+++ b/routing.go
@@ -335,7 +335,9 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st
 	}
 
 	go func() {
-		queries, _ := dht.runDisjointQueries(ctx, dht.d, key,
+		defer close(valCh)
+		defer close(queriesCh)
+		queries, err := dht.runDisjointQueries(ctx, dht.d, key,
 			func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
 				// For DHT query command
 				routing.PublishQueryEvent(ctx, &routing.QueryEvent{
@@ -394,28 +396,34 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st
 			},
 		)
 
-		close(valCh)
-		queriesCh <- queries
-		close(queriesCh)
-
-		shortcutTaken := false
-		for _, q := range queries {
-			if len(q.localPeers.KUnqueried()) > 0 {
-				shortcutTaken = true
-				break
-			}
+		if err != nil {
+			return
 		}
+		queriesCh <- queries
 
-		if !shortcutTaken && ctx.Err() == nil {
-			kadID := kb.ConvertKey(key)
-			// refresh the cpl for this key as the query was successful
-			dht.routingTable.ResetCplRefreshedAtForID(kadID, time.Now())
+		if ctx.Err() == nil {
+			dht.refreshRTIfNoShortcut(kb.ConvertKey(key), queries)
 		}
 	}()
 
 	return valCh, queriesCh
 }
 
+func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, queries []*query) {
+	shortcutTaken := false
+	for _, q := range queries {
+		if q.localPeers.LenUnqueriedFromKClosest() > 0 {
+			shortcutTaken = true
+			break
+		}
+	}
+
+	if !shortcutTaken {
+		// refresh the cpl for this key as the query was successful
+		dht.routingTable.ResetCplRefreshedAtForID(key, time.Now())
+	}
+}
+
 // Provider abstraction for indirect stores.
 // Some DHTs store values directly, while an indirect store stores pointers to
 // locations of the value, similarly to Coral and Mainline DHT.
@@ -570,7 +578,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash
 		}
 	}
 
-	_, _ = dht.runDisjointQueries(ctx, dht.d, string(key),
+	queries, err := dht.runDisjointQueries(ctx, dht.d, string(key),
 		func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
 			// For DHT query command
 			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
@@ -626,9 +634,8 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash
 		},
 	)
 
-	if ctx.Err() == nil {
-		// refresh the cpl for this key after the query is run
-		dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(string(key)), time.Now())
+	// refresh the cpl for this key only if the query ran successfully
+	if err == nil && ctx.Err() == nil {
+		dht.refreshRTIfNoShortcut(kb.ConvertKey(string(key)), queries)
 	}
 }
 
@@ -680,30 +687,16 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo,
 		return peer.AddrInfo{}, err
 	}
 
+	// refresh the cpl for this key if we discovered the peer because of the query
+	if ctx.Err() == nil && queries[0].globallyQueriedPeers.Contains(id) {
+		kadID := kb.ConvertPeerID(id)
+		dht.routingTable.ResetCplRefreshedAtForID(kadID, time.Now())
+	}
+
 	// TODO: Consider unlucky disconnect timing and potentially utilizing network.CanConnect or something similar
 	if dht.host.Network().Connectedness(id) == network.Connected {
-		shortcutTaken := false
-		for _, q := range queries {
-			if len(q.localPeers.KUnqueried()) > 0 {
-				shortcutTaken = true
-				break
-			}
-		}
-
-		if !shortcutTaken {
-			kadID := kb.ConvertPeerID(id)
-			// refresh the cpl for this key as the query was successful
-			dht.routingTable.ResetCplRefreshedAtForID(kadID, time.Now())
-		}
-
 		return dht.peerstore.PeerInfo(id), nil
-	} else {
-		if ctx.Err() == nil {
-			kadID := kb.ConvertPeerID(id)
-			// refresh the cpl for this key as the query was successful
-			dht.routingTable.ResetCplRefreshedAtForID(kadID, time.Now())
-		}
-
-		return peer.AddrInfo{}, routing.ErrNotFound
 	}
+
+	return peer.AddrInfo{}, routing.ErrNotFound
 }