From b73106eba2d559621f427f7aa788e9b0ef47d135 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Wed, 25 Nov 2020 18:50:23 +0100 Subject: [PATCH] feat: discover and connect to closest peers (#798) --- doc/API.md | 32 ++++ doc/CONFIGURATION.md | 9 +- package.json | 2 + src/config.js | 7 + src/index.js | 8 +- src/peer-routing.js | 132 +++++++++++--- test/peer-routing/peer-routing.node.js | 238 +++++++++++++++++++++++++ 7 files changed, 401 insertions(+), 27 deletions(-) diff --git a/doc/API.md b/doc/API.md index b199d55687..222ef660eb 100644 --- a/doc/API.md +++ b/doc/API.md @@ -20,6 +20,7 @@ * [`contentRouting.get`](#contentroutingget) * [`contentRouting.getMany`](#contentroutinggetmany) * [`peerRouting.findPeer`](#peerroutingfindpeer) + * [`peerRouting.getClosestPeers`](#peerroutinggetclosestpeers) * [`peerStore.addressBook.add`](#peerstoreaddressbookadd) * [`peerStore.addressBook.delete`](#peerstoreaddressbookdelete) * [`peerStore.addressBook.get`](#peerstoreaddressbookget) @@ -100,6 +101,7 @@ Creates an instance of Libp2p. | [options.keychain] | [`object`](./CONFIGURATION.md#setup-with-keychain) | keychain [configuration](./CONFIGURATION.md#setup-with-keychain) | | [options.metrics] | [`object`](./CONFIGURATION.md#configuring-metrics) | libp2p Metrics [configuration](./CONFIGURATION.md#configuring-metrics) | | [options.peerId] | [`PeerId`][peer-id] | peerId instance (it will be created if not provided) | +| [options.peerRouting] | [`object`](./CONFIGURATION.md#setup-with-content-and-peer-routing) | libp2p Peer routing service [configuration](./CONFIGURATION.md#setup-with-content-and-peer-routing) | | [options.peerStore] | [`object`](./CONFIGURATION.md#configuring-peerstore) | libp2p PeerStore [configuration](./CONFIGURATION.md#configuring-peerstore) | For Libp2p configurations and modules details read the [Configuration Document](./CONFIGURATION.md). @@ -707,6 +709,36 @@ Iterates over all peer routers in series to find the given peer. If the DHT is e const peer = await libp2p.peerRouting.findPeer(peerId, options) ``` +### peerRouting.getClosestPeers + +Iterates over all content routers in series to get the closest peers of the given key. +Once a content router succeeds, the iteration will stop. If the DHT is enabled, it will be queried first. + +`libp2p.peerRouting.getClosestPeers(cid, options)` + +#### Parameters + +| Name | Type | Description | +|------|------|-------------| +| key | `Uint8Array` | A CID like key | +| options | `object` | operation options | +| options.timeout | `number` | How long the query can take (ms). | + +#### Returns + +| Type | Description | +|------|-------------| +| `AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }` | Async iterator for peer data | + +#### Example + +```js +// Iterate over the closest peers found for the given key +for await (const peer of libp2p.peerRouting.getClosestPeers(key)) { + console.log(peer.id, peer.multiaddrs) +} +``` + ### peerStore.addressBook.add Adds known `multiaddrs` of a given peer. If the peer is not known, it will be set with the provided multiaddrs. diff --git a/doc/CONFIGURATION.md b/doc/CONFIGURATION.md index 917ec7cfb7..06cc8faed3 100644 --- a/doc/CONFIGURATION.md +++ b/doc/CONFIGURATION.md @@ -397,7 +397,14 @@ const node = await Libp2p.create({ new DelegatedPeerRouter() ], }, - peerId + peerId, + peerRouting: { // Peer routing configuration + refreshManager: { // Refresh known and connected closest peers + enabled: true, // Should find the closest peers. + interval: 6e5, // Interval for getting the new for closest peers of 10min + bootDelay: 10e3 // Delay for the initial query for closest peers + } + } }) ``` diff --git a/package.json b/package.json index fedbb1687f..e6fee11aad 100644 --- a/package.json +++ b/package.json @@ -79,6 +79,7 @@ "protons": "^2.0.0", "retimer": "^2.0.0", "sanitize-filename": "^1.6.3", + "set-delayed-interval": "^1.0.0", "streaming-iterables": "^5.0.2", "timeout-abort-controller": "^1.1.1", "varint": "^5.0.0", @@ -92,6 +93,7 @@ "chai-string": "^1.5.0", "delay": "^4.3.0", "interop-libp2p": "^0.3.0", + "into-stream": "^6.0.0", "ipfs-http-client": "^47.0.1", "it-concat": "^1.0.0", "it-pair": "^1.0.0", diff --git a/src/config.js b/src/config.js index 1134050c14..a2334f52e7 100644 --- a/src/config.js +++ b/src/config.js @@ -41,6 +41,13 @@ const DefaultConfig = { persistence: false, threshold: 5 }, + peerRouting: { + refreshManager: { + enabled: true, + interval: 6e5, + bootDelay: 10e3 + } + }, config: { dht: { enabled: false, diff --git a/src/index.js b/src/index.js index 23ecae3d19..d2e2a7cb67 100644 --- a/src/index.js +++ b/src/index.js @@ -9,7 +9,7 @@ log.error = debug('libp2p:error') const errCode = require('err-code') const PeerId = require('peer-id') -const peerRouting = require('./peer-routing') +const PeerRouting = require('./peer-routing') const contentRouting = require('./content-routing') const getPeer = require('./get-peer') const { validate: validateConfig } = require('./config') @@ -193,7 +193,7 @@ class Libp2p extends EventEmitter { // Attach remaining APIs // peer and content routing will automatically get modules from _modules and _dht - this.peerRouting = peerRouting(this) + this.peerRouting = new PeerRouting(this) this.contentRouting = contentRouting(this) // Mount default protocols @@ -250,8 +250,8 @@ class Libp2p extends EventEmitter { try { this._isStarted = false - // Relay this.relay && this.relay.stop() + this.peerRouting.stop() for (const service of this._discovery.values()) { service.removeListener('peer', this._onDiscoveryPeer) @@ -501,6 +501,8 @@ class Libp2p extends EventEmitter { // Relay this.relay && this.relay.start() + + this.peerRouting.start() } /** diff --git a/src/peer-routing.js b/src/peer-routing.js index 7594f15d77..e783c82f8b 100644 --- a/src/peer-routing.js +++ b/src/peer-routing.js @@ -1,40 +1,126 @@ 'use strict' const errCode = require('err-code') +const debug = require('debug') +const log = debug('libp2p:peer-routing') +log.error = debug('libp2p:peer-routing:error') + +const all = require('it-all') const pAny = require('p-any') +const { + setDelayedInterval, + clearDelayedInterval +} = require('set-delayed-interval') + +/** + * Responsible for managing the usage of the available Peer Routing modules. + */ +class PeerRouting { + /** + * @class + * @param {Libp2p} libp2p + */ + constructor (libp2p) { + this._peerId = libp2p.peerId + this._peerStore = libp2p.peerStore + this._routers = libp2p._modules.peerRouting || [] + + // If we have the dht, make it first + if (libp2p._dht) { + this._routers.unshift(libp2p._dht) + } + + this._refreshManagerOptions = libp2p._options.peerRouting.refreshManager + + this._findClosestPeersTask = this._findClosestPeersTask.bind(this) + } + + /** + * Start peer routing service. + */ + start () { + if (!this._routers.length || this._timeoutId || !this._refreshManagerOptions.enabled) { + return + } + + this._timeoutId = setDelayedInterval( + this._findClosestPeersTask, this._refreshManagerOptions.interval, this._refreshManagerOptions.bootDelay + ) + } -module.exports = (node) => { - const routers = node._modules.peerRouting || [] + /** + * Recurrent task to find closest peers and add their addresses to the Address Book. + */ + async _findClosestPeersTask () { + try { + for await (const { id, multiaddrs } of this.getClosestPeers(this._peerId.id)) { + this._peerStore.addressBook.add(id, multiaddrs) + } + } catch (err) { + log.error(err) + } + } - // If we have the dht, make it first - if (node._dht) { - routers.unshift(node._dht) + /** + * Stop peer routing service. + */ + stop () { + clearDelayedInterval(this._timeoutId) } - return { - /** - * Iterates over all peer routers in series to find the given peer. - * - * @param {string} id - The id of the peer to find - * @param {object} [options] - * @param {number} [options.timeout] - How long the query should run - * @returns {Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>} - */ - findPeer: async (id, options) => { // eslint-disable-line require-await - if (!routers.length) { - throw errCode(new Error('No peer routers available'), 'NO_ROUTERS_AVAILABLE') + /** + * Iterates over all peer routers in series to find the given peer. + * + * @param {string} id - The id of the peer to find + * @param {object} [options] + * @param {number} [options.timeout] - How long the query should run + * @returns {Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>} + */ + async findPeer (id, options) { // eslint-disable-line require-await + if (!this._routers.length) { + throw errCode(new Error('No peer routers available'), 'NO_ROUTERS_AVAILABLE') + } + + return pAny(this._routers.map(async (router) => { + const result = await router.findPeer(id, options) + + // If we don't have a result, we need to provide an error to keep trying + if (!result || Object.keys(result).length === 0) { + throw errCode(new Error('not found'), 'NOT_FOUND') } - return pAny(routers.map(async (router) => { - const result = await router.findPeer(id, options) + return result + })) + } + + /** + * Attempt to find the closest peers on the network to the given key. + * + * @param {Uint8Array} key - A CID like key + * @param {Object} [options] + * @param {number} [options.timeout=30e3] - How long the query can take. + * @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>} + */ + async * getClosestPeers (key, options = { timeout: 30e3 }) { + if (!this._routers.length) { + throw errCode(new Error('No peer routers available'), 'NO_ROUTERS_AVAILABLE') + } - // If we don't have a result, we need to provide an error to keep trying - if (!result || Object.keys(result).length === 0) { + const result = await pAny( + this._routers.map(async (router) => { + const peers = await all(router.getClosestPeers(key, options)) + + if (!peers || !peers.length) { throw errCode(new Error('not found'), 'NOT_FOUND') } + return peers + }) + ) - return result - })) + for (const peer of result) { + yield peer } } } + +module.exports = PeerRouting diff --git a/test/peer-routing/peer-routing.node.js b/test/peer-routing/peer-routing.node.js index cdce080c1e..74cc6393e6 100644 --- a/test/peer-routing/peer-routing.node.js +++ b/test/peer-routing/peer-routing.node.js @@ -4,12 +4,17 @@ const { expect } = require('aegir/utils/chai') const nock = require('nock') const sinon = require('sinon') +const intoStream = require('into-stream') +const delay = require('delay') const pDefer = require('p-defer') +const pWaitFor = require('p-wait-for') const mergeOptions = require('merge-options') const ipfsHttpClient = require('ipfs-http-client') const DelegatedPeerRouter = require('libp2p-delegated-peer-routing') +const multiaddr = require('multiaddr') +const PeerId = require('peer-id') const peerUtils = require('../utils/creators/peer') const { baseOptions, routingOptions } = require('./utils') @@ -29,6 +34,16 @@ describe('peer-routing', () => { .to.eventually.be.rejected() .and.to.have.property('code', 'NO_ROUTERS_AVAILABLE') }) + + it('.getClosestPeers should return an error', async () => { + try { + for await (const _ of node.peerRouting.getClosestPeers('a cid')) { } // eslint-disable-line + throw new Error('.getClosestPeers should return an error') + } catch (err) { + expect(err).to.exist() + expect(err.code).to.equal('NO_ROUTERS_AVAILABLE') + } + }) }) describe('via dht router', () => { @@ -64,6 +79,19 @@ describe('peer-routing', () => { nodes[0].peerRouting.findPeer() return deferred.promise }) + + it('should use the nodes dht to get the closest peers', async () => { + const deferred = pDefer() + + sinon.stub(nodes[0]._dht, 'getClosestPeers').callsFake(function * () { + deferred.resolve() + yield + }) + + await nodes[0].peerRouting.getClosestPeers().next() + + return deferred.promise + }) }) describe('via delegate router', () => { @@ -110,6 +138,19 @@ describe('peer-routing', () => { return deferred.promise }) + it('should use the delegate router to get the closest peers', async () => { + const deferred = pDefer() + + sinon.stub(delegate, 'getClosestPeers').callsFake(function * () { + deferred.resolve() + yield + }) + + await node.peerRouting.getClosestPeers().next() + + return deferred.promise + }) + it('should be able to find a peer', async () => { const peerKey = 'QmTp9VkYvnHyrqKQuFPiuZkiX9gPcqj6x5LJ1rmWuSySnL' const mockApi = nock('http://0.0.0.0:60197') @@ -154,6 +195,60 @@ describe('peer-routing', () => { expect(mockApi.isDone()).to.equal(true) }) + + it('should be able to get the closest peers', async () => { + const peerId = await PeerId.create({ keyType: 'ed25519' }) + + const closest1 = '12D3KooWLewYMMdGWAtuX852n4rgCWkK7EBn4CWbwwBzhsVoKxk3' + const closest2 = '12D3KooWDtoQbpKhtnWddfj72QmpFvvLDTsBLTFkjvgQm6cde2AK' + + const mockApi = nock('http://0.0.0.0:60197') + .post('/api/v0/dht/query') + .query(true) + .reply(200, + () => intoStream([ + `{"extra":"","id":"${closest1}","responses":[{"ID":"${closest1}","Addrs":["/ip4/127.0.0.1/tcp/63930","/ip4/127.0.0.1/tcp/63930"]}],"type":1}\n`, + `{"extra":"","id":"${closest2}","responses":[{"ID":"${closest2}","Addrs":["/ip4/127.0.0.1/tcp/63506","/ip4/127.0.0.1/tcp/63506"]}],"type":1}\n`, + `{"Extra":"","ID":"${closest2}","Responses":[],"Type":2}\n`, + `{"Extra":"","ID":"${closest1}","Responses":[],"Type":2}\n` + ]), + [ + 'Content-Type', 'application/json', + 'X-Chunked-Output', '1' + ]) + + const closestPeers = [] + for await (const peer of node.peerRouting.getClosestPeers(peerId.id, { timeout: 1000 })) { + closestPeers.push(peer) + } + + expect(closestPeers).to.have.length(2) + expect(closestPeers[0].id.toB58String()).to.equal(closest2) + expect(closestPeers[0].multiaddrs).to.have.lengthOf(2) + expect(closestPeers[1].id.toB58String()).to.equal(closest1) + expect(closestPeers[1].multiaddrs).to.have.lengthOf(2) + expect(mockApi.isDone()).to.equal(true) + }) + + it('should handle errors when getting the closest peers', async () => { + const peerId = await PeerId.create({ keyType: 'ed25519' }) + + const mockApi = nock('http://0.0.0.0:60197') + .post('/api/v0/dht/query') + .query(true) + .reply(502, 'Bad Gateway', [ + 'X-Chunked-Output', '1' + ]) + + try { + for await (const _ of node.peerRouting.getClosestPeers(peerId.id)) { } // eslint-disable-line + throw new Error('should handle errors when getting the closest peers') + } catch (err) { + expect(err).to.exist() + } + + expect(mockApi.isDone()).to.equal(true) + }) }) describe('via dht and delegate routers', () => { @@ -208,5 +303,148 @@ describe('peer-routing', () => { const peer = await node.peerRouting.findPeer('a peer id') expect(peer).to.eql(results) }) + + it('should only use the dht if it gets the closest peers', async () => { + const results = [true] + + sinon.stub(node._dht, 'getClosestPeers').callsFake(function * () { + yield results[0] + }) + + sinon.stub(delegate, 'getClosestPeers').callsFake(function * () { // eslint-disable-line require-yield + throw new Error('the delegate should not have been called') + }) + + const closest = [] + for await (const peer of node.peerRouting.getClosestPeers('a cid')) { + closest.push(peer) + } + + expect(closest).to.have.length.above(0) + expect(closest).to.eql(results) + }) + + it('should use the delegate if the dht fails to get the closest peer', async () => { + const results = [true] + + sinon.stub(node._dht, 'getClosestPeers').callsFake(function * () { }) + + sinon.stub(delegate, 'getClosestPeers').callsFake(function * () { + yield results[0] + }) + + const closest = [] + for await (const peer of node.peerRouting.getClosestPeers('a cid')) { + closest.push(peer) + } + + expect(closest).to.have.length.above(0) + expect(closest).to.eql(results) + }) + }) + + describe('peer routing refresh manager service', () => { + let node + let peerIds + + before(async () => { + peerIds = await peerUtils.createPeerId({ number: 2 }) + }) + + afterEach(() => { + sinon.restore() + + return node && node.stop() + }) + + it('should be enabled and start by default', async () => { + const results = [ + { id: peerIds[0], multiaddrs: [multiaddr('/ip4/30.0.0.1/tcp/2000')] }, + { id: peerIds[1], multiaddrs: [multiaddr('/ip4/32.0.0.1/tcp/2000')] } + ] + + ;[node] = await peerUtils.createPeer({ + config: mergeOptions(routingOptions, { + peerRouting: { + refreshManager: { + bootDelay: 100 + } + } + }), + started: false + }) + + sinon.spy(node.peerStore.addressBook, 'add') + sinon.stub(node._dht, 'getClosestPeers').callsFake(function * () { + yield results[0] + yield results[1] + }) + + await node.start() + + await pWaitFor(() => node._dht.getClosestPeers.callCount === 1) + await pWaitFor(() => node.peerStore.addressBook.add.callCount === results.length) + + const call0 = node.peerStore.addressBook.add.getCall(0) + expect(call0.args[0].equals(results[0].id)) + call0.args[1].forEach((m, index) => { + expect(m.equals(results[0].multiaddrs[index])) + }) + + const call1 = node.peerStore.addressBook.add.getCall(1) + expect(call1.args[0].equals(results[1].id)) + call0.args[1].forEach((m, index) => { + expect(m.equals(results[1].multiaddrs[index])) + }) + }) + + it('should support being disabled', async () => { + [node] = await peerUtils.createPeer({ + config: mergeOptions(routingOptions, { + peerRouting: { + refreshManager: { + bootDelay: 100, + enabled: false + } + } + }), + started: false + }) + + sinon.stub(node._dht, 'getClosestPeers').callsFake(function * () { + yield + throw new Error('should not be called') + }) + + await node.start() + await delay(100) + + expect(node._dht.getClosestPeers.callCount === 0) + }) + + it('should start and run recurrently on interval', async () => { + [node] = await peerUtils.createPeer({ + config: mergeOptions(routingOptions, { + peerRouting: { + refreshManager: { + interval: 500, + bootDelay: 200 + } + } + }), + started: false + }) + + sinon.stub(node._dht, 'getClosestPeers').callsFake(function * () { + yield { id: peerIds[0], multiaddrs: [multiaddr('/ip4/30.0.0.1/tcp/2000')] } + }) + + await node.start() + + await delay(300) + expect(node._dht.getClosestPeers.callCount).to.eql(1) + await delay(500) + expect(node._dht.getClosestPeers.callCount).to.eql(2) + }) }) })