diff --git a/go.mod b/go.mod index 90c5aa4..79a6ad7 100644 --- a/go.mod +++ b/go.mod @@ -14,8 +14,6 @@ require ( github.com/multiformats/go-base32 v0.0.3 github.com/multiformats/go-multiaddr v0.3.3 github.com/multiformats/go-multiaddr-fmt v0.1.0 - github.com/multiformats/go-multihash v0.0.15 github.com/stretchr/testify v1.7.0 - github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 go.uber.org/goleak v1.1.10 ) diff --git a/go.sum b/go.sum index d384bcd..21a3bd5 100644 --- a/go.sum +++ b/go.sum @@ -85,8 +85,6 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= -github.com/klauspost/cpuid/v2 v2.0.4 h1:g0I61F2K2DjRHz1cnxlkNSBIaePVoJIjjnHui8QHbiw= -github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= @@ -106,9 +104,8 @@ github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czP github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g= github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ= github.com/minio/sha256-simd v0.1.1-0.20190913151208-6de447530771/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM= +github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKUJU= github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM= -github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g= -github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= @@ -130,9 +127,8 @@ github.com/multiformats/go-multibase v0.0.3 h1:l/B6bJDQjvQ5G52jw4QGSYeOTZoAwIO77 github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc= github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew= github.com/multiformats/go-multihash v0.0.13/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc= +github.com/multiformats/go-multihash v0.0.14 h1:QoBceQYQQtNUuf6s7wHxnE2c8bhbMqhfGzNI032se/I= github.com/multiformats/go-multihash v0.0.14/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc= -github.com/multiformats/go-multihash v0.0.15 h1:hWOPdrNqDjwHDx82vsYGSDZNyktOJJ2dzZJzFkOV1jM= -github.com/multiformats/go-multihash v0.0.15/go.mod h1:D6aZrWNLFTV/ynMpKsNtB40mJzmCl4jb1alC0OvHiHg= github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= github.com/multiformats/go-varint v0.0.6 h1:gk85QWKxh3TazbLxED/NlDVv8+q+ReFJk7Y2W/KhfNY= github.com/multiformats/go-varint v0.0.6/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= @@ -170,8 +166,6 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= -github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 h1:EKhdznlJHPMoKr0XTrX+IlJs1LH3lyx2nfr1dOlZ79k= -github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -194,9 +188,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 h1:/ZScEX8SfEmUGRHs0gxpqteO5nfNW6axyZbBdw9A12g= -golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -231,11 +224,8 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210309074719-68d13333faf2 h1:46ULzRKLh1CwgRq2dC5SlBzEqqNCi8rreOZnNrbqcIY= -golang.org/x/sys v0.0.0-20210309074719-68d13333faf2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= diff --git a/queue/distance.go b/queue/distance.go deleted file mode 100644 index d7e7989..0000000 --- a/queue/distance.go +++ /dev/null @@ -1,100 +0,0 @@ -package queue - -import ( - "container/heap" - "math/big" - "sync" - - "github.com/libp2p/go-libp2p-core/peer" - ks "github.com/whyrusleeping/go-keyspace" -) - -// peerMetric tracks a peer and its distance to something else. -type peerMetric struct { - // the peer - peer peer.ID - - // big.Int for XOR metric - metric *big.Int -} - -// peerMetricHeap implements a heap of peerDistances -type peerMetricHeap []*peerMetric - -func (ph peerMetricHeap) Len() int { - return len(ph) -} - -func (ph peerMetricHeap) Less(i, j int) bool { - return -1 == ph[i].metric.Cmp(ph[j].metric) -} - -func (ph peerMetricHeap) Swap(i, j int) { - ph[i], ph[j] = ph[j], ph[i] -} - -func (ph *peerMetricHeap) Push(x interface{}) { - item := x.(*peerMetric) - *ph = append(*ph, item) -} - -func (ph *peerMetricHeap) Pop() interface{} { - old := *ph - n := len(old) - item := old[n-1] - *ph = old[0 : n-1] - return item -} - -// distancePQ implements heap.Interface and PeerQueue -type distancePQ struct { - // from is the Key this PQ measures against - from ks.Key - - // heap is a heap of peerDistance items - heap peerMetricHeap - - sync.RWMutex -} - -func (pq *distancePQ) Len() int { - pq.Lock() - defer pq.Unlock() - return len(pq.heap) -} - -func (pq *distancePQ) Enqueue(p peer.ID) { - pq.Lock() - defer pq.Unlock() - - distance := ks.XORKeySpace.Key([]byte(p)).Distance(pq.from) - - heap.Push(&pq.heap, &peerMetric{ - peer: p, - metric: distance, - }) -} - -func (pq *distancePQ) Dequeue() peer.ID { - pq.Lock() - defer pq.Unlock() - - if len(pq.heap) < 1 { - panic("called Dequeue on an empty PeerQueue") - // will panic internally anyway, but we can help debug here - } - - o := heap.Pop(&pq.heap) - p := o.(*peerMetric) - return p.peer -} - -// NewXORDistancePQ returns a PeerQueue which maintains its peers sorted -// in terms of their distances to each other in an XORKeySpace (i.e. using -// XOR as a metric of distance). -func NewXORDistancePQ(from string) PeerQueue { - return &distancePQ{ - from: ks.XORKeySpace.Key([]byte(from)), - heap: peerMetricHeap{}, - } -} diff --git a/queue/interface.go b/queue/interface.go deleted file mode 100644 index 7cdea71..0000000 --- a/queue/interface.go +++ /dev/null @@ -1,18 +0,0 @@ -package queue - -import "github.com/libp2p/go-libp2p-core/peer" - -// PeerQueue maintains a set of peers ordered according to a metric. -// Implementations of PeerQueue could order peers based on distances along -// a KeySpace, latency measurements, trustworthiness, reputation, etc. -type PeerQueue interface { - - // Len returns the number of items in PeerQueue - Len() int - - // Enqueue adds this node to the queue. - Enqueue(peer.ID) - - // Dequeue retrieves the highest (smallest int) priority node - Dequeue() peer.ID -} diff --git a/queue/queue_test.go b/queue/queue_test.go deleted file mode 100644 index 0c9afc7..0000000 --- a/queue/queue_test.go +++ /dev/null @@ -1,139 +0,0 @@ -package queue - -import ( - "context" - "fmt" - "sync" - "testing" - "time" - - "github.com/libp2p/go-libp2p-core/peer" - mh "github.com/multiformats/go-multihash" -) - -func TestQueue(t *testing.T) { - - p1 := peer.ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31") // these aren't valid, because need to hex-decode. - p2 := peer.ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a32") // these aren't valid, because need to hex-decode. - p3 := peer.ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33") // these aren't valid, because need to hex-decode. - p4 := peer.ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a34") // these aren't valid, because need to hex-decode. - p5 := peer.ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31") // these aren't valid, because need to hex-decode. - // but they work. - - // these are the peer.IDs' XORKeySpace Key values: - // [228 47 151 130 156 102 222 232 218 31 132 94 170 208 80 253 120 103 55 35 91 237 48 157 81 245 57 247 66 150 9 40] - // [26 249 85 75 54 49 25 30 21 86 117 62 85 145 48 175 155 194 210 216 58 14 241 143 28 209 129 144 122 28 163 6] - // [78 135 26 216 178 181 224 181 234 117 2 248 152 115 255 103 244 34 4 152 193 88 9 225 8 127 216 158 226 8 236 246] - // [125 135 124 6 226 160 101 94 192 57 39 12 18 79 121 140 190 154 147 55 44 83 101 151 63 255 94 179 51 203 241 51] - - pq := NewXORDistancePQ("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31") - pq.Enqueue(p3) - pq.Enqueue(p1) - pq.Enqueue(p2) - pq.Enqueue(p4) - pq.Enqueue(p5) - pq.Enqueue(p1) - - // should come out as: p1, p4, p3, p2 - - if d := pq.Dequeue(); d != p1 && d != p5 { - t.Error("ordering failed") - } - - if d := pq.Dequeue(); d != p1 && d != p5 { - t.Error("ordering failed") - } - - if d := pq.Dequeue(); d != p1 && d != p5 { - t.Error("ordering failed") - } - - if pq.Dequeue() != p4 { - t.Error("ordering failed") - } - - if pq.Dequeue() != p3 { - t.Error("ordering failed") - } - - if pq.Dequeue() != p2 { - t.Error("ordering failed") - } - -} - -func newPeerTime(t time.Time) peer.ID { - s := fmt.Sprintf("hmmm time: %v", t) - h, _ := mh.Sum([]byte(s), mh.SHA2_256, -1) - return peer.ID(h) -} - -func TestSyncQueue(t *testing.T) { - tickT := time.Microsecond * 50 - max := 5000 - consumerN := 10 - countsIn := make([]int, consumerN*2) - countsOut := make([]int, consumerN) - - if testing.Short() { - max = 1000 - } - - ctx := context.Background() - pq := NewXORDistancePQ("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31") - cq := NewChanQueue(ctx, pq) - wg := sync.WaitGroup{} - - produce := func(p int) { - defer wg.Done() - - tick := time.Tick(tickT) - for i := 0; i < max; i++ { - select { - case tim := <-tick: - countsIn[p]++ - cq.EnqChan <- newPeerTime(tim) - case <-ctx.Done(): - return - } - } - } - - consume := func(c int) { - defer wg.Done() - - for { - select { - case <-cq.DeqChan: - countsOut[c]++ - if countsOut[c] >= max*2 { - return - } - case <-ctx.Done(): - return - } - } - } - - // make n * 2 producers and n consumers - for i := 0; i < consumerN; i++ { - wg.Add(3) - go produce(i) - go produce(consumerN + i) - go consume(i) - } - - wg.Wait() - - sum := func(ns []int) int { - total := 0 - for _, n := range ns { - total += n - } - return total - } - - if sum(countsIn) != sum(countsOut) { - t.Errorf("didnt get all of them out: %d/%d", sum(countsOut), sum(countsIn)) - } -} diff --git a/queue/sync.go b/queue/sync.go deleted file mode 100644 index a17a759..0000000 --- a/queue/sync.go +++ /dev/null @@ -1,85 +0,0 @@ -package queue - -import ( - "context" - - logging "github.com/ipfs/go-log" - "github.com/libp2p/go-libp2p-core/peer" -) - -var log = logging.Logger("peerqueue") - -// ChanQueue makes any PeerQueue synchronizable through channels. -type ChanQueue struct { - Queue PeerQueue - EnqChan chan<- peer.ID - DeqChan <-chan peer.ID -} - -// NewChanQueue creates a ChanQueue by wrapping pq. -func NewChanQueue(ctx context.Context, pq PeerQueue) *ChanQueue { - cq := &ChanQueue{Queue: pq} - cq.process(ctx) - return cq -} - -func (cq *ChanQueue) process(ctx context.Context) { - // construct the channels here to be able to use them bidirectionally - enqChan := make(chan peer.ID) - deqChan := make(chan peer.ID) - - cq.EnqChan = enqChan - cq.DeqChan = deqChan - - go func() { - log.Debug("processing") - defer log.Debug("closed") - defer close(deqChan) - - var next peer.ID - var item peer.ID - var more bool - - for { - if cq.Queue.Len() == 0 { - // log.Debug("wait for enqueue") - select { - case next, more = <-enqChan: - if !more { - return - } - // log.Debug("got", next) - - case <-ctx.Done(): - return - } - - } else { - next = cq.Queue.Dequeue() - // log.Debug("peek", next) - } - - select { - case item, more = <-enqChan: - if !more { - if cq.Queue.Len() > 0 { - return // we're done done. - } - enqChan = nil // closed, so no use. - } - // log.Debug("got", item) - cq.Queue.Enqueue(item) - cq.Queue.Enqueue(next) // order may have changed. - next = "" - - case deqChan <- next: - // log.Debug("dequeued", next) - next = "" - - case <-ctx.Done(): - return - } - } - - }() -}