diff --git a/README.md b/README.md index 3e2188ee04..cb940cd86e 100644 --- a/README.md +++ b/README.md @@ -116,9 +116,12 @@ const MulticastDNS = require('libp2p-mdns') const DHT = require('libp2p-kad-dht') const defaultsDeep = require('@nodeutils/defaults-deep') const Protector = require('libp2p-pnet') +const DelegatedPeerRouter = require('libp2p-delegated-peer-routing') +const DelegatedContentRouter = require('libp2p-delegated-content-routing') class Node extends libp2p { constructor (_options) { + const peerInfo = _options.peerInfo const defaults = { // The libp2p modules for this libp2p bundle modules: { @@ -133,8 +136,16 @@ class Node extends libp2p { connEncryption: [ SECIO ], - // Encryption for private networks. Needs additional private key to work + /** Encryption for private networks. Needs additional private key to work **/ // connProtector: new Protector(/*protector specific opts*/), + /** Enable custom content routers, such as delegated routing **/ + // contentRouting: [ + // new DelegatedContentRouter(peerInfo.id) + // ], + /** Enable custom peer routers, such as delegated routing **/ + // peerRouting: [ + // new DelegatedPeerRouter() + // ], peerDiscovery: [ MulticastDNS ], @@ -230,16 +241,19 @@ Required keys in the `options` object: `callback` is a function with the following `function (err) {}` signature, where `err` is an Error in case stopping the node fails. -#### `libp2p.peerRouting.findPeer(id, callback)` +#### `libp2p.peerRouting.findPeer(id, options, callback)` > Looks up for multiaddrs of a peer in the DHT - `id`: instance of [PeerId][] +- `options`: object of options +- `options.maxTimeout`: Number milliseconds -#### `libp2p.contentRouting.findProviders(key, timeout, callback)` +#### `libp2p.contentRouting.findProviders(key, options, callback)` - `key`: Buffer -- `timeout`: Number miliseconds +- `options`: object of options +- `options.maxTimeout`: Number milliseconds #### `libp2p.contentRouting.provide(key, callback)` @@ -307,14 +321,18 @@ Required keys in the `options` object: - `key`: Buffer - `value`: Buffer -#### `libp2p.dht.get(key, callback)` +#### `libp2p.dht.get(key, options, callback)` - `key`: Buffer +- `options`: object of options +- `options.maxTimeout`: Number milliseconds -#### `libp2p.dht.getMany(key, nVals, callback)` +#### `libp2p.dht.getMany(key, nVals, options, callback)` - `key`: Buffer - `nVals`: Number +- `options`: object of options +- `options.maxTimeout`: Number milliseconds [PeerInfo]: https://github.com/libp2p/js-peer-info [PeerId]: https://github.com/libp2p/js-peer-id diff --git a/examples/delegated-routing/README.md b/examples/delegated-routing/README.md new file mode 100644 index 0000000000..debd15407f --- /dev/null +++ b/examples/delegated-routing/README.md @@ -0,0 +1,49 @@ +# Delegated Routing with Libp2p and IPFS + +This example shows how to use delegated peer and content routing. The [Peer and Content Routing Example](../peer-and-content-routing) focuses +on the DHT implementation. This example takes that a step further and introduces delegated routing. Delegated routing is +especially useful when your libp2p node will have limited resources, making running a DHT impractical. It's +also highly useful if your node is generating content, but can't reliably be on the network. You can use delegate nodes +to provide content on your behalf. + +The starting [Libp2p Bundle](./src/libp2p-bundle.js) in this example starts by disabling the DHT and adding the Delegated Peer and Content Routers. +Once you've completed the example, you should try enabled the DHT and see what kind of results you get! You can also enable the +various Peer Discovery modules and see the impact it has on your Peer count. + +## Prerequisite +**NOTE**: This example is currently dependent on a clone of the [delegated routing support branch of go-ipfs](https://github.com/ipfs/go-ipfs/pull/4595). + +## Running this example + +1. Install IPFS locally if you dont already have it. [Install Guide](https://docs.ipfs.io/introduction/install/) +2. Run the IPFS daemon: `ipfs daemon` +3. The daemon will output a line about its API address, like `API server listening on /ip4/127.0.0.1/tcp/8080` +4. In another window output the addresses of the node: `ipfs id`. Make note of the websocket address, is will contain `/ws/` in the address. +5. In `./src/libp2p-bundle.js` replace the `delegatedApiOptions` host and port of your node if they are different. +6. In `./src/App.js` replace `BootstrapNode` with your nodes Websocket address from step 4. +7. Start this example: + +```sh +npm install +npm start +``` + +This should open your browser to http://localhost:3000. If it does not, go ahead and do that now. + +8. Your browser should show you connected to at least 1 peer. + +### Finding Content via the Delegate +1. Add a file to your IPFS node. From this example root you can do `ipfs add ./README.md` to add the example readme. +2. Copy the hash from line 5, it will look something like *Qmf33vz4HJFkqgH7XPP1uA6atYKTX1BWQEQthzpKcAdeyZ*. +3. In the browser, paste the hash into the *Hash* field and hit `Find`. The readme contents should display. + +This will do a few things: +* The delegate nodes api will be queried to find providers of the content +* The content will be fetched from the providers +* Since we now have the content, we tell the delegate node to fetch the content from us and become a provider + +### Finding Peers via the Delegate +1. Get a list of your delegate nodes peer by querying the IPFS daemon: `ipfs swarm peers` +2. Copy one of the CIDs from the list of peer addresses, this will be the last portion of the address and will look something like `QmdoG8DpzYUZMVP5dGmgmigZwR1RE8Cf6SxMPg1SBXJAQ8`. +3. In your browser, paste the CID into the *Peer* field and hit `Find`. +4. You should see information about the peer including its addresses. diff --git a/examples/delegated-routing/package.json b/examples/delegated-routing/package.json new file mode 100644 index 0000000000..ff36b7bd79 --- /dev/null +++ b/examples/delegated-routing/package.json @@ -0,0 +1,23 @@ +{ + "name": "delegated-routing-example", + "version": "0.1.0", + "private": true, + "dependencies": { + "ipfs": "~0.32.2", + "libp2p": "../../", + "libp2p-delegated-content-routing": "~0.2.2", + "libp2p-delegated-peer-routing": "~0.2.2", + "libp2p-kad-dht": "~0.10.4", + "libp2p-mplex": "~0.8.0", + "libp2p-secio": "~0.10.0", + "libp2p-webrtc-star": "~0.15.5", + "libp2p-websocket-star": "~0.8.1", + "libp2p-websockets": "~0.12.0", + "react": "^16.5.2", + "react-dom": "^16.5.2", + "react-scripts": "1.1.5" + }, + "scripts": { + "start": "react-scripts start" + } +} diff --git a/examples/delegated-routing/public/favicon.ico b/examples/delegated-routing/public/favicon.ico new file mode 100644 index 0000000000..a11777cc47 Binary files /dev/null and b/examples/delegated-routing/public/favicon.ico differ diff --git a/examples/delegated-routing/public/index.html b/examples/delegated-routing/public/index.html new file mode 100644 index 0000000000..9082175b4e --- /dev/null +++ b/examples/delegated-routing/public/index.html @@ -0,0 +1,16 @@ + + + + + + + Delegated Routing + + + + +
+ + diff --git a/examples/delegated-routing/public/main.css b/examples/delegated-routing/public/main.css new file mode 100644 index 0000000000..5cd8188187 --- /dev/null +++ b/examples/delegated-routing/public/main.css @@ -0,0 +1,67 @@ +body { + margin: 0; + padding: 0; + font-family: sans-serif; +} + +section * { + margin: 10px; +} + +header { + background-color: #222; + height: 150px; + padding: 20px; + color: white; +} + +.center { + text-align: center; +} + +pre { + background-color: bisque; + min-height: 100px; + margin: 0px; + padding: 10px; +} + +.loader { + text-align: center; + height: 64px; + margin-bottom: -64px; +} + +.loading .lds-ripple { + display: inline-block; + position: relative; + width: 64px; + height: 64px; +} +.loading .lds-ripple div { + position: absolute; + border: 4px solid #000; + opacity: 1; + border-radius: 50%; + animation: lds-ripple 1s cubic-bezier(0, 0.2, 0.8, 1) infinite; + margin: auto; +} +.loading .lds-ripple div:nth-child(2) { + animation-delay: -0.5s; +} +@keyframes lds-ripple { + 0% { + top: 28px; + left: 28px; + width: 0; + height: 0; + opacity: 1; + } + 100% { + top: -1px; + left: -1px; + width: 58px; + height: 58px; + opacity: 0; + } +} \ No newline at end of file diff --git a/examples/delegated-routing/src/App.js b/examples/delegated-routing/src/App.js new file mode 100644 index 0000000000..7e5425d228 --- /dev/null +++ b/examples/delegated-routing/src/App.js @@ -0,0 +1,153 @@ +// eslint-disable-next-line +'use strict' + +const React = require('react') +const Component = React.Component +const Ipfs = require('ipfs') +const libp2pBundle = require('./libp2p-bundle') +// require('./App.css') + +const BootstrapNode = '/ip4/127.0.0.1/tcp/8081/ws/ipfs/QmdoG8DpzYUZMVP5dGmgmigZwR1RE8Cf6SxMPg1SBXJAQ8' + +class App extends Component { + constructor (props) { + super(props) + this.state = { + peers: 0, + // This hash is the IPFS readme + hash: 'QmPZ9gcCEpqKTo6aq61g2nXGUhM4iCL3ewB6LDXZCtioEB', + // This peer is one of the Bootstrap nodes for IPFS + peer: 'QmV6kA2fB8kTr6jc3pL5zbNsjKbmPUHAPKKHRBYe1kDEyc', + isLoading: 0 + } + this.peerInterval = null + + this.handleHashChange = this.handleHashChange.bind(this) + this.handleHashSubmit = this.handleHashSubmit.bind(this) + this.handlePeerChange = this.handlePeerChange.bind(this) + this.handlePeerSubmit = this.handlePeerSubmit.bind(this) + } + + handleHashChange (event) { + this.setState({ + hash: event.target.value + }) + } + handlePeerChange (event) { + this.setState({ + peer: event.target.value + }) + } + + handleHashSubmit (event) { + event.preventDefault() + this.setState({ + isLoading: this.state.isLoading + 1 + }) + + this.ipfs.files.cat(this.state.hash, (err, data) => { + if (err) console.log('Error', err) + + this.setState({ + response: data.toString(), + isLoading: this.state.isLoading - 1 + }) + }) + } + handlePeerSubmit (event) { + event.preventDefault() + this.setState({ + isLoading: this.state.isLoading + 1 + }) + + this.ipfs.dht.findpeer(this.state.peer, (err, results) => { + if (err) console.log('Error', err) + + this.setState({ + response: JSON.stringify(results, null, 2), + isLoading: this.state.isLoading - 1 + }) + }) + } + + componentDidMount () { + window.ipfs = this.ipfs = new Ipfs({ + config: { + Addresses: { + Swarm: [] + }, + Discovery: { + MDNS: { + Enabled: false + }, + webRTCStar: { + Enabled: false + } + }, + Bootstrap: [ + BootstrapNode + ] + }, + preload: { + enabled: false + }, + libp2p: libp2pBundle + }) + this.ipfs.on('ready', () => { + if (this.peerInterval) { + clearInterval(this.peerInterval) + } + + this.ipfs.swarm.connect(BootstrapNode, (err) => { + if (err) { + console.log('Error connecting to the node', err) + } + console.log('Connected!') + }) + + this.peerInterval = setInterval(() => { + this.ipfs.swarm.peers((err, peers) => { + if (err) console.log(err) + if (peers) this.setState({peers: peers.length}) + }) + }, 2500) + }) + } + + render () { + return ( +
+
+

Delegated Routing

+

There are currently {this.state.peers} peers.

+
+
+
+ +
+
+ +
+
+
0 ? 'loading' : '', 'loader'].join(' ')}> +
+
+
+
+            {this.state.response}
+          
+
+
+ ) + } +} + +module.exports = App diff --git a/examples/delegated-routing/src/index.js b/examples/delegated-routing/src/index.js new file mode 100644 index 0000000000..366e865023 --- /dev/null +++ b/examples/delegated-routing/src/index.js @@ -0,0 +1,9 @@ +// eslint-disable-next-line +'use strict' + +const React = require('react') // eslint-disable-line no-unused-vars +const ReactDOM = require('react-dom') +const App = require('./App') // eslint-disable-line no-unused-vars +// require('index.css') + +ReactDOM.render(, document.getElementById('root')) diff --git a/examples/delegated-routing/src/libp2p-bundle.js b/examples/delegated-routing/src/libp2p-bundle.js new file mode 100644 index 0000000000..c5ab0dc752 --- /dev/null +++ b/examples/delegated-routing/src/libp2p-bundle.js @@ -0,0 +1,78 @@ +// eslint-disable-next-line +'use strict' + +const Libp2p = require('libp2p') +const Websockets = require('libp2p-websockets') +const WebSocketStar = require('libp2p-websocket-star') +const WebRTCStar = require('libp2p-webrtc-star') +const MPLEX = require('libp2p-mplex') +const SECIO = require('libp2p-secio') +const KadDHT = require('libp2p-kad-dht') +const DelegatedPeerRouter = require('libp2p-delegated-peer-routing') +const DelegatedContentRouter = require('libp2p-delegated-content-routing') + +module.exports = ({peerInfo, peerBook}) => { + const wrtcstar = new WebRTCStar({id: peerInfo.id}) + const wsstar = new WebSocketStar({id: peerInfo.id}) + const delegatedApiOptions = { + host: '0.0.0.0', + protocol: 'http', + port: '8080' + } + + return new Libp2p({ + peerInfo, + peerBook, + // Lets limit the connection managers peers and have it check peer health less frequently + connectionManager: { + maxPeers: 10, + pollInterval: 5000 + }, + modules: { + contentRouting: [ + new DelegatedContentRouter(peerInfo.id, delegatedApiOptions) + ], + peerRouting: [ + new DelegatedPeerRouter(delegatedApiOptions) + ], + peerDiscovery: [ + wrtcstar.discovery, + wsstar.discovery + ], + transport: [ + wrtcstar, + wsstar, + Websockets + ], + streamMuxer: [ + MPLEX + ], + connEncryption: [ + SECIO + ], + dht: KadDHT + }, + config: { + peerDiscovery: { + webrtcStar: { + enabled: false + }, + websocketStar: { + enabled: false + } + }, + dht: { + kBucketSize: 20 + }, + relay: { + enabled: true, + hop: { + enabled: false + } + }, + EXPERIMENTAL: { + dht: false + } + } + }) +} diff --git a/package.json b/package.json index e9f27741de..4f8ee1f168 100644 --- a/package.json +++ b/package.json @@ -38,6 +38,7 @@ }, "dependencies": { "async": "^2.6.1", + "err-code": "^1.1.2", "joi": "^13.6.0", "joi-browser": "^13.4.0", "libp2p-connection-manager": "~0.0.2", @@ -47,6 +48,7 @@ "libp2p-websockets": "~0.12.0", "mafmt": "^6.0.2", "multiaddr": "^5.0.0", + "nock": "^9.4.3", "peer-book": "~0.8.0", "peer-id": "~0.11.0", "peer-info": "~0.14.1" @@ -59,6 +61,8 @@ "dirty-chai": "^2.0.1", "electron-webrtc": "~0.3.0", "libp2p-circuit": "~0.2.1", + "libp2p-delegated-content-routing": "~0.2.2", + "libp2p-delegated-peer-routing": "~0.2.2", "libp2p-kad-dht": "~0.10.5", "libp2p-mdns": "~0.12.0", "libp2p-mplex": "~0.8.2", diff --git a/src/config.js b/src/config.js index 7cb9def5ec..ac95d1a7d2 100644 --- a/src/config.js +++ b/src/config.js @@ -10,14 +10,16 @@ const OptionsSchema = Joi.object({ peerInfo: Joi.object().required(), peerBook: Joi.object(), modules: Joi.object().keys({ - transport: Joi.array().items(ModuleSchema).min(1).required(), - streamMuxer: Joi.array().items(ModuleSchema).allow(null), connEncryption: Joi.array().items(ModuleSchema).allow(null), connProtector: Joi.object().keys({ protect: Joi.func().required() }).unknown(), + contentRouting: Joi.array().items(Joi.object()).allow(null), + dht: ModuleSchema.allow(null), peerDiscovery: Joi.array().items(ModuleSchema).allow(null), - dht: ModuleSchema.allow(null) + peerRouting: Joi.array().items(Joi.object()).allow(null), + streamMuxer: Joi.array().items(ModuleSchema).allow(null), + transport: Joi.array().items(ModuleSchema).min(1).required() }).required(), config: Joi.object().keys({ peerDiscovery: Joi.object().allow(null), diff --git a/src/content-routing.js b/src/content-routing.js index 559541ed45..b9f507f9f5 100644 --- a/src/content-routing.js +++ b/src/content-routing.js @@ -1,20 +1,82 @@ 'use strict' +const tryEach = require('async/tryEach') +const parallel = require('async/parallel') +const errCode = require('err-code') + module.exports = (node) => { + const routers = node._modules.contentRouting || [] + + // If we have the dht, make it first + if (node._dht) { + routers.unshift(node._dht) + } + return { - findProviders: (key, timeout, callback) => { - if (!node._dht) { - return callback(new Error('DHT is not available')) + /** + * Iterates over all content routers in series to find providers of the given key. + * Once a content router succeeds, iteration will stop. + * + * @param {CID} key The CID key of the content to find + * @param {object} options + * @param {number} options.maxTimeout How long the query should run + * @param {function(Error, Result)} callback + * @returns {void} + */ + findProviders: (key, options, callback) => { + if (!routers.length) { + return callback(errCode(new Error('No content routers available'), 'NO_ROUTERS_AVAILABLE')) + } + + if (typeof options === 'function') { + callback = options + options = {} + } else if (typeof options === 'number') { // This can be deprecated in a future release + options = { + maxTimeout: options + } } - node._dht.findProviders(key, timeout, callback) + const tasks = routers.map((router) => { + return (cb) => router.findProviders(key, options, (err, results) => { + if (err) { + return cb(err) + } + + // If we don't have any results, we need to provide an error to keep trying + if (!results || Object.keys(results).length === 0) { + return cb(errCode(new Error('not found'), 'NOT_FOUND'), null) + } + + cb(null, results) + }) + }) + + tryEach(tasks, (err, results) => { + if (err && err.code !== 'NOT_FOUND') { + return callback(err) + } + results = results || [] + callback(null, results) + }) }, + + /** + * Iterates over all content routers in parallel to notify it is + * a provider of the given key. + * + * @param {CID} key The CID key of the content to find + * @param {function(Error)} callback + * @returns {void} + */ provide: (key, callback) => { - if (!node._dht) { - return callback(new Error('DHT is not available')) + if (!routers.length) { + return callback(errCode(new Error('No content routers available'), 'NO_ROUTERS_AVAILABLE')) } - node._dht.provide(key, callback) + parallel(routers.map((router) => { + return (cb) => router.provide(key, cb) + }), callback) } } } diff --git a/src/index.js b/src/index.js index 12056fd948..424018fa76 100644 --- a/src/index.js +++ b/src/index.js @@ -20,8 +20,6 @@ const pubsub = require('./pubsub') const getPeerInfo = require('./get-peer-info') const validateConfig = require('./config').validate -exports = module.exports - const NOT_STARTED_ERROR_MESSAGE = 'The libp2p node is not started yet' class Node extends EventEmitter { @@ -102,6 +100,7 @@ class Node extends EventEmitter { } // Attach remaining APIs + // peer and content routing will automatically get modules from _modules and _dht this.peerRouting = peerRouting(this) this.contentRouting = contentRouting(this) this.dht = dht(this) diff --git a/src/peer-routing.js b/src/peer-routing.js index 3a48d075ee..0d5f5dbcb5 100644 --- a/src/peer-routing.js +++ b/src/peer-routing.js @@ -1,13 +1,58 @@ 'use strict' +const tryEach = require('async/tryEach') +const errCode = require('err-code') + module.exports = (node) => { + const routers = node._modules.peerRouting || [] + + // If we have the dht, make it first + if (node._dht) { + routers.unshift(node._dht) + } + return { - findPeer: (id, callback) => { - if (!node._dht) { - return callback(new Error('DHT is not available')) + /** + * Iterates over all peer routers in series to find the given peer. + * + * @param {String} id The id of the peer to find + * @param {object} options + * @param {number} options.maxTimeout How long the query should run + * @param {function(Error, Result)} callback + * @returns {void} + */ + findPeer: (id, options, callback) => { + if (!routers.length) { + callback(errCode(new Error('No peer routers available'), 'NO_ROUTERS_AVAILABLE')) } - node._dht.findPeer(id, callback) + if (typeof options === 'function') { + callback = options + options = {} + } + + const tasks = routers.map((router) => { + return (cb) => router.findPeer(id, options, (err, result) => { + if (err) { + return cb(err) + } + + // If we don't have a result, we need to provide an error to keep trying + if (!result || Object.keys(result).length === 0) { + return cb(errCode(new Error('not found'), 'NOT_FOUND'), null) + } + + cb(null, result) + }) + }) + + tryEach(tasks, (err, results) => { + if (err && err.code !== 'NOT_FOUND') { + return callback(err) + } + results = results || [] + callback(null, results) + }) } } } diff --git a/test/config.spec.js b/test/config.spec.js index 892cccce1d..3b4c687dcd 100644 --- a/test/config.spec.js +++ b/test/config.spec.js @@ -9,6 +9,8 @@ const PeerId = require('peer-id') const waterfall = require('async/waterfall') const WS = require('libp2p-websockets') const Bootstrap = require('libp2p-bootstrap') +const DelegatedPeerRouter = require('libp2p-delegated-peer-routing') +const DelegatedContentRouter = require('libp2p-delegated-content-routing') const validateConfig = require('../src/config').validate @@ -98,6 +100,34 @@ describe('configuration', () => { expect(validateConfig(options)).to.deep.equal(expected) }) + it('should allow for delegated content and peer routing', () => { + const peerRouter = new DelegatedPeerRouter() + const contentRouter = new DelegatedContentRouter(peerInfo) + + const options = { + peerInfo, + modules: { + transport: [ WS ], + peerDiscovery: [ Bootstrap ], + peerRouting: [ peerRouter ], + contentRouting: [ contentRouter ] + }, + config: { + peerDiscovery: { + bootstrap: { + interval: 1000, + enabled: true + } + } + } + } + + expect(validateConfig(options).modules).to.deep.include({ + peerRouting: [ peerRouter ], + contentRouting: [ contentRouter ] + }) + }) + it('should not allow for dht to be enabled without it being provided', () => { const options = { peerInfo, diff --git a/test/content-routing.node.js b/test/content-routing.node.js index 3414a6174c..14a64a774e 100644 --- a/test/content-routing.node.js +++ b/test/content-routing.node.js @@ -7,87 +7,363 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect const parallel = require('async/parallel') +const waterfall = require('async/waterfall') const _times = require('lodash.times') const CID = require('cids') +const DelegatedContentRouter = require('libp2p-delegated-content-routing') +const sinon = require('sinon') +const nock = require('nock') +const ma = require('multiaddr') +const Node = require('./utils/bundle-nodejs') const createNode = require('./utils/create-node') +const createPeerInfo = createNode.createPeerInfo describe('.contentRouting', () => { - let nodeA - let nodeB - let nodeC - let nodeD - let nodeE - - before(function (done) { - this.timeout(5 * 1000) - const tasks = _times(5, () => (cb) => { - createNode('/ip4/0.0.0.0/tcp/0', { - config: { - EXPERIMENTAL: { - dht: true + describe('via the dht', () => { + let nodeA + let nodeB + let nodeC + let nodeD + let nodeE + + before(function (done) { + this.timeout(5 * 1000) + const tasks = _times(5, () => (cb) => { + createNode('/ip4/0.0.0.0/tcp/0', { + config: { + EXPERIMENTAL: { + dht: true + } } - } - }, (err, node) => { + }, (err, node) => { + expect(err).to.not.exist() + node.start((err) => cb(err, node)) + }) + }) + + parallel(tasks, (err, nodes) => { expect(err).to.not.exist() - node.start((err) => cb(err, node)) + nodeA = nodes[0] + nodeB = nodes[1] + nodeC = nodes[2] + nodeD = nodes[3] + nodeE = nodes[4] + + parallel([ + (cb) => nodeA.dial(nodeB.peerInfo, cb), + (cb) => nodeB.dial(nodeC.peerInfo, cb), + (cb) => nodeC.dial(nodeD.peerInfo, cb), + (cb) => nodeD.dial(nodeE.peerInfo, cb), + (cb) => nodeE.dial(nodeA.peerInfo, cb) + ], done) }) }) - parallel(tasks, (err, nodes) => { - expect(err).to.not.exist() - nodeA = nodes[0] - nodeB = nodes[1] - nodeC = nodes[2] - nodeD = nodes[3] - nodeE = nodes[4] - + after((done) => { parallel([ - (cb) => nodeA.dial(nodeB.peerInfo, cb), - (cb) => nodeB.dial(nodeC.peerInfo, cb), - (cb) => nodeC.dial(nodeD.peerInfo, cb), - (cb) => nodeD.dial(nodeE.peerInfo, cb), - (cb) => nodeE.dial(nodeA.peerInfo, cb) + (cb) => nodeA.stop(cb), + (cb) => nodeB.stop(cb), + (cb) => nodeC.stop(cb), + (cb) => nodeD.stop(cb), + (cb) => nodeE.stop(cb) ], done) }) - }) - after((done) => { - parallel([ - (cb) => nodeA.stop(cb), - (cb) => nodeB.stop(cb), - (cb) => nodeC.stop(cb), - (cb) => nodeD.stop(cb), - (cb) => nodeE.stop(cb) - ], done) + it('should use the nodes dht to provide', (done) => { + const stub = sinon.stub(nodeA._dht, 'provide').callsFake(() => { + stub.restore() + done() + }) + + nodeA.contentRouting.provide() + }) + + it('should use the nodes dht to find providers', (done) => { + const stub = sinon.stub(nodeA._dht, 'findProviders').callsFake(() => { + stub.restore() + done() + }) + + nodeA.contentRouting.findProviders() + }) + + describe('le ring', () => { + const cid = new CID('QmTp9VkYvnHyrqKQuFPiuZkiX9gPcqj6x5LJ1rmWuSySnL') + + it('let kbucket get filled', (done) => { + setTimeout(() => done(), 250) + }) + + it('nodeA.contentRouting.provide', (done) => { + nodeA.contentRouting.provide(cid, done) + }) + + it('nodeE.contentRouting.findProviders for existing record', (done) => { + nodeE.contentRouting.findProviders(cid, { maxTimeout: 5000 }, (err, providers) => { + expect(err).to.not.exist() + expect(providers).to.have.length.above(0) + done() + }) + }) + + it('nodeC.contentRouting.findProviders for non existing record (timeout)', (done) => { + const cid = new CID('QmTp9VkYvnHyrqKQuFPiuZkiX9gPcqj6x5LJ1rmWuSnnnn') + + nodeE.contentRouting.findProviders(cid, { maxTimeout: 5000 }, (err, providers) => { + expect(err).to.not.exist() + expect(providers).to.have.length(0) + done() + }) + }) + }) }) - describe('le ring', () => { - const cid = new CID('QmTp9VkYvnHyrqKQuFPiuZkiX9gPcqj6x5LJ1rmWuSySnL') + describe('via a delegate', () => { + let nodeA + let delegate + + before((done) => { + waterfall([ + (cb) => { + createPeerInfo(cb) + }, + // Create the node using the delegate + (peerInfo, cb) => { + delegate = new DelegatedContentRouter(peerInfo.id, { + host: '0.0.0.0', + protocol: 'http', + port: 60197 + }, [ + ma('/ip4/0.0.0.0/tcp/60194') + ]) + nodeA = new Node({ + peerInfo, + modules: { + contentRouting: [ delegate ] + }, + config: { + relay: { + enabled: true, + hop: { + enabled: true, + active: false + } + } + } + }) + nodeA.start(cb) + } + ], done) + }) + + after((done) => nodeA.stop(done)) + afterEach(() => nock.cleanAll()) + + describe('provide', () => { + it('should use the delegate router to provide', (done) => { + const stub = sinon.stub(delegate, 'provide').callsFake(() => { + stub.restore() + done() + }) + nodeA.contentRouting.provide() + }) + + it('should be able to register as a provider', (done) => { + const cid = new CID('QmU621oD8AhHw6t25vVyfYKmL9VV3PTgc52FngEhTGACFB') + const mockApi = nock('http://0.0.0.0:60197') + // mock the swarm connect + .post('/api/v0/swarm/connect') + .query({ + arg: `/ip4/0.0.0.0/tcp/60194/p2p-circuit/ipfs/${nodeA.peerInfo.id.toB58String()}`, + 'stream-channels': true + }) + .reply(200, { + Strings: [`connect ${nodeA.peerInfo.id.toB58String()} success`] + }, ['Content-Type', 'application/json']) + // mock the refs call + .post('/api/v0/refs') + .query({ + recursive: true, + arg: cid.toBaseEncodedString(), + 'stream-channels': true + }) + .reply(200, null, [ + 'Content-Type', 'application/json', + 'X-Chunked-Output', '1' + ]) + + nodeA.contentRouting.provide(cid, (err) => { + expect(err).to.not.exist() + expect(mockApi.isDone()).to.equal(true) + done() + }) + }) + + it('should handle errors when registering as a provider', (done) => { + const cid = new CID('QmU621oD8AhHw6t25vVyfYKmL9VV3PTgc52FngEhTGACFB') + const mockApi = nock('http://0.0.0.0:60197') + // mock the swarm connect + .post('/api/v0/swarm/connect') + .query({ + arg: `/ip4/0.0.0.0/tcp/60194/p2p-circuit/ipfs/${nodeA.peerInfo.id.toB58String()}`, + 'stream-channels': true + }) + .reply(502, 'Bad Gateway', ['Content-Type', 'application/json']) + + nodeA.contentRouting.provide(cid, (err) => { + expect(err).to.exist() + expect(mockApi.isDone()).to.equal(true) + done() + }) + }) + }) + + describe('find providers', () => { + it('should use the delegate router to find providers', (done) => { + const stub = sinon.stub(delegate, 'findProviders').callsFake(() => { + stub.restore() + done() + }) + nodeA.contentRouting.findProviders() + }) + + it('should be able to find providers', (done) => { + const cid = new CID('QmU621oD8AhHw6t25vVyfYKmL9VV3PTgc52FngEhTGACFB') + const provider = 'QmZNgCqZCvTsi3B4Vt7gsSqpkqDpE7M2Y9TDmEhbDb4ceF' + const mockApi = nock('http://0.0.0.0:60197') + .post('/api/v0/dht/findprovs') + .query({ + arg: cid.toBaseEncodedString(), + timeout: '1000ms', + 'stream-channels': true + }) + .reply(200, `{"Extra":"","ID":"QmWKqWXCtRXEeCQTo3FoZ7g4AfnGiauYYiczvNxFCHicbB","Responses":[{"Addrs":["/ip4/0.0.0.0/tcp/0"],"ID":"${provider}"}],"Type":1}\n`, [ + 'Content-Type', 'application/json', + 'X-Chunked-Output', '1' + ]) + + nodeA.contentRouting.findProviders(cid, 1000, (err, response) => { + expect(err).to.not.exist() + expect(response).to.have.length(1) + expect(response[0].id.toB58String()).to.equal(provider) + expect(mockApi.isDone()).to.equal(true) + done() + }) + }) + + it('should handle errors when finding providers', (done) => { + const cid = new CID('QmU621oD8AhHw6t25vVyfYKmL9VV3PTgc52FngEhTGACFB') + const mockApi = nock('http://0.0.0.0:60197') + .post('/api/v0/dht/findprovs') + .query({ + arg: cid.toBaseEncodedString(), + timeout: '30000ms', + 'stream-channels': true + }) + .reply(502, 'Bad Gateway', [ + 'X-Chunked-Output', '1' + ]) - it('let kbucket get filled', (done) => { - setTimeout(() => done(), 250) + nodeA.contentRouting.findProviders(cid, (err) => { + expect(err).to.exist() + expect(mockApi.isDone()).to.equal(true) + done() + }) + }) }) + }) - it('nodeA.contentRouting.provide', (done) => { - nodeA.contentRouting.provide(cid, done) + describe('via the dht and a delegate', () => { + let nodeA + let delegate + + before((done) => { + waterfall([ + (cb) => { + createPeerInfo(cb) + }, + // Create the node using the delegate + (peerInfo, cb) => { + delegate = new DelegatedContentRouter(peerInfo.id, { + host: '0.0.0.0', + protocol: 'http', + port: 60197 + }, [ + ma('/ip4/0.0.0.0/tcp/60194') + ]) + nodeA = new Node({ + peerInfo, + modules: { + contentRouting: [ delegate ] + }, + config: { + relay: { + enabled: true, + hop: { + enabled: true, + active: false + } + }, + EXPERIMENTAL: { + dht: true + } + } + }) + nodeA.start(cb) + } + ], done) }) - it('nodeE.contentRouting.findProviders for existing record', (done) => { - nodeE.contentRouting.findProviders(cid, 5000, (err, providers) => { - expect(err).to.not.exist() - expect(providers).to.have.length.above(0) - done() + after((done) => nodeA.stop(done)) + + describe('provide', () => { + it('should use both the dht and delegate router to provide', (done) => { + const dhtStub = sinon.stub(nodeA._dht, 'provide').callsFake(() => {}) + const delegateStub = sinon.stub(delegate, 'provide').callsFake(() => { + expect(dhtStub.calledOnce).to.equal(true) + expect(delegateStub.calledOnce).to.equal(true) + delegateStub.restore() + dhtStub.restore() + done() + }) + nodeA.contentRouting.provide() }) }) - it('nodeC.contentRouting.findProviders for non existing record (timeout)', (done) => { - const cid = new CID('QmTp9VkYvnHyrqKQuFPiuZkiX9gPcqj6x5LJ1rmWuSnnnn') + describe('findProviders', () => { + it('should only use the dht if it finds providers', (done) => { + const results = [true] + const dhtStub = sinon.stub(nodeA._dht, 'findProviders').callsArgWith(2, null, results) + const delegateStub = sinon.stub(delegate, 'findProviders').throws(() => { + return new Error('the delegate should not have been called') + }) - nodeE.contentRouting.findProviders(cid, 5000, (err, providers) => { - expect(err).to.not.exist() - expect(providers).to.have.length(0) - done() + nodeA.contentRouting.findProviders('a cid', { maxTimeout: 5000 }, (err, results) => { + expect(err).to.not.exist() + expect(results).to.equal(results) + expect(dhtStub.calledOnce).to.equal(true) + expect(delegateStub.notCalled).to.equal(true) + delegateStub.restore() + dhtStub.restore() + done() + }) + }) + + it('should use the delegate if the dht fails to find providers', (done) => { + const results = [true] + const dhtStub = sinon.stub(nodeA._dht, 'findProviders').callsArgWith(2, null, []) + const delegateStub = sinon.stub(delegate, 'findProviders').callsArgWith(2, null, results) + + nodeA.contentRouting.findProviders('a cid', { maxTimeout: 5000 }, (err, results) => { + expect(err).to.not.exist() + expect(results).to.deep.equal(results) + expect(dhtStub.calledOnce).to.equal(true) + expect(delegateStub.calledOnce).to.equal(true) + delegateStub.restore() + dhtStub.restore() + done() + }) }) }) }) diff --git a/test/peer-routing.node.js b/test/peer-routing.node.js index b232e423ca..7d728ccd9d 100644 --- a/test/peer-routing.node.js +++ b/test/peer-routing.node.js @@ -8,92 +8,262 @@ chai.use(require('dirty-chai')) const expect = chai.expect const parallel = require('async/parallel') const _times = require('lodash.times') +const DelegatedPeerRouter = require('libp2p-delegated-peer-routing') +const sinon = require('sinon') +const nock = require('nock') const createNode = require('./utils/create-node') describe('.peerRouting', () => { - let nodeA - let nodeB - let nodeC - let nodeD - let nodeE - - before(function (done) { - this.timeout(5 * 1000) - - const tasks = _times(5, () => (cb) => { - createNode('/ip4/0.0.0.0/tcp/0', { - config: { - EXPERIMENTAL: { - dht: true + describe('via the dht', () => { + let nodeA + let nodeB + let nodeC + let nodeD + let nodeE + + before('create the outer ring of connections', (done) => { + const tasks = _times(5, () => (cb) => { + createNode('/ip4/0.0.0.0/tcp/0', { + config: { + EXPERIMENTAL: { + dht: true + } } - } - }, (err, node) => { + }, (err, node) => { + expect(err).to.not.exist() + node.start((err) => cb(err, node)) + }) + }) + + parallel(tasks, (err, nodes) => { expect(err).to.not.exist() - node.start((err) => cb(err, node)) + nodeA = nodes[0] + nodeB = nodes[1] + nodeC = nodes[2] + nodeD = nodes[3] + nodeE = nodes[4] + + parallel([ + (cb) => nodeA.dial(nodeB.peerInfo, cb), + (cb) => nodeB.dial(nodeC.peerInfo, cb), + (cb) => nodeC.dial(nodeD.peerInfo, cb), + (cb) => nodeD.dial(nodeE.peerInfo, cb), + (cb) => nodeE.dial(nodeA.peerInfo, cb) + ], (err) => { + expect(err).to.not.exist() + // Give the kbucket time to fill in the dht + setTimeout(done, 250) + }) }) }) - parallel(tasks, (err, nodes) => { - expect(err).to.not.exist() - nodeA = nodes[0] - nodeB = nodes[1] - nodeC = nodes[2] - nodeD = nodes[3] - nodeE = nodes[4] - + after((done) => { parallel([ - (cb) => nodeA.dial(nodeB.peerInfo, cb), - (cb) => nodeB.dial(nodeC.peerInfo, cb), - (cb) => nodeC.dial(nodeD.peerInfo, cb), - (cb) => nodeD.dial(nodeE.peerInfo, cb), - (cb) => nodeE.dial(nodeA.peerInfo, cb) + (cb) => nodeA.stop(cb), + (cb) => nodeB.stop(cb), + (cb) => nodeC.stop(cb), + (cb) => nodeD.stop(cb), + (cb) => nodeE.stop(cb) ], done) }) - }) - after((done) => { - parallel([ - (cb) => nodeA.stop(cb), - (cb) => nodeB.stop(cb), - (cb) => nodeC.stop(cb), - (cb) => nodeD.stop(cb), - (cb) => nodeE.stop(cb) - ], done) + it('should use the nodes dht', (done) => { + const stub = sinon.stub(nodeA._dht, 'findPeer').callsFake(() => { + stub.restore() + done() + }) + + nodeA.peerRouting.findPeer() + }) + + describe('connected in an el ring', () => { + it('should be able to find a peer we are not directly connected to', (done) => { + parallel([ + (cb) => nodeA.dial(nodeC.peerInfo.id, cb), + (cb) => nodeB.dial(nodeD.peerInfo.id, cb), + (cb) => nodeC.dial(nodeE.peerInfo.id, cb) + ], (err) => { + if (err) throw err + expect(err).to.not.exist() + nodeB.peerRouting.findPeer(nodeE.peerInfo.id, (err, peerInfo) => { + expect(err).to.not.exist() + expect(nodeE.peerInfo.id.toB58String()).to.equal(peerInfo.id.toB58String()) + done() + }) + }) + }) + }) }) - describe('el ring', () => { - it('let kbucket get filled', (done) => { - setTimeout(() => done(), 250) + describe('via a delegate', () => { + let nodeA + let delegate + + before((done) => { + parallel([ + // Create the node using the delegate + (cb) => { + delegate = new DelegatedPeerRouter({ + host: 'ipfs.io', + protocol: 'https', + port: '443' + }) + createNode('/ip4/0.0.0.0/tcp/0', { + modules: { + peerRouting: [ delegate ] + } + }, (err, node) => { + expect(err).to.not.exist() + nodeA = node + nodeA.start(cb) + }) + } + ], done) }) - it('nodeA.dial by Id to node C', (done) => { - nodeA.dial(nodeC.peerInfo.id, (err) => { - expect(err).to.not.exist() + after((done) => nodeA.stop(done)) + afterEach(() => nock.cleanAll()) + + it('should use the delegate router to find peers', (done) => { + const stub = sinon.stub(delegate, 'findPeer').callsFake(() => { + stub.restore() done() }) + nodeA.peerRouting.findPeer() }) - it('nodeB.dial by Id to node D', (done) => { - nodeB.dial(nodeD.peerInfo.id, (err) => { + it('should be able to find a peer', (done) => { + const peerKey = 'QmTp9VkYvnHyrqKQuFPiuZkiX9gPcqj6x5LJ1rmWuSySnL' + const mockApi = nock('https://ipfs.io') + .post('/api/v0/dht/findpeer') + .query({ + arg: peerKey, + timeout: '30000ms', + 'stream-channels': true + }) + .reply(200, `{"Extra":"","ID":"some other id","Responses":null,"Type":0}\n{"Extra":"","ID":"","Responses":[{"Addrs":["/ip4/127.0.0.1/tcp/4001"],"ID":"${peerKey}"}],"Type":2}\n`, [ + 'Content-Type', 'application/json', + 'X-Chunked-Output', '1' + ]) + + nodeA.peerRouting.findPeer(peerKey, (err, peerInfo) => { expect(err).to.not.exist() + expect(peerInfo.id.toB58String()).to.equal(peerKey) + expect(mockApi.isDone()).to.equal(true) done() }) }) - it('nodeC.dial by Id to node E', (done) => { - nodeC.dial(nodeE.peerInfo.id, (err) => { - expect(err).to.not.exist() + it('should error when a peer cannot be found', (done) => { + const peerKey = 'key of a peer not on the network' + const mockApi = nock('https://ipfs.io') + .post('/api/v0/dht/findpeer') + .query({ + arg: peerKey, + timeout: '30000ms', + 'stream-channels': true + }) + .reply(200, `{"Extra":"","ID":"some other id","Responses":null,"Type":6}\n{"Extra":"","ID":"yet another id","Responses":null,"Type":0}\n{"Extra":"routing:not found","ID":"","Responses":null,"Type":3}\n`, [ + 'Content-Type', 'application/json', + 'X-Chunked-Output', '1' + ]) + + nodeA.peerRouting.findPeer(peerKey, (err, peerInfo) => { + expect(err).to.exist() + expect(peerInfo).to.not.exist() + expect(mockApi.isDone()).to.equal(true) done() }) }) - it('nodeB.peerRouting.findPeer(nodeE.peerInfo.id)', (done) => { - nodeB.peerRouting.findPeer(nodeE.peerInfo.id, (err, peerInfo) => { - expect(err).to.not.exist() - expect(nodeE.peerInfo.id.toB58String()).to.equal(peerInfo.id.toB58String()) + it('should handle errors from the api', (done) => { + const peerKey = 'key of a peer not on the network' + const mockApi = nock('https://ipfs.io') + .post('/api/v0/dht/findpeer') + .query({ + arg: peerKey, + timeout: '30000ms', + 'stream-channels': true + }) + .reply(502) + + nodeA.peerRouting.findPeer(peerKey, (err, peerInfo) => { + expect(err).to.exist() + expect(peerInfo).to.not.exist() + expect(mockApi.isDone()).to.equal(true) done() }) }) }) + + describe('via the dht and a delegate', () => { + let nodeA + let delegate + + before((done) => { + parallel([ + // Create the node using the delegate + (cb) => { + delegate = new DelegatedPeerRouter({ + host: 'ipfs.io', + protocol: 'https', + port: '443' + }) + createNode('/ip4/0.0.0.0/tcp/0', { + modules: { + peerRouting: [ delegate ] + }, + config: { + EXPERIMENTAL: { + dht: true + } + } + }, (err, node) => { + expect(err).to.not.exist() + nodeA = node + nodeA.start(cb) + }) + } + ], done) + }) + + after((done) => nodeA.stop(done)) + + describe('findPeer', () => { + it('should only use the dht if it finds the peer', (done) => { + const results = [true] + const dhtStub = sinon.stub(nodeA._dht, 'findPeer').callsArgWith(2, null, results) + const delegateStub = sinon.stub(delegate, 'findPeer').throws(() => { + return new Error('the delegate should not have been called') + }) + + nodeA.peerRouting.findPeer('a peer id', (err, results) => { + expect(err).to.not.exist() + expect(results).to.equal(results) + expect(dhtStub.calledOnce).to.equal(true) + expect(delegateStub.notCalled).to.equal(true) + delegateStub.restore() + dhtStub.restore() + done() + }) + }) + + it('should use the delegate if the dht fails to find the peer', (done) => { + const results = [true] + const dhtStub = sinon.stub(nodeA._dht, 'findPeer').callsArgWith(2, null, undefined) + const delegateStub = sinon.stub(delegate, 'findPeer').callsArgWith(2, null, results) + + nodeA.peerRouting.findPeer('a peer id', (err, results) => { + expect(err).to.not.exist() + expect(results).to.deep.equal(results) + expect(dhtStub.calledOnce).to.equal(true) + expect(delegateStub.calledOnce).to.equal(true) + delegateStub.restore() + dhtStub.restore() + done() + }) + }) + }) + }) }) diff --git a/test/utils/create-node.js b/test/utils/create-node.js index 11564bfe85..3b9cf43bf2 100644 --- a/test/utils/create-node.js +++ b/test/utils/create-node.js @@ -21,8 +21,7 @@ function createNode (multiaddrs, options, callback) { } waterfall([ - (cb) => PeerId.create({ bits: 512 }, cb), - (peerId, cb) => PeerInfo.create(peerId, cb), + (cb) => createPeerInfo(cb), (peerInfo, cb) => { multiaddrs.map((ma) => peerInfo.multiaddrs.add(ma)) options.peerInfo = peerInfo @@ -31,4 +30,12 @@ function createNode (multiaddrs, options, callback) { ], callback) } +function createPeerInfo (callback) { + waterfall([ + (cb) => PeerId.create({ bits: 512 }, cb), + (peerId, cb) => PeerInfo.create(peerId, cb) + ], callback) +} + module.exports = createNode +module.exports.createPeerInfo = createPeerInfo