From 52d056e6a5d01e32754956f544ae598e369ac981 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Wed, 13 Nov 2019 19:36:56 +0100 Subject: [PATCH] chore: refactor tests --- package.json | 29 +- src/index.js | 9 +- src/network.js | 119 +++-- src/private.js | 9 +- src/providers.js | 9 +- src/routing.js | 2 +- src/rpc/handlers/add-provider.js | 19 +- src/rpc/handlers/find-node.js | 38 +- src/rpc/handlers/get-providers.js | 17 +- src/rpc/handlers/get-value.js | 44 +- src/rpc/handlers/ping.js | 7 +- src/rpc/handlers/put-value.js | 29 +- src/rpc/index.js | 46 +- test/kad-dht.spec.js | 291 ++++++------ test/kad-utils.spec.js | 71 ++- test/limited-peer-list.spec.js | 13 +- test/message.spec.js | 107 ++--- test/network.spec.js | 68 ++- test/peer-distance-list.spec.js | 160 +++---- test/peer-list.spec.js | 10 +- test/providers.spec.js | 19 +- test/query.spec.js | 600 +++++++++++------------- test/query/index.spec.js | 201 ++++---- test/random-walk.spec.js | 17 +- test/routing.spec.js | 131 ++---- test/rpc/handlers/add-provider.spec.js | 96 ++-- test/rpc/handlers/find-node.spec.js | 77 ++- test/rpc/handlers/get-providers.spec.js | 92 ++-- test/rpc/handlers/get-value.spec.js | 119 ++--- test/rpc/handlers/ping.spec.js | 31 +- test/rpc/handlers/put-value.spec.js | 64 ++- test/rpc/index.spec.js | 11 +- test/simulation/index.js | 21 +- test/utils/create-disjoint-tracks.js | 90 ++-- test/utils/create-peer-id.js | 19 + test/utils/index.js | 17 +- 36 files changed, 1150 insertions(+), 1552 deletions(-) create mode 100644 test/utils/create-peer-id.js diff --git a/package.json b/package.json index c6e8f3f3..edefaebc 100644 --- a/package.json +++ b/package.json @@ -32,8 +32,8 @@ "url": "https://github.com/libp2p/js-libp2p-kad-dht/issues" }, "engines": { - "node": ">=6.0.0", - "npm": ">=3.0.0" + "node": ">=10.0.0", + "npm": ">=6.0.0" }, "homepage": "https://github.com/libp2p/js-libp2p-kad-dht", "dependencies": { @@ -41,34 +41,34 @@ "async": "^2.6.2", "base32.js": "~0.1.0", "chai-checkmark": "^1.0.1", - "cids": "~0.7.0", + "cids": "~0.7.1", "debug": "^4.1.1", - "err-code": "^1.1.2", + "err-code": "^2.0.0", "hashlru": "^2.3.0", "heap": "~0.2.6", - "interface-datastore": "~0.7.0", + "interface-datastore": "~0.8.0", "k-bucket": "^5.0.0", - "libp2p-crypto": "~0.16.1", + "libp2p-crypto": "~0.17.1", "libp2p-record": "~0.7.0", - "multihashes": "~0.4.14", + "multihashes": "~0.4.15", "multihashing-async": "~0.8.0", "p-filter": "^2.1.0", "p-map": "^3.0.0", - "p-queue": "^6.0.0", + "p-queue": "^6.2.1", "p-timeout": "^3.2.0", "p-times": "^2.1.0", - "peer-id": "~0.13.3", + "peer-id": "~0.13.5", "peer-info": "~0.17.0", "promise-to-callback": "^1.0.0", "promisify-es6": "^1.0.3", "protons": "^1.0.1", - "pull-length-prefixed": "^1.3.2", - "pull-stream": "^3.6.9", + "pull-length-prefixed": "^1.3.3", + "pull-stream": "^3.6.14", "varint": "^5.0.0", "xor-distance": "^2.0.0" }, "devDependencies": { - "aegir": "^20.0.0", + "aegir": "^20.4.1", "chai": "^4.2.0", "datastore-level": "~0.12.1", "delay": "^4.3.0", @@ -80,11 +80,12 @@ "lodash": "^4.17.11", "lodash.random": "^3.2.0", "lodash.range": "^3.2.0", + "p-defer": "^3.0.0", "p-each-series": "^2.1.0", "p-map-series": "^2.1.0", "p-retry": "^4.2.0", - "peer-book": "~0.9.1", - "sinon": "^7.3.1" + "peer-book": "~0.9.2", + "sinon": "^7.5.0" }, "contributors": [ "Alan Shaw ", diff --git a/src/index.js b/src/index.js index fefa400b..c747f689 100644 --- a/src/index.js +++ b/src/index.js @@ -9,7 +9,6 @@ const PeerId = require('peer-id') const PeerInfo = require('peer-info') const crypto = require('libp2p-crypto') -const promisify = require('promisify-es6') const pFilter = require('p-filter') const pTimeout = require('p-timeout') @@ -241,7 +240,7 @@ class KadDHT extends EventEmitter { * @param {Buffer} key * @param {Object} [options] - get options * @param {number} [options.timeout] - optional timeout (default: 60000) - * @returns {Promise} + * @returns {Promise<{from: PeerId, val: Buffer}>} */ get (key, options = {}) { options.timeout = options.timeout || c.minute @@ -425,7 +424,7 @@ class KadDHT extends EventEmitter { // try dht directly const pkKey = utils.keyForPublicKey(peer) const value = await this.get(pkKey) - pk = crypto.unmarshalPublicKey(value) + pk = crypto.keys.unmarshalPublicKey(value) } info.id = new PeerId(peer.id, null, pk) @@ -468,7 +467,7 @@ class KadDHT extends EventEmitter { // Add peer as provider await this.providers.addProvider(key, this.peerInfo.id) - // Notice closest peers + // Notify closest peers const peers = await this.getClosestPeers(key.buffer) const msg = new Message(Message.TYPES.ADD_PROVIDER, key.buffer, 0) msg.providerPeers = [this.peerInfo] @@ -476,7 +475,7 @@ class KadDHT extends EventEmitter { await Promise.all(peers.map(async (peer) => { this._log('putProvider %s to %s', key.toBaseEncodedString(), peer.toB58String()) try { - await promisify(cb => this.network.sendMessage(peer, msg, cb))() + await this.network.sendMessage(peer, msg) } catch (err) { errors.push(err) } diff --git a/src/network.js b/src/network.js index 8115dd87..f2d41910 100644 --- a/src/network.js +++ b/src/network.js @@ -1,9 +1,8 @@ 'use strict' const pull = require('pull-stream') -const timeout = require('async/timeout') +const pTimeout = require('p-timeout') const lp = require('pull-length-prefixed') -const setImmediate = require('async/setImmediate') const promisify = require('promisify-es6') const errcode = require('err-code') @@ -33,7 +32,7 @@ class Network { /** * Start the network. - * @async + * @returns {void} */ start () { if (this._running) { @@ -56,7 +55,6 @@ class Network { /** * Stop all network activity. - * * @returns {void} */ stop () { @@ -111,26 +109,22 @@ class Network { /** * Send a request and record RTT for latency measurements. - * + * @async * @param {PeerId} to - The peer that should receive a message * @param {Message} msg - The message to send. * @param {function(Error, Message)} callback - * @returns {void} + * @returns {Promise} */ - sendRequest (to, msg, callback) { + async sendRequest (to, msg) { // TODO: record latency if (!this.isConnected) { - return callback(errcode(new Error('Network is offline'), 'ERR_NETWORK_OFFLINE')) + throw errcode(new Error('Network is offline'), 'ERR_NETWORK_OFFLINE') } this._log('sending to: %s', to.toB58String()) - this.dht.switch.dial(to, c.PROTOCOL_DHT, (err, conn) => { - if (err) { - return callback(err) - } - this._writeReadMessage(conn, msg.serialize(), callback) - }) + const conn = await promisify(cb => this.dht.switch.dial(to, c.PROTOCOL_DHT, cb))() + return this._writeReadMessage(conn, msg.serialize()) } /** @@ -138,23 +132,17 @@ class Network { * * @param {PeerId} to * @param {Message} msg - * @param {function(Error)} callback - * @returns {void} + * @returns {Promise} */ - sendMessage (to, msg, callback) { + async sendMessage (to, msg) { if (!this.isConnected) { - return setImmediate(() => callback(errcode(new Error('Network is offline'), 'ERR_NETWORK_OFFLINE'))) + throw errcode(new Error('Network is offline'), 'ERR_NETWORK_OFFLINE') } this._log('sending to: %s', to.toB58String()) - this.dht.switch.dial(to, c.PROTOCOL_DHT, (err, conn) => { - if (err) { - return callback(err) - } - - this._writeMessage(conn, msg.serialize(), callback) - }) + const conn = await promisify(cb => this.dht.switch.dial(to, c.PROTOCOL_DHT, cb))() + return this._writeMessage(conn, msg.serialize()) } /** @@ -164,15 +152,14 @@ class Network { * * @param {Connection} conn - the connection to use * @param {Buffer} msg - the message to send - * @param {function(Error, Message)} callback - * @returns {void} + * @returns {Message} * @private */ - _writeReadMessage (conn, msg, callback) { - timeout( - writeReadMessage, + _writeReadMessage (conn, msg) { + return pTimeout( + writeReadMessage(conn, msg), this.readMessageTimeout - )(conn, msg, callback) + ) } /** @@ -180,45 +167,51 @@ class Network { * * @param {Connection} conn - the connection to use * @param {Buffer} msg - the message to send - * @param {function(Error)} callback - * @returns {void} + * @returns {Promise} * @private */ - _writeMessage (conn, msg, callback) { + _writeMessage (conn, msg) { + return new Promise((resolve, reject) => { + pull( + pull.values([msg]), + lp.encode(), + conn, + pull.onEnd((err) => { + if (err) return reject(err) + resolve() + }) + ) + }) + } +} + +function writeReadMessage (conn, msg) { + return new Promise((resolve, reject) => { pull( pull.values([msg]), lp.encode(), conn, - pull.onEnd(callback) + pull.filter((msg) => msg.length < c.maxMessageSize), + lp.decode(), + pull.collect((err, res) => { + if (err) { + return reject(err) + } + if (res.length === 0) { + return reject(errcode(new Error('No message received'), 'ERR_NO_MESSAGE_RECEIVED')) + } + + let response + try { + response = Message.deserialize(res[0]) + } catch (err) { + return reject(errcode(err, 'ERR_FAILED_DESERIALIZE_RESPONSE')) + } + + resolve(response) + }) ) - } -} - -function writeReadMessage (conn, msg, callback) { - pull( - pull.values([msg]), - lp.encode(), - conn, - pull.filter((msg) => msg.length < c.maxMessageSize), - lp.decode(), - pull.collect((err, res) => { - if (err) { - return callback(err) - } - if (res.length === 0) { - return callback(errcode(new Error('No message received'), 'ERR_NO_MESSAGE_RECEIVED')) - } - - let response - try { - response = Message.deserialize(res[0]) - } catch (err) { - return callback(errcode(err, 'ERR_FAILED_DESERIALIZE_RESPONSE')) - } - - callback(null, response) - }) - ) + }) } module.exports = Network diff --git a/src/private.js b/src/private.js index 57910051..8e4cb860 100644 --- a/src/private.js +++ b/src/private.js @@ -3,7 +3,6 @@ const errcode = require('err-code') const pTimeout = require('p-timeout') -const promisify = require('promisify-es6') const PeerId = require('peer-id') const libp2pRecord = require('libp2p-record') const PeerInfo = require('peer-info') @@ -179,7 +178,7 @@ module.exports = (dht) => ({ dht._log('_findPeerSingle %s', peer.toB58String()) const msg = new Message(Message.TYPES.FIND_NODE, target.id, 0) - return promisify(cb => dht.network.sendRequest(peer, msg, cb))() + return dht.network.sendRequest(peer, msg) }, /** @@ -197,7 +196,7 @@ module.exports = (dht) => ({ const msg = new Message(Message.TYPES.PUT_VALUE, key, 0) msg.record = rec - const resp = await promisify(cb => dht.network.sendRequest(target, msg, cb))() + const resp = await dht.network.sendRequest(target, msg) if (!resp.record.value.equals(Record.deserialize(rec).value)) { throw errcode(new Error('value not put correctly'), 'ERR_PUT_VALUE_INVALID') @@ -362,7 +361,7 @@ module.exports = (dht) => ({ async _getValueSingle (peer, key) { // eslint-disable-line require-await const msg = new Message(Message.TYPES.GET_VALUE, key, 0) - return promisify(cb => dht.network.sendRequest(peer, msg, cb))() + return dht.network.sendRequest(peer, msg) }, /** @@ -500,6 +499,6 @@ module.exports = (dht) => ({ */ async _findProvidersSingle (peer, key) { // eslint-disable-line require-await const msg = new Message(Message.TYPES.GET_PROVIDERS, key.buffer, 0) - return promisify(cb => dht.network.sendRequest(peer, msg, cb))() + return dht.network.sendRequest(peer, msg) } }) diff --git a/src/providers.js b/src/providers.js index a98ff8b7..ab69bce4 100644 --- a/src/providers.js +++ b/src/providers.js @@ -61,7 +61,7 @@ class Providers { /** * Release any resources. * - * @returns {undefined} + * @returns {void} */ stop () { if (this._cleaner) { @@ -73,8 +73,7 @@ class Providers { /** * Check all providers if they are still valid, and if not delete them. * - * @returns {Promise} - * + * @returns {Promise} * @private */ _cleanup () { @@ -178,7 +177,7 @@ class Providers { * * @param {CID} cid * @param {PeerId} provider - * @returns {Promise} + * @returns {Promise} */ async addProvider (cid, provider) { // eslint-disable-line require-await return this.syncQueue.add(async () => { @@ -232,7 +231,7 @@ function makeProviderKey (cid) { * @param {CID} cid * @param {PeerId} peer * @param {number} time - * @returns {Promise} + * @returns {Promise} * * @private */ diff --git a/src/routing.js b/src/routing.js index 18fca06a..39dbad2a 100644 --- a/src/routing.js +++ b/src/routing.js @@ -122,7 +122,7 @@ class RoutingTable { * Remove a given peer from the table. * * @param {PeerId} peer - * @returns {Promose} + * @returns {Promise} */ async remove (peer) { const id = await utils.convertPeerId(peer) diff --git a/src/rpc/handlers/add-provider.js b/src/rpc/handlers/add-provider.js index b90894e0..ec3a6dd9 100644 --- a/src/rpc/handlers/add-provider.js +++ b/src/rpc/handlers/add-provider.js @@ -2,7 +2,6 @@ const CID = require('cids') const errcode = require('err-code') -const promiseToCallback = require('promise-to-callback') const utils = require('../../utils') @@ -13,14 +12,13 @@ module.exports = (dht) => { * * @param {PeerInfo} peer * @param {Message} msg - * @param {function(Error)} callback - * @returns {undefined} + * @returns {Promise} */ - return function addProvider (peer, msg, callback) { + return async function addProvider (peer, msg) { // eslint-disable-line require-await log('start') if (!msg.key || msg.key.length === 0) { - return callback(errcode(new Error('Missing key'), 'ERR_MISSING_KEY')) + throw errcode(new Error('Missing key'), 'ERR_MISSING_KEY') } let cid @@ -28,11 +26,9 @@ module.exports = (dht) => { cid = new CID(msg.key) } catch (err) { const errMsg = `Invalid CID: ${err.message}` - - return callback(errcode(new Error(errMsg), 'ERR_INVALID_CID')) + throw errcode(new Error(errMsg), 'ERR_INVALID_CID') } - let foundProvider = false msg.providerPeers.forEach((pi) => { // Ignore providers not from the originator if (!pi.id.isEqual(peer.id)) { @@ -48,9 +44,8 @@ module.exports = (dht) => { log('received provider %s for %s (addrs %s)', peer.id.toB58String(), cid.toBaseEncodedString(), pi.multiaddrs.toArray().map((m) => m.toString())) if (!dht._isSelf(pi.id)) { - foundProvider = true dht.peerBook.put(pi) - promiseToCallback(dht.providers.addProvider(cid, pi.id))(err => callback(err)) + return dht.providers.addProvider(cid, pi.id) } }) @@ -60,8 +55,6 @@ module.exports = (dht) => { // we can't find any valid providers in the payload. // https://github.com/libp2p/js-libp2p-kad-dht/pull/127 // https://github.com/libp2p/js-libp2p-kad-dht/issues/128 - if (!foundProvider) { - promiseToCallback(dht.providers.addProvider(cid, peer.id))(err => callback(err)) - } + return dht.providers.addProvider(cid, peer.id) } } diff --git a/src/rpc/handlers/find-node.js b/src/rpc/handlers/find-node.js index 6243d411..af0102bc 100644 --- a/src/rpc/handlers/find-node.js +++ b/src/rpc/handlers/find-node.js @@ -1,8 +1,5 @@ 'use strict' -const waterfall = require('async/waterfall') -const promiseToCallback = require('promise-to-callback') - const Message = require('../../message') const utils = require('../../utils') @@ -14,31 +11,26 @@ module.exports = (dht) => { * * @param {PeerInfo} peer * @param {Message} msg - * @param {function(Error, Message)} callback - * @returns {undefined} + * @returns {Promise} */ - return function findNode (peer, msg, callback) { + return async function findNode (peer, msg) { log('start') - waterfall([ - (cb) => { - if (msg.key.equals(dht.peerInfo.id.id)) { - return cb(null, [dht.peerInfo]) - } + let closer + if (msg.key.equals(dht.peerInfo.id.id)) { + closer = [dht.peerInfo] + } else { + closer = await dht._betterPeersToQuery(msg, peer) + } - promiseToCallback(dht._betterPeersToQuery(msg, peer))(cb) - }, - (closer, cb) => { - const response = new Message(msg.type, Buffer.alloc(0), msg.clusterLevel) + const response = new Message(msg.type, Buffer.alloc(0), msg.clusterLevel) - if (closer.length > 0) { - response.closerPeers = closer - } else { - log('handle FindNode %s: could not find anything', peer.id.toB58String()) - } + if (closer.length > 0) { + response.closerPeers = closer + } else { + log('handle FindNode %s: could not find anything', peer.id.toB58String()) + } - cb(null, response) - } - ], callback) + return response } } diff --git a/src/rpc/handlers/get-providers.js b/src/rpc/handlers/get-providers.js index 8cb72aaf..d8cfd984 100644 --- a/src/rpc/handlers/get-providers.js +++ b/src/rpc/handlers/get-providers.js @@ -2,7 +2,6 @@ const CID = require('cids') const PeerInfo = require('peer-info') -const promiseToCallback = require('promise-to-callback') const errcode = require('err-code') const Message = require('../../message') @@ -16,9 +15,9 @@ module.exports = (dht) => { * * @param {PeerInfo} peer * @param {Message} msg - * @returns {Promise} Resolves a `Message` response + * @returns {Promise} */ - async function getProvidersAsync (peer, msg) { + return async function getProviders (peer, msg) { let cid try { cid = new CID(msg.key) @@ -60,16 +59,4 @@ module.exports = (dht) => { log('got %s providers %s closerPeers', providers.length, closer.length) return response } - - /** - * Process `GetProviders` DHT messages. - * - * @param {PeerInfo} peer - * @param {Message} msg - * @param {function(Error, Message)} callback - * @returns {undefined} - */ - return function getProviders (peer, msg, callback) { - promiseToCallback(getProvidersAsync(peer, msg))(callback) - } } diff --git a/src/rpc/handlers/get-value.js b/src/rpc/handlers/get-value.js index b9891ed8..f0f8729c 100644 --- a/src/rpc/handlers/get-value.js +++ b/src/rpc/handlers/get-value.js @@ -1,7 +1,5 @@ 'use strict' -const promiseToCallback = require('promise-to-callback') -const parallel = require('async/parallel') const { Record } = require('libp2p-record') const errcode = require('err-code') @@ -17,16 +15,15 @@ module.exports = (dht) => { * * @param {PeerInfo} peer * @param {Message} msg - * @param {function(Error, Message)} callback - * @returns {undefined} + * @returns {Promise} */ - return function getValue (peer, msg, callback) { + return async function getValue (peer, msg) { const key = msg.key log('key: %b', key) if (!key || key.length === 0) { - return callback(errcode(new Error('Invalid key'), 'ERR_INVALID_KEY')) + throw errcode(new Error('Invalid key'), 'ERR_INVALID_KEY') } const response = new Message(Message.TYPES.GET_VALUE, key, msg.clusterLevel) @@ -45,32 +42,25 @@ module.exports = (dht) => { if (info && info.id.pubKey) { log('returning found public key') response.record = new Record(key, info.id.pubKey.bytes) - return callback(null, response) + return response } } - parallel([ // TODO - (cb) => promiseToCallback(dht._checkLocalDatastore(key))(cb), - (cb) => promiseToCallback(dht._betterPeersToQuery(msg, peer))(cb) - ], (err, res) => { - if (err) { - return callback(err) - } - - const record = res[0] - const closer = res[1] + const [record, closer] = await Promise.all([ + dht._checkLocalDatastore(key), + dht._betterPeersToQuery(msg, peer) + ]) - if (record) { - log('got record') - response.record = record - } + if (record) { + log('got record') + response.record = record + } - if (closer.length > 0) { - log('got closer %s', closer.length) - response.closerPeers = closer - } + if (closer.length > 0) { + log('got closer %s', closer.length) + response.closerPeers = closer + } - callback(null, response) - }) + return response } } diff --git a/src/rpc/handlers/ping.js b/src/rpc/handlers/ping.js index a3430393..dfb3c02b 100644 --- a/src/rpc/handlers/ping.js +++ b/src/rpc/handlers/ping.js @@ -10,11 +10,10 @@ module.exports = (dht) => { * * @param {PeerInfo} peer * @param {Message} msg - * @param {function(Error, Message)} callback - * @returns {undefined} + * @returns {Message} */ - return function ping (peer, msg, callback) { + return function ping (peer, msg) { log('from %s', peer.id.toB58String()) - callback(null, msg) + return msg } } diff --git a/src/rpc/handlers/put-value.js b/src/rpc/handlers/put-value.js index 92b89351..bbdd69ca 100644 --- a/src/rpc/handlers/put-value.js +++ b/src/rpc/handlers/put-value.js @@ -2,7 +2,6 @@ const utils = require('../../utils') const errcode = require('err-code') -const promiseToCallback = require('promise-to-callback') module.exports = (dht) => { const log = utils.logger(dht.peerInfo.id, 'rpc:put-value') @@ -12,10 +11,9 @@ module.exports = (dht) => { * * @param {PeerInfo} peer * @param {Message} msg - * @param {function(Error, Message)} callback - * @returns {undefined} + * @returns {Promise} */ - return function putValue (peer, msg, callback) { + return async function putValue (peer, msg) { const key = msg.key log('key: %b', key) @@ -25,26 +23,15 @@ module.exports = (dht) => { const errMsg = `Empty record from: ${peer.id.toB58String()}` log.error(errMsg) - return callback(errcode(new Error(errMsg), 'ERR_EMPTY_RECORD')) + throw errcode(new Error(errMsg), 'ERR_EMPTY_RECORD') } - promiseToCallback(dht._verifyRecordLocally(record))((err) => { - if (err) { - log.error(err.message) - return callback(err) - } + await dht._verifyRecordLocally(record) - record.timeReceived = new Date() + record.timeReceived = new Date() + const recordKey = utils.bufferToKey(record.key) + await dht.datastore.put(recordKey, record.serialize()) - const key = utils.bufferToKey(record.key) - - promiseToCallback(dht.datastore.put(key, record.serialize()))(err => { - if (err) { - return callback(err) - } - - callback(null, msg) - }) - }) + return msg } } diff --git a/src/rpc/index.js b/src/rpc/index.js index adbd0713..d7cb8846 100644 --- a/src/rpc/index.js +++ b/src/rpc/index.js @@ -2,7 +2,6 @@ const pull = require('pull-stream') const lp = require('pull-length-prefixed') -const promiseToCallback = require('promise-to-callback') const Message = require('../message') const handlers = require('./handlers') @@ -11,35 +10,34 @@ const c = require('../constants') module.exports = (dht) => { const log = utils.logger(dht.peerInfo.id, 'rpc') - const getMessageHandler = handlers(dht) + /** * Process incoming DHT messages. * * @param {PeerInfo} peer * @param {Message} msg - * @param {function(Error, Message)} callback - * @returns {void} + * @returns {Promise} * * @private */ - function handleMessage (peer, msg, callback) { - promiseToCallback(dht._add(peer))((err) => { - if (err) { - log.error('Failed to update the kbucket store') - log.error(err) - } + async function handleMessage (peer, msg) { + try { + await dht._add(peer) + } catch (err) { + log.error('Failed to update the kbucket store') + log.error(err) + } - // get handler & exectue it - const handler = getMessageHandler(msg.type) + // get handler & exectue it + const handler = getMessageHandler(msg.type) - if (!handler) { - log.error(`no handler found for message type: ${msg.type}`) - return callback() - } + if (!handler) { + log.error(`no handler found for message type: ${msg.type}`) + return + } - handler(peer, msg, callback) - }) + return handler(peer, msg) } /** @@ -47,7 +45,7 @@ module.exports = (dht) => { * * @param {string} protocol * @param {Connection} conn - * @returns {undefined} + * @returns {void} */ return function protocolHandler (protocol, conn) { conn.getPeerInfo((err, peer) => { @@ -75,7 +73,15 @@ module.exports = (dht) => { return msg }), pull.filter(Boolean), - pull.asyncMap((msg, cb) => handleMessage(peer, msg, cb)), + pull.asyncMap(async (msg, cb) => { + let response + try { + response = await handleMessage(peer, msg) + } catch (err) { + cb(err) + } + cb(null, response) + }), // Not all handlers will return a response pull.filter(Boolean), pull.map((response) => { diff --git a/test/kad-dht.spec.js b/test/kad-dht.spec.js index 7ed5c6fb..1cfadebe 100644 --- a/test/kad-dht.spec.js +++ b/test/kad-dht.spec.js @@ -7,8 +7,6 @@ chai.use(require('chai-checkmark')) const expect = chai.expect const promisify = require('promisify-es6') const sinon = require('sinon') -const series = require('async/series') -const each = require('async/each') const { Record } = require('libp2p-record') const PeerId = require('peer-id') const PeerInfo = require('peer-info') @@ -21,6 +19,7 @@ const errcode = require('err-code') const pMapSeries = require('p-map-series') const pEachSeries = require('p-each-series') const pRetry = require('p-retry') +const pTimeout = require('p-timeout') const delay = require('delay') const KadDHT = require('../src') @@ -65,35 +64,38 @@ const connect = async (a, b) => { await find(b, a) } -// function bootstrap (dhts) { -// dhts.forEach((dht) => { -// dht.randomWalk._walk(1, 10000, () => {}) -// }) -// } - -// function waitForWellFormedTables (dhts, minPeers, avgPeers, waitTimeout, callback) { -// timeout((cb) => { -// retry({ times: 50, interval: 200 }, (cb) => { -// let totalPeers = 0 - -// const ready = dhts.map((dht) => { -// const rtlen = dht.routingTable.size -// totalPeers += rtlen -// if (minPeers > 0 && rtlen < minPeers) { -// return false -// } -// const actualAvgPeers = totalPeers / dhts.length -// if (avgPeers > 0 && actualAvgPeers < avgPeers) { -// return false -// } -// return true -// }) - -// const done = ready.every(Boolean) -// cb(done ? null : new Error('not done yet')) -// }, cb) -// }, waitTimeout)(callback) -// } +function bootstrap (dhts) { + dhts.forEach((dht) => { + dht.randomWalk._walk(1, 10000) + }) +} + +function waitForWellFormedTables (dhts, minPeers, avgPeers, waitTimeout) { + return pTimeout(pRetry(async () => { + let totalPeers = 0 + + const ready = dhts.map((dht) => { + const rtlen = dht.routingTable.size + totalPeers += rtlen + if (minPeers > 0 && rtlen < minPeers) { + return false + } + const actualAvgPeers = totalPeers / dhts.length + if (avgPeers > 0 && actualAvgPeers < avgPeers) { + return false + } + return true + }) + + if (ready.every(Boolean)) { + return + } + await delay(200) + throw new Error('not done yet') + }, { + retries: 50 + }), waitTimeout) +} // Count how many peers are in b but are not in a function countDiffPeers (a, b) { @@ -402,8 +404,7 @@ describe('KadDHT', () => { return tdht.teardown() }) - // TODO - it.skip('put - get should fail if unrecognized key prefix in get', async function () { + it('put - get should fail if unrecognized key prefix in get', async function () { this.timeout(10 * 1000) const key = Buffer.from('/v2/hello') @@ -416,6 +417,7 @@ describe('KadDHT', () => { try { await dhtA.put(key, value) + await dhtA.get(key) } catch (err) { expect(err).to.exist() return tdht.teardown() @@ -539,34 +541,23 @@ describe('KadDHT', () => { return tdht.teardown() }) - // TODO - it.skip('random-walk', function () { + it('random-walk', async function () { this.timeout(20 * 1000) - // const nDHTs = 20 - // const tdht = new TestDHT() - - // // random walk disabled for a manual usage - // const dhts = await tdht.spawn(nDHTs) - - // return tdht.teardown() - // tdht.spawn(nDHTs, (err, dhts) => { - // expect(err).to.not.exist() - - // series([ - // // ring connect - // (cb) => times(nDHTs, (i, cb) => { - // connect(dhts[i], dhts[(i + 1) % nDHTs], cb) - // }, (err) => cb(err)), - // (cb) => { - // bootstrap(dhts) - // waitForWellFormedTables(dhts, 7, 0, 20 * 1000, cb) - // } - // ], (err) => { - // expect(err).to.not.exist() - // tdht.teardown(done) - // }) - // }) + const nDHTs = 20 + const tdht = new TestDHT() + + // random walk disabled for a manual usage + const dhts = await tdht.spawn(nDHTs) + + await Promise.all( + Array.from({ length: nDHTs }).map((_, i) => connect(dhts[i], dhts[(i + 1) % nDHTs])) + ) + + bootstrap(dhts) + await waitForWellFormedTables(dhts, 7, 0, 20 * 1000) + + return tdht.teardown() }) it('layered get', async function () { @@ -615,110 +606,83 @@ describe('KadDHT', () => { return tdht.teardown() }) - it.skip('find peer query', function (done) { + it('find peer query', async function () { this.timeout(40 * 1000) // Create 101 nodes const nDHTs = 100 const tdht = new TestDHT() + const dhts = await tdht.spawn(nDHTs) + + const dhtsById = new Map(dhts.map((d) => [d.peerInfo.id, d])) + const ids = [...dhtsById.keys()] + + // The origin node for the FIND_PEER query + const guy = dhts[0] + + // The key + const val = Buffer.from('foobar') - tdht.spawn(nDHTs, (err, dhts) => { - expect(err).to.not.exist() - - const dhtsById = new Map(dhts.map((d) => [d.peerInfo.id, d])) - const ids = [...dhtsById.keys()] - - // The origin node for the FIND_PEER query - const guy = dhts[0] - - // The key - const val = Buffer.from('foobar') - // The key as a DHT key - let rtval - - series([ - // Hash the key into the DHT's key format - (cb) => kadUtils.convertBuffer(val, (err, dhtKey) => { - expect(err).to.not.exist() - rtval = dhtKey - cb() - }), - // Make connections between nodes close to each other - (cb) => kadUtils.sortClosestPeers(ids, rtval, (err, sorted) => { - expect(err).to.not.exist() - - const conns = [] - const maxRightIndex = sorted.length - 1 - for (let i = 0; i < sorted.length; i++) { - // Connect to 5 nodes on either side (10 in total) - for (const distance of [1, 3, 11, 31, 63]) { - let rightIndex = i + distance - if (rightIndex > maxRightIndex) { - rightIndex = maxRightIndex * 2 - (rightIndex + 1) - } - let leftIndex = i - distance - if (leftIndex < 0) { - leftIndex = 1 - leftIndex - } - conns.push([sorted[leftIndex], sorted[rightIndex]]) - } - } - - each(conns, (conn, _cb) => connect(dhtsById.get(conn[0]), dhtsById.get(conn[1]), _cb), cb) - }), - (cb) => { - // Get the alpha (3) closest peers to the key from the origin's - // routing table - const rtablePeers = guy.routingTable.closestPeers(rtval, c.ALPHA) - expect(rtablePeers).to.have.length(c.ALPHA) - - // The set of peers used to initiate the query (the closest alpha - // peers to the key that the origin knows about) - const rtableSet = {} - rtablePeers.forEach((p) => { - rtableSet[p.toB58String()] = true - }) - - const guyIndex = ids.findIndex(i => i.id.equals(guy.peerInfo.id.id)) - const otherIds = ids.slice(0, guyIndex).concat(ids.slice(guyIndex + 1)) - series([ - // Make the query - (cb) => guy.getClosestPeers(val, cb), - // Find the closest connected peers to the key - (cb) => kadUtils.sortClosestPeers(otherIds, rtval, cb) - ], (err, res) => { - expect(err).to.not.exist() - - // Query response - const out = res[0] - - // All connected peers in order of distance from key - const actualClosest = res[1] - - // Expect that the response includes nodes that are were not - // already in the origin's routing table (ie it went out to - // the network to find closer peers) - expect(out.filter((p) => !rtableSet[p.toB58String()])) - .to.not.be.empty() - - // Expect that there were kValue peers found - expect(out).to.have.length(c.K) - - // The expected closest kValue peers to the key - const exp = actualClosest.slice(0, c.K) - - // Expect the kValue peers found to be the kValue closest connected peers - // to the key - expect(countDiffPeers(exp, out)).to.eql(0) - - cb() - }) + // Hash the key into the DHT's key format + const rtval = await kadUtils.convertBuffer(val) + // Make connections between nodes close to each other + const sorted = await kadUtils.sortClosestPeers(ids, rtval) + + const conns = [] + const maxRightIndex = sorted.length - 1 + for (let i = 0; i < sorted.length; i++) { + // Connect to 5 nodes on either side (10 in total) + for (const distance of [1, 3, 11, 31, 63]) { + let rightIndex = i + distance + if (rightIndex > maxRightIndex) { + rightIndex = maxRightIndex * 2 - (rightIndex + 1) + } + let leftIndex = i - distance + if (leftIndex < 0) { + leftIndex = 1 - leftIndex } - ], (err) => { - expect(err).to.not.exist() - tdht.teardown(done) - }) + conns.push([sorted[leftIndex], sorted[rightIndex]]) + } + } + + await Promise.all(conns.map((conn) => connect(dhtsById.get(conn[0]), dhtsById.get(conn[1])))) + + // Get the alpha (3) closest peers to the key from the origin's + // routing table + const rtablePeers = guy.routingTable.closestPeers(rtval, c.ALPHA) + expect(rtablePeers).to.have.length(c.ALPHA) + + // The set of peers used to initiate the query (the closest alpha + // peers to the key that the origin knows about) + const rtableSet = {} + rtablePeers.forEach((p) => { + rtableSet[p.toB58String()] = true }) + + const guyIndex = ids.findIndex(i => i.id.equals(guy.peerInfo.id.id)) + const otherIds = ids.slice(0, guyIndex).concat(ids.slice(guyIndex + 1)) + + // Make the query + const out = await guy.getClosestPeers(val) + const actualClosest = await kadUtils.sortClosestPeers(otherIds, rtval) + + // Expect that the response includes nodes that are were not + // already in the origin's routing table (ie it went out to + // the network to find closer peers) + expect(out.filter((p) => !rtableSet[p.toB58String()])) + .to.not.be.empty() + + // Expect that there were kValue peers found + expect(out).to.have.length(c.K) + + // The expected closest kValue peers to the key + const exp = actualClosest.slice(0, c.K) + + // Expect the kValue peers found to be the kValue closest connected peers + // to the key + expect(countDiffPeers(exp, out)).to.eql(0) + + return tdht.teardown() }) it('getClosestPeers', async function () { @@ -738,7 +702,7 @@ describe('KadDHT', () => { return tdht.teardown() }) - describe.skip('getPublicKey', () => { + describe('getPublicKey', () => { it('already known', async function () { this.timeout(20 * 1000) @@ -751,6 +715,10 @@ describe('KadDHT', () => { const key = await dhts[0].getPublicKey(ids[1]) expect(key).to.eql(dhts[1].peerInfo.id.pubKey) + // TODO: Switch not closing well, but it will be removed + // (invalid transition: STOPPED -> done) + await delay(100) + return tdht.teardown() }) @@ -761,6 +729,7 @@ describe('KadDHT', () => { const dhts = await tdht.spawn(2) const ids = dhts.map((d) => d.peerInfo.id) + await connect(dhts[0], dhts[1]) // remove the pub key to be sure it is fetched @@ -909,17 +878,22 @@ describe('KadDHT', () => { }) describe('errors', () => { - it.skip('get many should fail if only has one peer', async function () { + it('get many should fail if only has one peer', async function () { this.timeout(20 * 1000) const tdht = new TestDHT() const dhts = await tdht.spawn(1) + // TODO: Switch not closing well, but it will be removed + // (invalid transition: STOPPED -> done) + await delay(100) + try { await dhts[0].getMany(Buffer.from('/v/hello'), 5) } catch (err) { expect(err).to.exist() expect(err.code).to.be.eql('ERR_NO_PEERS_IN_ROUTING_TABLE') + return tdht.teardown() } throw new Error('get many should fail if only has one peer') @@ -1078,7 +1052,7 @@ describe('KadDHT', () => { expect(res[6]).to.eql(Buffer.from('world')) }) - it.skip('put to several nodes in series with different values and get the last one in a subset of them', async function () { + it('put to several nodes in series with different values and get the last one in a subset of them', async function () { this.timeout(20 * 1000) const key = Buffer.from('/v/hallo') const result = Buffer.from('world4') @@ -1100,7 +1074,6 @@ describe('KadDHT', () => { expect(res[1]).to.eql(result) expect(res[2]).to.eql(result) expect(res[3]).to.eql(result) - // TODO: error }) }) }) diff --git a/test/kad-utils.spec.js b/test/kad-utils.spec.js index 75e6e123..a460db10 100644 --- a/test/kad-utils.spec.js +++ b/test/kad-utils.spec.js @@ -7,7 +7,6 @@ const expect = chai.expect const base32 = require('base32.js') const PeerId = require('peer-id') const distance = require('xor-distance') -const waterfall = require('async/waterfall') const utils = require('../src/utils') const createPeerInfo = require('./utils/create-peer-info') @@ -26,16 +25,12 @@ describe('kad utils', () => { }) describe('convertBuffer', () => { - it('returns the sha2-256 hash of the buffer', (done) => { + it('returns the sha2-256 hash of the buffer', async () => { const buf = Buffer.from('hello world') + const digest = await utils.convertBuffer(buf) - utils.convertBuffer(buf, (err, digest) => { - expect(err).to.not.exist() - - expect(digest) - .to.eql(Buffer.from('b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9', 'hex')) - done() - }) + expect(digest) + .to.eql(Buffer.from('b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9', 'hex')) }) }) @@ -56,7 +51,7 @@ describe('kad utils', () => { }) describe('sortClosestPeers', () => { - it('sorts a list of PeerInfos', (done) => { + it('sorts a list of PeerInfos', async () => { const rawIds = [ '11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31', '11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a32', @@ -75,21 +70,17 @@ describe('kad utils', () => { ids[0] ] - waterfall([ - (cb) => utils.convertPeerId(ids[0], cb), - (id, cb) => utils.sortClosestPeers(input, id, cb), - (out, cb) => { - expect( - out.map((m) => m.toB58String()) - ).to.eql([ - ids[0], - ids[3], - ids[2], - ids[1] - ].map((m) => m.toB58String())) - done() - } - ], done) + const id = await utils.convertPeerId(ids[0]) + const out = await utils.sortClosestPeers(input, id) + + expect( + out.map((m) => m.toB58String()) + ).to.eql([ + ids[0], + ids[3], + ids[2], + ids[1] + ].map((m) => m.toB58String())) }) }) @@ -110,31 +101,23 @@ describe('kad utils', () => { }) describe('keyForPublicKey', () => { - it('works', (done) => { - createPeerInfo(1, (err, peers) => { - expect(err).to.not.exist() - - expect(utils.keyForPublicKey(peers[0].id)) - .to.eql(Buffer.concat([Buffer.from('/pk/'), peers[0].id.id])) - done() - }) + it('works', async () => { + const peers = await createPeerInfo(1) + expect(utils.keyForPublicKey(peers[0].id)) + .to.eql(Buffer.concat([Buffer.from('/pk/'), peers[0].id.id])) }) }) describe('fromPublicKeyKey', () => { - it('round trips', function (done) { + it('round trips', async function () { this.timeout(40 * 1000) - createPeerInfo(50, (err, peers) => { - expect(err).to.not.exist() - - peers.forEach((p, i) => { - const id = p.id - expect(utils.isPublicKeyKey(utils.keyForPublicKey(id))).to.eql(true) - expect(utils.fromPublicKeyKey(utils.keyForPublicKey(id)).id) - .to.eql(id.id) - }) - done() + const peers = await createPeerInfo(50) + peers.forEach((p, i) => { + const id = p.id + expect(utils.isPublicKeyKey(utils.keyForPublicKey(id))).to.eql(true) + expect(utils.fromPublicKeyKey(utils.keyForPublicKey(id)).id) + .to.eql(id.id) }) }) }) diff --git a/test/limited-peer-list.spec.js b/test/limited-peer-list.spec.js index fbbf2210..e9075509 100644 --- a/test/limited-peer-list.spec.js +++ b/test/limited-peer-list.spec.js @@ -6,22 +6,13 @@ chai.use(require('dirty-chai')) const expect = chai.expect const LimitedPeerList = require('../src/limited-peer-list') - const createPeerInfo = require('./utils/create-peer-info') describe('LimitedPeerList', () => { let peers - before(function (done) { - this.timeout(10 * 1000) - - createPeerInfo(5, (err, p) => { - if (err) { - return done(err) - } - peers = p - done() - }) + before(async () => { + peers = await createPeerInfo(5) }) it('basics', () => { diff --git a/test/message.spec.js b/test/message.spec.js index 224edf77..317fead8 100644 --- a/test/message.spec.js +++ b/test/message.spec.js @@ -6,10 +6,9 @@ chai.use(require('dirty-chai')) const expect = chai.expect const PeerInfo = require('peer-info') const PeerId = require('peer-id') -const map = require('async/map') const range = require('lodash.range') const random = require('lodash.random') -const Record = require('libp2p-record').Record +const { Record } = require('libp2p-record') const fs = require('fs') const path = require('path') @@ -27,60 +26,58 @@ describe('Message', () => { expect(msg).to.have.property('clusterLevel', 4) }) - it('serialize & deserialize', function (done) { + it('serialize & deserialize', async function () { this.timeout(10 * 1000) - map(range(5), (n, cb) => PeerId.create({ bits: 1024 }, cb), (err, peers) => { - expect(err).to.not.exist() - - const closer = peers.slice(0, 5).map((p) => { - const info = new PeerInfo(p) - const addr = `/ip4/198.176.1.${random(198)}/tcp/1234` - info.multiaddrs.add(addr) - info.multiaddrs.add(`/ip4/100.176.1.${random(198)}`) - info.connect(addr) - - return info - }) - - const provider = peers.slice(0, 5).map((p) => { - const info = new PeerInfo(p) - info.multiaddrs.add(`/ip4/98.176.1.${random(198)}/tcp/1234`) - info.multiaddrs.add(`/ip4/10.176.1.${random(198)}`) - - return info - }) - - const msg = new Message(Message.TYPES.GET_VALUE, Buffer.from('hello'), 5) - const record = new Record(Buffer.from('hello'), Buffer.from('world')) - - msg.closerPeers = closer - msg.providerPeers = provider - msg.record = record - - const enc = msg.serialize() - const dec = Message.deserialize(enc) - - expect(dec.type).to.be.eql(msg.type) - expect(dec.key).to.be.eql(msg.key) - expect(dec.clusterLevel).to.be.eql(msg.clusterLevel) - expect(dec.record.serialize()).to.be.eql(record.serialize()) - expect(dec.record.key).to.eql(Buffer.from('hello')) - - expect(dec.closerPeers).to.have.length(5) - dec.closerPeers.forEach((peer, i) => { - expect(peer.id.isEqual(msg.closerPeers[i].id)).to.eql(true) - expect(peer.multiaddrs.toArray()) - .to.eql(msg.closerPeers[i].multiaddrs.toArray()) - }) - - expect(dec.providerPeers).to.have.length(5) - dec.providerPeers.forEach((peer, i) => { - expect(peer.id.isEqual(msg.providerPeers[i].id)).to.equal(true) - expect(peer.multiaddrs.toArray()) - .to.eql(msg.providerPeers[i].multiaddrs.toArray()) - }) - - done() + + const peers = await Promise.all( + Array.from({ length: 5 }).map(() => PeerId.create({ bits: 1024 }))) + + const closer = peers.slice(0, 5).map((p) => { + const info = new PeerInfo(p) + const addr = `/ip4/198.176.1.${random(198)}/tcp/1234` + info.multiaddrs.add(addr) + info.multiaddrs.add(`/ip4/100.176.1.${random(198)}`) + info.connect(addr) + + return info + }) + + const provider = peers.slice(0, 5).map((p) => { + const info = new PeerInfo(p) + info.multiaddrs.add(`/ip4/98.176.1.${random(198)}/tcp/1234`) + info.multiaddrs.add(`/ip4/10.176.1.${random(198)}`) + + return info + }) + + const msg = new Message(Message.TYPES.GET_VALUE, Buffer.from('hello'), 5) + const record = new Record(Buffer.from('hello'), Buffer.from('world')) + + msg.closerPeers = closer + msg.providerPeers = provider + msg.record = record + + const enc = msg.serialize() + const dec = Message.deserialize(enc) + + expect(dec.type).to.be.eql(msg.type) + expect(dec.key).to.be.eql(msg.key) + expect(dec.clusterLevel).to.be.eql(msg.clusterLevel) + expect(dec.record.serialize()).to.be.eql(record.serialize()) + expect(dec.record.key).to.eql(Buffer.from('hello')) + + expect(dec.closerPeers).to.have.length(5) + dec.closerPeers.forEach((peer, i) => { + expect(peer.id.isEqual(msg.closerPeers[i].id)).to.eql(true) + expect(peer.multiaddrs.toArray()) + .to.eql(msg.closerPeers[i].multiaddrs.toArray()) + }) + + expect(dec.providerPeers).to.have.length(5) + dec.providerPeers.forEach((peer, i) => { + expect(peer.id.isEqual(msg.providerPeers[i].id)).to.equal(true) + expect(peer.multiaddrs.toArray()) + .to.eql(msg.providerPeers[i].multiaddrs.toArray()) }) }) diff --git a/test/network.spec.js b/test/network.spec.js index 2bd5806a..45790389 100644 --- a/test/network.spec.js +++ b/test/network.spec.js @@ -7,7 +7,7 @@ const expect = chai.expect const Connection = require('interface-connection').Connection const pull = require('pull-stream') const lp = require('pull-length-prefixed') -const series = require('async/series') +const pDefer = require('p-defer') const PeerBook = require('peer-book') const Switch = require('libp2p-switch') const TCP = require('libp2p-tcp') @@ -22,41 +22,32 @@ describe('Network', () => { let dht let peerInfos - before(function (done) { + before(async function () { this.timeout(10 * 1000) - createPeerInfo(3, (err, result) => { - if (err) { - return done(err) - } + peerInfos = await createPeerInfo(3) - peerInfos = result - const sw = new Switch(peerInfos[0], new PeerBook()) - sw.transport.add('tcp', new TCP()) - sw.connection.addStreamMuxer(Mplex) - sw.connection.reuse() - dht = new KadDHT(sw) - - series([ - (cb) => sw.start(cb), - (cb) => dht.start(cb) - ], done) - }) - }) + const sw = new Switch(peerInfos[0], new PeerBook()) + sw.transport.add('tcp', new TCP()) + sw.connection.addStreamMuxer(Mplex) + sw.connection.reuse() + dht = new KadDHT(sw) - after(function (done) { - this.timeout(10 * 1000) - series([ - (cb) => dht.stop(cb), - (cb) => dht.switch.stop(cb) - ], done) + await sw.start() + await dht.start() }) + after(() => Promise.all([ + dht.stop(), + dht.switch.stop() + ])) + describe('sendRequest', () => { - it('send and response', (done) => { + it('send and response', async () => { + const defer = pDefer() let i = 0 const finish = () => { if (i++ === 1) { - done() + defer.resolve() } } @@ -85,19 +76,20 @@ describe('Network', () => { callback(null, conn) } - dht.network.sendRequest(peerInfos[0].id, msg, (err, response) => { - expect(err).to.not.exist() - expect(response.type).to.eql(Message.TYPES.FIND_NODE) + const response = await dht.network.sendRequest(peerInfos[0].id, msg) - finish() - }) + expect(response.type).to.eql(Message.TYPES.FIND_NODE) + finish() + + return defer.promise }) - it('timeout on no message', (done) => { + it('timeout on no message', async () => { + const defer = pDefer() let i = 0 const finish = () => { if (i++ === 1) { - done() + defer.resolve() } } @@ -124,12 +116,16 @@ describe('Network', () => { dht.network.readMessageTimeout = 100 - dht.network.sendRequest(peerInfos[0].id, msg, (err, response) => { + try { + await dht.network.sendRequest(peerInfos[0].id, msg) + } catch (err) { expect(err).to.exist() expect(err.message).to.match(/timed out/) finish() - }) + } + + return defer.promise }) }) }) diff --git a/test/peer-distance-list.spec.js b/test/peer-distance-list.spec.js index a34322b3..feb94fb1 100644 --- a/test/peer-distance-list.spec.js +++ b/test/peer-distance-list.spec.js @@ -5,7 +5,6 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect const PeerId = require('peer-id') -const series = require('async/series') const kadUtils = require('../src/utils') const PeerDistanceList = require('../src/peer-distance-list') @@ -20,131 +19,98 @@ describe('PeerDistanceList', () => { const p7 = new PeerId(Buffer.from('11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a32')) let key - before((done) => { - kadUtils.convertPeerId(p1, (err, k) => { - if (err) { - return done(err) - } - - key = k - - done() - }) + before(async () => { + key = await kadUtils.convertPeerId(p1) }) describe('basics', () => { - it('add', (done) => { + it('add', async () => { const pdl = new PeerDistanceList(key) - series([ - (cb) => pdl.add(p3, cb), - (cb) => pdl.add(p1, cb), - (cb) => pdl.add(p2, cb), - (cb) => pdl.add(p4, cb), - (cb) => pdl.add(p5, cb), - (cb) => pdl.add(p1, cb) - ], (err) => { - expect(err).to.not.exist() - - // Note: p1 and p5 are equal - expect(pdl.length).to.eql(4) - expect(pdl.peers).to.be.eql([p1, p4, p3, p2]) - - done() - }) + await pdl.add(p3) + await pdl.add(p1) + await pdl.add(p2) + await pdl.add(p4) + await pdl.add(p5) + await pdl.add(p1) + + // Note: p1 and p5 are equal + expect(pdl.length).to.eql(4) + expect(pdl.peers).to.be.eql([p1, p4, p3, p2]) }) - it('capacity', (done) => { + it('capacity', async () => { const pdl = new PeerDistanceList(key, 3) - series([ - (cb) => pdl.add(p1, cb), - (cb) => pdl.add(p2, cb), - (cb) => pdl.add(p3, cb), - (cb) => pdl.add(p4, cb), - (cb) => pdl.add(p5, cb), - (cb) => pdl.add(p6, cb) - ], (err) => { - expect(err).to.not.exist() - - // Note: p1 and p5 are equal - expect(pdl.length).to.eql(3) - - // Closer peers added later should replace further - // peers added earlier - expect(pdl.peers).to.be.eql([p1, p6, p4]) - - done() - }) + await pdl.add(p1) + await pdl.add(p2) + await pdl.add(p3) + await pdl.add(p4) + await pdl.add(p5) + await pdl.add(p6) + + // Note: p1 and p5 are equal + expect(pdl.length).to.eql(3) + + // Closer peers added later should replace further + // peers added earlier + expect(pdl.peers).to.be.eql([p1, p6, p4]) }) }) describe('closer', () => { let pdl - before((done) => { + before(async () => { pdl = new PeerDistanceList(key) - series([ - (cb) => pdl.add(p1, cb), - (cb) => pdl.add(p2, cb), - (cb) => pdl.add(p3, cb), - (cb) => pdl.add(p4, cb) - ], done) + + await pdl.add(p1) + await pdl.add(p2) + await pdl.add(p3) + await pdl.add(p4) }) - it('single closer peer', (done) => { - pdl.anyCloser([p6], (err, closer) => { - expect(err).to.not.exist() - expect(closer).to.be.eql(true) - done() - }) + it('single closer peer', async () => { + const closer = await pdl.anyCloser([p6]) + + expect(closer).to.be.eql(true) }) - it('single further peer', (done) => { - pdl.anyCloser([p7], (err, closer) => { - expect(err).to.not.exist() - expect(closer).to.be.eql(false) - done() - }) + it('single further peer', async () => { + const closer = await pdl.anyCloser([p7]) + + expect(closer).to.be.eql(false) }) - it('closer and further peer', (done) => { - pdl.anyCloser([p6, p7], (err, closer) => { - expect(err).to.not.exist() - expect(closer).to.be.eql(true) - done() - }) + it('closer and further peer', async () => { + const closer = await pdl.anyCloser([p6, p7]) + + expect(closer).to.be.eql(true) }) - it('single peer equal to furthest in list', (done) => { - pdl.anyCloser([p2], (err, closer) => { - expect(err).to.not.exist() - expect(closer).to.be.eql(false) - done() - }) + it('single peer equal to furthest in list', async () => { + const closer = await pdl.anyCloser([p2]) + + expect(closer).to.be.eql(false) }) - it('no peers', (done) => { - pdl.anyCloser([], (err, closer) => { - expect(err).to.not.exist() - expect(closer).to.be.eql(false) - done() - }) + it('no peers', async () => { + const closer = await pdl.anyCloser([]) + + expect(closer).to.be.eql(false) }) - it('empty peer distance list', (done) => { - new PeerDistanceList(key).anyCloser([p1], (err, closer) => { - expect(err).to.not.exist() - expect(closer).to.be.eql(true) - done() - }) + it('empty peer distance list', async () => { + const pdl = new PeerDistanceList(key) + const closer = await pdl.anyCloser([p1]) + + expect(closer).to.be.eql(true) }) - it('empty peer distance list and no peers', (done) => { - new PeerDistanceList(key).anyCloser([], (err, closer) => { - expect(err).to.not.exist() - expect(closer).to.be.eql(false) - done() - }) + it('empty peer distance list and no peers', async () => { + const pdl = new PeerDistanceList(key) + const closer = await pdl.anyCloser([]) + + expect(closer).to.be.eql(false) }) }) }) diff --git a/test/peer-list.spec.js b/test/peer-list.spec.js index 8deb5c2c..87809304 100644 --- a/test/peer-list.spec.js +++ b/test/peer-list.spec.js @@ -12,14 +12,8 @@ const createPeerInfo = require('./utils/create-peer-info') describe('PeerList', () => { let peers - before((done) => { - createPeerInfo(3, (err, p) => { - if (err) { - return done(err) - } - peers = p - done() - }) + before(async () => { + peers = await createPeerInfo(3) }) it('basics', () => { diff --git a/test/providers.spec.js b/test/providers.spec.js index a5ebf4d0..5078da2b 100644 --- a/test/providers.spec.js +++ b/test/providers.spec.js @@ -4,18 +4,17 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect -const promisify = require('promisify-es6') -const Store = require('interface-datastore').MemoryDatastore +const { MemoryDatastore } = require('interface-datastore') const CID = require('cids') const LevelStore = require('datastore-level') const path = require('path') const os = require('os') -const multihashing = promisify(require('multihashing-async')) +const multihashing = require('multihashing-async') const Providers = require('../src/providers') -const createPeerInfo = promisify(require('./utils/create-peer-info')) -const createValues = promisify(require('./utils/create-values')) +const createPeerInfo = require('./utils/create-peer-info') +const createValues = require('./utils/create-values') describe('Providers', () => { let infos @@ -31,7 +30,7 @@ describe('Providers', () => { }) it('simple add and get of providers', async () => { - providers = new Providers(new Store(), infos[2].id) + providers = new Providers(new MemoryDatastore(), infos[2].id) const cid = new CID('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n') @@ -47,7 +46,7 @@ describe('Providers', () => { }) it('duplicate add of provider is deduped', async () => { - providers = new Providers(new Store(), infos[2].id) + providers = new Providers(new MemoryDatastore(), infos[2].id) const cid = new CID('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n') @@ -67,7 +66,7 @@ describe('Providers', () => { }) it('more providers than space in the lru cache', async () => { - providers = new Providers(new Store(), infos[2].id, 10) + providers = new Providers(new MemoryDatastore(), infos[2].id, 10) const hashes = await Promise.all([...new Array(100)].map((i) => { return multihashing(Buffer.from(`hello ${i}`), 'sha2-256') @@ -85,7 +84,7 @@ describe('Providers', () => { }) it('expires', async () => { - providers = new Providers(new Store(), infos[2].id) + providers = new Providers(new MemoryDatastore(), infos[2].id) providers.cleanupInterval = 100 providers.provideValidity = 200 @@ -109,6 +108,8 @@ describe('Providers', () => { // slooow so only run when you need to it.skip('many', async function () { + this.timeout(100 * 1000) + const p = path.join( os.tmpdir(), (Math.random() * 100).toString() ) diff --git a/test/query.spec.js b/test/query.spec.js index d03f4ab5..75720d9c 100644 --- a/test/query.spec.js +++ b/test/query.spec.js @@ -8,8 +8,8 @@ const PeerBook = require('peer-book') const Switch = require('libp2p-switch') const TCP = require('libp2p-tcp') const Mplex = require('libp2p-mplex') -const promiseToCallback = require('promise-to-callback') -const promisify = require('promisify-es6') +const pDefer = require('p-defer') +const delay = require('delay') const DHT = require('../src') const Query = require('../src/query') @@ -18,39 +18,27 @@ const createPeerInfo = require('./utils/create-peer-info') const createDisjointTracks = require('./utils/create-disjoint-tracks') const kadUtils = require('../src/utils') -const createDHT = (peerInfos, cb) => { +const createDHT = async (peerInfos) => { const sw = new Switch(peerInfos[0], new PeerBook()) sw.transport.add('tcp', new TCP()) sw.connection.addStreamMuxer(Mplex) sw.connection.reuse() const d = new DHT(sw) - d.start(() => cb(null, d)) + + await d.start() + return d } describe('Query', () => { let peerInfos let dht - before(function (done) { - this.timeout(5 * 1000) - createPeerInfo(40, (err, result) => { - if (err) { - return done(err) - } - - peerInfos = result - createDHT(peerInfos, (err, d) => { - if (err) { - return done(err) - } - - dht = d - done() - }) - }) + before(async () => { + peerInfos = await createPeerInfo(40) + dht = await createDHT(peerInfos) }) - it('simple run', (done) => { + it('simple run', async () => { const peer = peerInfos[0] // mock this so we can dial non existing peers @@ -73,16 +61,14 @@ describe('Query', () => { } const q = new Query(dht, peer.id.id, () => queryFunc) - promiseToCallback(q.run([peerInfos[1].id]))((err, res) => { - expect(err).to.not.exist() - expect(res.paths[0].value).to.eql(Buffer.from('cool')) - expect(res.paths[0].success).to.eql(true) - expect(res.finalSet.size).to.eql(2) - done() - }) + const res = await q.run([peerInfos[1].id]) + + expect(res.paths[0].value).to.eql(Buffer.from('cool')) + expect(res.paths[0].success).to.eql(true) + expect(res.finalSet.size).to.eql(2) }) - it('does not return an error if only some queries error', (done) => { + it('does not return an error if only some queries error', async () => { const peer = peerInfos[0] // mock this so we can dial non existing peers @@ -103,57 +89,52 @@ describe('Query', () => { } const q = new Query(dht, peer.id.id, () => queryFunc) - promiseToCallback(q.run([peerInfos[1].id]))((err, res) => { - expect(err).not.to.exist() - - // Should have visited - // - the initial peer passed to the query: peerInfos[1] - // - the peer returned in closerPeers: peerInfos[2] - expect(visited).to.eql([peerInfos[1].id, peerInfos[2].id]) + const res = await q.run([peerInfos[1].id]) - // The final set should only contain peers that were successfully queried - // (ie no errors) - expect(res.finalSet.size).to.eql(1) - expect(res.finalSet.has(peerInfos[1].id)).to.equal(true) + // Should have visited + // - the initial peer passed to the query: peerInfos[1] + // - the peer returned in closerPeers: peerInfos[2] + expect(visited).to.eql([peerInfos[1].id, peerInfos[2].id]) - done() - }) + // The final set should only contain peers that were successfully queried + // (ie no errors) + expect(res.finalSet.size).to.eql(1) + expect(res.finalSet.has(peerInfos[1].id)).to.equal(true) }) - it('returns an error if all queries error', (done) => { + it('returns an error if all queries error', async () => { const peer = peerInfos[0] // mock this so we can dial non existing peers dht.switch.dial = (peer, callback) => callback() const queryFunc = async (p) => { throw new Error('fail') } // eslint-disable-line require-await - const q = new Query(dht, peer.id.id, () => queryFunc) - promiseToCallback(q.run([peerInfos[1].id]))((err, res) => { + + try { + await q.run([peerInfos[1].id]) + } catch (err) { expect(err).to.exist() expect(err.message).to.eql('fail') - done() - }) + return + } + + throw new Error('should return an error if all queries error') }) - it('returns empty run if initial peer list is empty', (done) => { + it('returns empty run if initial peer list is empty', async () => { const peer = peerInfos[0] - const queryFunc = async (p) => {} const q = new Query(dht, peer.id.id, () => queryFunc) - promiseToCallback(q.run([]))((err, res) => { - expect(err).to.not.exist() - - // Should not visit any peers - expect(res.paths.length).to.eql(0) - expect(res.finalSet.size).to.eql(0) + const res = await q.run([]) - done() - }) + // Should not visit any peers + expect(res.paths.length).to.eql(0) + expect(res.finalSet.size).to.eql(0) }) - it('only closerPeers', (done) => { + it('only closerPeers', async () => { const peer = peerInfos[0] // mock this so we can dial non existing peers @@ -166,14 +147,12 @@ describe('Query', () => { } const q = new Query(dht, peer.id.id, () => queryFunc) - promiseToCallback(q.run([peerInfos[1].id]))((err, res) => { - expect(err).to.not.exist() - expect(res.finalSet.size).to.eql(2) - done() - }) + const res = await q.run([peerInfos[1].id]) + + expect(res.finalSet.size).to.eql(2) }) - it('only closerPeers concurrent', (done) => { + it('only closerPeers concurrent', async () => { const peer = peerInfos[0] // mock this so we can dial non existing peers @@ -215,16 +194,13 @@ describe('Query', () => { } const q = new Query(dht, peer.id.id, () => queryFunc) - promiseToCallback(q.run([peerInfos[1].id, peerInfos[2].id, peerInfos[3].id]))((err, res) => { - expect(err).to.not.exist() + const res = await q.run([peerInfos[1].id, peerInfos[2].id, peerInfos[3].id]) - // Should visit all peers - expect(res.finalSet.size).to.eql(10) - done() - }) + // Should visit all peers + expect(res.finalSet.size).to.eql(10) }) - it('early success', (done) => { + it('early success', async () => { const peer = peerInfos[0] // mock this so we can dial non existing peers @@ -256,127 +232,109 @@ describe('Query', () => { } const q = new Query(dht, peer.id.id, () => queryFunc) - promiseToCallback(q.run([peerInfos[1].id]))((err, res) => { - expect(err).to.not.exist() - - // Should complete successfully - expect(res.paths.length).to.eql(1) - expect(res.paths[0].success).to.eql(true) + const res = await q.run([peerInfos[1].id]) - // Should only visit peers up to the success peer - expect(res.finalSet.size).to.eql(2) + // Should complete successfully + expect(res.paths.length).to.eql(1) + expect(res.paths[0].success).to.eql(true) - done() - }) + // Should only visit peers up to the success peer + expect(res.finalSet.size).to.eql(2) }) - it('all queries stop after shutdown', (done) => { - createDHT(peerInfos, (err, dhtA) => { - if (err) { - return done(err) - } + it('all queries stop after shutdown', async () => { + const deferShutdown = pDefer() + const dhtA = await createDHT(peerInfos) + const peer = peerInfos[0] - const peer = peerInfos[0] - - // mock this so we can dial non existing peers - dhtA.switch.dial = (peer, callback) => callback() - - // 1 -> 2 -> 3 -> 4 - const topology = { - [peerInfos[1].id.toB58String()]: { - closer: [peerInfos[2]] - }, - [peerInfos[2].id.toB58String()]: { - closer: [peerInfos[3]] - }, - // Should not reach here because query gets shut down - [peerInfos[3].id.toB58String()]: { - closer: [peerInfos[4]] - } + // mock this so we can dial non existing peers + dhtA.switch.dial = (peer, callback) => callback() + + // 1 -> 2 -> 3 -> 4 + const topology = { + [peerInfos[1].id.toB58String()]: { + closer: [peerInfos[2]] + }, + [peerInfos[2].id.toB58String()]: { + closer: [peerInfos[3]] + }, + // Should not reach here because query gets shut down + [peerInfos[3].id.toB58String()]: { + closer: [peerInfos[4]] } + } - const visited = [] - const queryFunc = async (p) => { - visited.push(p) - - const getResult = async () => { - const res = topology[p.toB58String()] || {} - // this timeout is necesary so `dhtA.stop` has time to stop the - // requests before they all complete - await new Promise(resolve => setTimeout(resolve, 100)) - return { - closerPeers: res.closer || [] - } - } + const visited = [] + const queryFunc = async (p) => { + visited.push(p) - // Shut down after visiting peerInfos[2] - if (p.toB58String() === peerInfos[2].id.toB58String()) { - await promisify(cb => dhtA.stop(cb)) - setTimeout(checkExpectations, 100) - return getResult() + const getResult = async () => { + const res = topology[p.toB58String()] || {} + // this timeout is necesary so `dhtA.stop` has time to stop the + // requests before they all complete + await new Promise(resolve => setTimeout(resolve, 100)) + return { + closerPeers: res.closer || [] } + } + + // Shut down after visiting peerInfos[2] + if (p.toB58String() === peerInfos[2].id.toB58String()) { + await dhtA.stop() + setTimeout(checkExpectations, 100) return getResult() } + return getResult() + } - const q = new Query(dhtA, peer.id.id, () => queryFunc) - promiseToCallback(q.run([peerInfos[1].id]))((err, res) => { - expect(err).to.not.exist() - }) + const q = new Query(dhtA, peer.id.id, () => queryFunc) + await q.run([peerInfos[1].id]) - function checkExpectations () { - // Should only visit peers up to the point where we shut down - expect(visited).to.eql([peerInfos[1].id, peerInfos[2].id]) + function checkExpectations () { + // Should only visit peers up to the point where we shut down + expect(visited).to.eql([peerInfos[1].id, peerInfos[2].id]) - done() - } - }) - }) + deferShutdown.resolve() + } - it('queries run after shutdown return immediately', (done) => { - createDHT(peerInfos, (err, dhtA) => { - if (err) { - return done(err) - } + return deferShutdown.promise + }) - const peer = peerInfos[0] + it('queries run after shutdown return immediately', async () => { + const dhtA = await createDHT(peerInfos) + const peer = peerInfos[0] - // mock this so we can dial non existing peers - dhtA.switch.dial = (peer, callback) => callback() + // mock this so we can dial non existing peers + dhtA.switch.dial = (peer, callback) => callback() - // 1 -> 2 -> 3 - const topology = { - [peerInfos[1].id.toB58String()]: { - closer: [peerInfos[2]] - }, - [peerInfos[2].id.toB58String()]: { - closer: [peerInfos[3]] - } + // 1 -> 2 -> 3 + const topology = { + [peerInfos[1].id.toB58String()]: { + closer: [peerInfos[2]] + }, + [peerInfos[2].id.toB58String()]: { + closer: [peerInfos[3]] } + } - const queryFunc = async (p) => { // eslint-disable-line require-await - const res = topology[p.toB58String()] || {} - return { - closerPeers: res.closer || [] - } + const queryFunc = async (p) => { // eslint-disable-line require-await + const res = topology[p.toB58String()] || {} + return { + closerPeers: res.closer || [] } + } - const q = new Query(dhtA, peer.id.id, () => queryFunc) - - dhtA.stop(() => { - promiseToCallback(q.run([peerInfos[1].id]))((err, res) => { - expect(err).to.not.exist() + const q = new Query(dhtA, peer.id.id, () => queryFunc) - // Should not visit any peers - expect(res.paths.length).to.eql(0) - expect(res.finalSet.size).to.eql(0) + await dhtA.stop() + const res = await q.run([peerInfos[1].id]) - done() - }) - }) - }) + // Should not visit any peers + expect(res.paths.length).to.eql(0) + expect(res.finalSet.size).to.eql(0) }) - it('disjoint path values', (done) => { + it('disjoint path values', async () => { const peer = peerInfos[0] const values = ['v0', 'v1'].map(Buffer.from) @@ -421,21 +379,17 @@ describe('Query', () => { } const q = new Query(dht, peer.id.id, () => queryFunc) - promiseToCallback(q.run([peerInfos[1].id, peerInfos[4].id]))((err, res) => { - expect(err).to.not.exist() - - // We should get back the values from both paths - expect(res.paths.length).to.eql(2) - expect(res.paths[0].value).to.eql(values[0]) - expect(res.paths[0].success).to.eql(true) - expect(res.paths[1].value).to.eql(values[1]) - expect(res.paths[1].success).to.eql(true) - - done() - }) + const res = await q.run([peerInfos[1].id, peerInfos[4].id]) + + // We should get back the values from both paths + expect(res.paths.length).to.eql(2) + expect(res.paths[0].value).to.eql(values[0]) + expect(res.paths[0].success).to.eql(true) + expect(res.paths[1].value).to.eql(values[1]) + expect(res.paths[1].success).to.eql(true) }) - it('disjoint path values with early completion', (done) => { + it('disjoint path values with early completion', async () => { const peer = peerInfos[0] const values = ['v0', 'v1'].map(Buffer.from) @@ -479,7 +433,7 @@ describe('Query', () => { visited.push(p) const res = topology[p.toB58String()] || {} - await new Promise(resolve => setTimeout(resolve, res.delay)) + await delay(res.delay) return { closerPeers: res.closer || [], value: res.value, @@ -489,26 +443,22 @@ describe('Query', () => { } const q = new Query(dht, peer.id.id, () => queryFunc) - promiseToCallback(q.run([peerInfos[1].id, peerInfos[4].id]))((err, res) => { - expect(err).to.not.exist() - - // We should only get back the value from the path 4 -> 5 - expect(res.paths.length).to.eql(1) - expect(res.paths[0].value).to.eql(values[1]) - expect(res.paths[0].success).to.eql(true) - - // Wait a little bit to make sure we don't continue down another path - // after finding a successful path - setTimeout(() => { - if (visited.indexOf(peerInfos[3].id) !== -1) { - expect.fail('Query continued after success was returned') - } - done() - }, 300) - }) + const res = await q.run([peerInfos[1].id, peerInfos[4].id]) + + // We should only get back the value from the path 4 -> 5 + expect(res.paths.length).to.eql(1) + expect(res.paths[0].value).to.eql(values[1]) + expect(res.paths[0].success).to.eql(true) + + // Wait a little bit to make sure we don't continue down another path + // after finding a successful path + await delay(300) + if (visited.indexOf(peerInfos[3].id) !== -1) { + expect.fail('Query continued after success was returned') + } }) - it('disjoint path continue other paths after error on one path', (done) => { + it('disjoint path continue other paths after error on one path', async () => { const peer = peerInfos[0] const values = ['v0', 'v1'].map(Buffer.from) @@ -567,114 +517,97 @@ describe('Query', () => { } const q = new Query(dht, peer.id.id, () => queryFunc) - promiseToCallback(q.run([peerInfos[1].id, peerInfos[4].id]))((err, res) => { - expect(err).to.not.exist() - - // We should only get back the value from the path 1 -> 2 -> 3 - expect(res.paths.length).to.eql(1) - expect(res.paths[0].value).to.eql(values[0]) - expect(res.paths[0].success).to.eql(true) + const res = await q.run([peerInfos[1].id, peerInfos[4].id]) - done() - }) + // We should only get back the value from the path 1 -> 2 -> 3 + expect(res.paths.length).to.eql(1) + expect(res.paths[0].value).to.eql(values[0]) + expect(res.paths[0].success).to.eql(true) }) - it('stop after finding k closest peers', (done) => { + it('stop after finding k closest peers', async () => { // mock this so we can dial non existing peers dht.switch.dial = (peer, callback) => callback() // Sort peers by distance from peerInfos[0] - kadUtils.convertPeerId(peerInfos[0].id, (err, peerZeroDhtKey) => { - if (err) { - return done(err) - } + const peerZeroDhtKey = await kadUtils.convertPeerId(peerInfos[0].id) + const peerIds = peerInfos.map(pi => pi.id) + const sorted = await kadUtils.sortClosestPeers(peerIds, peerZeroDhtKey) - const peerIds = peerInfos.map(pi => pi.id) - kadUtils.sortClosestPeers(peerIds, peerZeroDhtKey, (err, sorted) => { - if (err) { - return done(err) - } + // Local node has nodes 10, 16 and 18 in k-bucket + const initial = [sorted[10], sorted[16], sorted[18]] - // Local node has nodes 10, 16 and 18 in k-bucket - const initial = [sorted[10], sorted[16], sorted[18]] - - // Should zoom in to peers near target, and then zoom out again until it - // has successfully queried 20 peers - const topology = { - // Local node has nodes 10, 16 and 18 in k-bucket - 10: [12, 20, 22, 24, 26, 28], - 16: [14, 18, 20, 22, 24, 26], - 18: [4, 6, 8, 12, 14, 16], - - 26: [24, 28, 30, 38], - 30: [14, 28], - 38: [2], - - // Should zoom out from this point, until it has 20 peers - 2: [13], - 13: [15], - 15: [17], - - // Right before we get to 20 peers, it finds some new peers that are - // closer than some of the ones it has already queried - 17: [1, 3, 5, 11], - 1: [7, 9], - 9: [19], - - // At this point it's visited 20 (actually more than 20 peers), and - // there are no closer peers to be found, so it should stop querying. - // Because there are 3 paths, each with a worker queue with - // concurrency 3, the exact order in which peers are visited is - // unpredictable, so we add a long tail and below we test to make - // sure that it never reaches the end of the tail. - 19: [21], - 21: [23], - 23: [25], - 25: [27], - 27: [29], - 29: [31] - } + // Should zoom in to peers near target, and then zoom out again until it + // has successfully queried 20 peers + const topology = { + // Local node has nodes 10, 16 and 18 in k-bucket + 10: [12, 20, 22, 24, 26, 28], + 16: [14, 18, 20, 22, 24, 26], + 18: [4, 6, 8, 12, 14, 16], + + 26: [24, 28, 30, 38], + 30: [14, 28], + 38: [2], + + // Should zoom out from this point, until it has 20 peers + 2: [13], + 13: [15], + 15: [17], + + // Right before we get to 20 peers, it finds some new peers that are + // closer than some of the ones it has already queried + 17: [1, 3, 5, 11], + 1: [7, 9], + 9: [19], + + // At this point it's visited 20 (actually more than 20 peers), and + // there are no closer peers to be found, so it should stop querying. + // Because there are 3 paths, each with a worker queue with + // concurrency 3, the exact order in which peers are visited is + // unpredictable, so we add a long tail and below we test to make + // sure that it never reaches the end of the tail. + 19: [21], + 21: [23], + 23: [25], + 25: [27], + 27: [29], + 29: [31] + } - const peerIndex = (peerId) => sorted.findIndex(p => p === peerId) - const peerIdToInfo = (peerId) => peerInfos.find(pi => pi.id === peerId) + const peerIndex = (peerId) => sorted.findIndex(p => p === peerId) + const peerIdToInfo = (peerId) => peerInfos.find(pi => pi.id === peerId) - const visited = [] - const queryFunc = async (peerId) => { // eslint-disable-line require-await - visited.push(peerId) - const i = peerIndex(peerId) - const closerIndexes = topology[i] || [] - const closerPeers = closerIndexes.map(j => peerIdToInfo(sorted[j])) - return { closerPeers } - } + const visited = [] + const queryFunc = async (peerId) => { // eslint-disable-line require-await + visited.push(peerId) + const i = peerIndex(peerId) + const closerIndexes = topology[i] || [] + const closerPeers = closerIndexes.map(j => peerIdToInfo(sorted[j])) + return { closerPeers } + } - const q = new Query(dht, peerInfos[0].id.id, () => queryFunc) - promiseToCallback(q.run(initial))((err, res) => { - expect(err).to.not.exist() - - // Should query 19 peers, then find some peers closer to the key, and - // finally stop once those closer peers have been queried - const expectedVisited = new Set([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 20, 22, 24, 26, 28, 30, 38]) - const visitedSet = new Set(visited.map(peerIndex)) - for (const i of expectedVisited) { - expect(visitedSet.has(i)) - } - - // Should never get to end of tail (see note above) - expect(visited.find(p => peerIndex(p) === 29)).not.to.exist() - - // Final set should have 20 peers, and the closer peers that were - // found near the end of the query should displace further away - // peers that were found at the beginning - expect(res.finalSet.size).to.eql(20) - expect(res.finalSet.has(sorted[1])).to.eql(true) - expect(res.finalSet.has(sorted[3])).to.eql(true) - expect(res.finalSet.has(sorted[5])).to.eql(true) - expect(res.finalSet.has(sorted[38])).to.eql(false) - - done() - }) - }) - }) + const q = new Query(dht, peerInfos[0].id.id, () => queryFunc) + const res = await q.run(initial) + + // Should query 19 peers, then find some peers closer to the key, and + // finally stop once those closer peers have been queried + const expectedVisited = new Set([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 20, 22, 24, 26, 28, 30, 38]) + const visitedSet = new Set(visited.map(peerIndex)) + for (const i of expectedVisited) { + expect(visitedSet.has(i)) + } + + // Should never get to end of tail (see note above) + expect(visited.find(p => peerIndex(p) === 29)).not.to.exist() + + // Final set should have 20 peers, and the closer peers that were + // found near the end of the query should displace further away + // peers that were found at the beginning + expect(res.finalSet.size).to.eql(20) + expect(res.finalSet.has(sorted[1])).to.eql(true) + expect(res.finalSet.has(sorted[3])).to.eql(true) + expect(res.finalSet.has(sorted[5])).to.eql(true) + expect(res.finalSet.has(sorted[38])).to.eql(false) }) /* @@ -695,47 +628,46 @@ describe('Query', () => { * ... * */ - it('uses disjoint paths', (done) => { + it('uses disjoint paths', async () => { const goodLength = 3 const samplePeerInfos = peerInfos.slice(0, 12) - createDisjointTracks(samplePeerInfos, goodLength, (err, targetId, starts, getResponse) => { - expect(err).to.not.exist() - // mock this so we can dial non existing peers - dht.switch.dial = (peer, callback) => callback() - let badEndVisited = false - let targetVisited = false - - const q = new Query(dht, targetId, (trackNum) => { - return async (p) => { // eslint-disable-line require-await - const response = getResponse(p, trackNum) - expect(response).to.exist() // or we aren't on the right track - if (response.end && !response.pathComplete) { - badEndVisited = true - } - if (response.pathComplete) { - targetVisited = true - expect(badEndVisited).to.eql(false) - } - return response + const { + targetId, + starts, + getResponse + } = await createDisjointTracks(samplePeerInfos, goodLength) + + // mock this so we can dial non existing peers + dht.switch.dial = (peer, callback) => callback() + let badEndVisited = false + let targetVisited = false + + const q = new Query(dht, targetId, (trackNum) => { + return async (p) => { // eslint-disable-line require-await + const response = getResponse(p, trackNum) + expect(response).to.exist() // or we aren't on the right track + if (response.end && !response.pathComplete) { + badEndVisited = true + } + if (response.pathComplete) { + targetVisited = true + expect(badEndVisited).to.eql(false) } - }) - q.concurrency = 1 - // due to round-robin allocation of peers from starts, first - // path is good, second bad - promiseToCallback(q.run(starts))((err, res) => { - expect(err).to.not.exist() - // we should reach the target node - expect(targetVisited).to.eql(true) - // we should visit all nodes (except the target) - expect(res.finalSet.size).to.eql(samplePeerInfos.length - 1) - // there should be one successful path - expect(res.paths.length).to.eql(1) - done() - }) + return response + } }) + q.concurrency = 1 + const res = await q.run(starts) + // we should reach the target node + expect(targetVisited).to.eql(true) + // we should visit all nodes (except the target) + expect(res.finalSet.size).to.eql(samplePeerInfos.length - 1) + // there should be one successful path + expect(res.paths.length).to.eql(1) }) - it('should discover closer peers', (done) => { + it('should discover closer peers', () => { + const discoverDefer = pDefer() const peer = peerInfos[0] // mock this so we can dial non existing peers @@ -748,13 +680,13 @@ describe('Query', () => { } const q = new Query(dht, peer.id.id, () => queryFunc) - promiseToCallback(q.run([peerInfos[1].id]))((err, res) => { - expect(err).to.not.exist() - }) + q.run([peerInfos[1].id]) dht.once('peer', (peerInfo) => { expect(peerInfo.id).to.eql(peerInfos[2].id) - done() + discoverDefer.resolve() }) + + return discoverDefer.promise }) }) diff --git a/test/query/index.spec.js b/test/query/index.spec.js index f3fd3c3e..7865e4e1 100644 --- a/test/query/index.spec.js +++ b/test/query/index.spec.js @@ -7,9 +7,8 @@ chai.use(require('dirty-chai')) chai.use(require('chai-checkmark')) const expect = chai.expect const sinon = require('sinon') -const each = require('async/each') +const delay = require('delay') const PeerBook = require('peer-book') -const promiseToCallback = require('promise-to-callback') const Query = require('../../src/query') const Path = require('../../src/query/path') @@ -24,12 +23,11 @@ const NUM_IDS = 101 describe('Query', () => { let peerInfos let ourPeerInfo - before((done) => { - createPeerInfo(NUM_IDS, (err, peers) => { - ourPeerInfo = peers.shift() - peerInfos = peers - done(err) - }) + before(async () => { + const peers = await createPeerInfo(NUM_IDS) + + ourPeerInfo = peers.shift() + peerInfos = peers }) describe('get closest peers', () => { @@ -40,16 +38,11 @@ describe('Query', () => { let sortedPeers let dht - before('get sorted peers', (done) => { - convertBuffer(targetKey.key, (err, dhtKey) => { - if (err) return done(err) - targetKey.dhtKey = dhtKey + before('get sorted peers', async () => { + const dhtKey = await convertBuffer(targetKey.key) + targetKey.dhtKey = dhtKey - sortClosestPeerInfos(peerInfos, targetKey.dhtKey, (err, peers) => { - sortedPeers = peers - done(err) - }) - }) + sortedPeers = await sortClosestPeerInfos(peerInfos, targetKey.dhtKey) }) before('create a dht', () => { @@ -63,7 +56,7 @@ describe('Query', () => { sinon.restore() }) - it('should end paths when they have no closer peers to whats already been queried', (done) => { + it('should end paths when they have no closer peers to whats already been queried', async () => { const PATHS = 5 sinon.stub(dht, 'disjointPaths').value(PATHS) sinon.stub(dht._queryManager, 'running').value(true) @@ -72,53 +65,46 @@ describe('Query', () => { const query = new Query(dht, targetKey.key, () => querySpy) const run = new Run(query) - promiseToCallback(run.init())(() => { - // Add the sorted peers into 5 paths. This will weight - // the paths with increasingly further peers - const sortedPeerIds = sortedPeers.map(peerInfo => peerInfo.id) - const peersPerPath = sortedPeerIds.length / PATHS - const paths = [...new Array(PATHS)].map((_, index) => { - const path = new Path(run, query.makePath()) - const start = index * peersPerPath - const peers = sortedPeerIds.slice(start, start + peersPerPath) - peers.forEach(p => path.addInitialPeer(p)) - return path - }) - - // Get the peers of the 2nd closest path, and remove the path - // We don't want to execute it. Just add its peers to peers we've - // already queried. - const queriedPeers = paths.splice(1, 1)[0].initialPeers - each(queriedPeers, (peerId, cb) => { - run.peersQueried.add(peerId, cb) - }, (err) => { - if (err) return done(err) - - const continueSpy = sinon.spy(run, 'continueQuerying') - - // Run the 4 paths - promiseToCallback(run.executePaths(paths))((err) => { - expect(err).to.not.exist() - // The resulting peers should all be from path 0 as it had the closest - expect(run.peersQueried.peers).to.eql(paths[0].initialPeers) - - // Continue should be called on all `peersPerPath` queries of the first path, - // plus ALPHA (concurrency) of the other 3 paths - expect(continueSpy.callCount).to.eql(peersPerPath + (3 * c.ALPHA)) - - // The query should ONLY have been called on path 0 as it - // was the only path to contain closer peers that what we - // pre populated `run.peersQueried` with - expect(querySpy.callCount).to.eql(peersPerPath) - const queriedPeers = querySpy.getCalls().map(call => call.args[0]) - expect(queriedPeers).to.eql(paths[0].initialPeers) - done() - }) - }) + await run.init() + + // Add the sorted peers into 5 paths. This will weight + // the paths with increasingly further peers + const sortedPeerIds = sortedPeers.map(peerInfo => peerInfo.id) + const peersPerPath = sortedPeerIds.length / PATHS + const paths = [...new Array(PATHS)].map((_, index) => { + const path = new Path(run, query.makePath()) + const start = index * peersPerPath + const peers = sortedPeerIds.slice(start, start + peersPerPath) + peers.forEach(p => path.addInitialPeer(p)) + return path }) + + // Get the peers of the 2nd closest path, and remove the path + // We don't want to execute it. Just add its peers to peers we've + // already queried. + const queriedPeers = paths.splice(1, 1)[0].initialPeers + await Promise.all(queriedPeers.map((peerId) => run.peersQueried.add(peerId))) + + const continueSpy = sinon.spy(run, 'continueQuerying') + + await run.executePaths(paths) + + // The resulting peers should all be from path 0 as it had the closest + expect(run.peersQueried.peers).to.eql(paths[0].initialPeers) + + // Continue should be called on all `peersPerPath` queries of the first path, + // plus ALPHA (concurrency) of the other 3 paths + expect(continueSpy.callCount).to.eql(peersPerPath + (3 * c.ALPHA)) + + // The query should ONLY have been called on path 0 as it + // was the only path to contain closer peers that what we + // pre populated `run.peersQueried` with + expect(querySpy.callCount).to.eql(peersPerPath) + const finalQueriedPeers = querySpy.getCalls().map(call => call.args[0]) + expect(finalQueriedPeers).to.eql(paths[0].initialPeers) }) - it('should continue querying if the path has a closer peer', (done) => { + it('should continue querying if the path has a closer peer', async () => { sinon.stub(dht, 'disjointPaths').value(1) sinon.stub(dht._queryManager, 'running').value(true) @@ -126,57 +112,50 @@ describe('Query', () => { const query = new Query(dht, targetKey.key, () => querySpy) const run = new Run(query) - promiseToCallback(run.init())(() => { - const sortedPeerIds = sortedPeers.map(peerInfo => peerInfo.id) - // Take the top 15 peers and peers 20 - 25 to seed `run.peersQueried` - // This leaves us with only 16 - 19 as closer peers - const queriedPeers = [ - ...sortedPeerIds.slice(0, 15), - ...sortedPeerIds.slice(20, 25) - ] - - const path = new Path(run, query.makePath()) - // Give the path a closet peer and 15 further peers - const pathPeers = [ - ...sortedPeerIds.slice(15, 16), // 1 closer - ...sortedPeerIds.slice(80, 95) - ] - - pathPeers.forEach(p => path.addInitialPeer(p)) - const returnPeers = sortedPeers.slice(16, 20) - // When the second query happens, which is a further peer, - // return peers 16 - 19 - querySpy.onCall(1).callsFake(async () => { - // this timeout ensures the queries finish in serial - // see https://github.com/libp2p/js-libp2p-kad-dht/pull/121#discussion_r286437978 - await new Promise(resolve => setTimeout(resolve, 10)) - return { closerPeers: returnPeers } - }) - - each(queriedPeers, (peerId, cb) => { - run.peersQueried.add(peerId, cb) - }, (err) => { - if (err) return done(err) - - // Run the path - promiseToCallback(run.executePaths([path]))((err) => { - expect(err).to.not.exist() - - // Querying will stop after the first ALPHA peers are queried - expect(querySpy.callCount).to.eql(c.ALPHA) - - // We'll only get the 1 closest peer from `pathPeers`. - // The worker will be stopped before the `returnedPeers` - // are processed and queried. - expect(run.peersQueried.peers).to.eql([ - ...sortedPeerIds.slice(0, 16), - ...sortedPeerIds.slice(20, 24) - ]) - done() - }) - }) + await run.init() + + const sortedPeerIds = sortedPeers.map(peerInfo => peerInfo.id) + + // Take the top 15 peers and peers 20 - 25 to seed `run.peersQueried` + // This leaves us with only 16 - 19 as closer peers + const queriedPeers = [ + ...sortedPeerIds.slice(0, 15), + ...sortedPeerIds.slice(20, 25) + ] + + const path = new Path(run, query.makePath()) + // Give the path a closet peer and 15 further peers + const pathPeers = [ + ...sortedPeerIds.slice(15, 16), // 1 closer + ...sortedPeerIds.slice(80, 95) + ] + + pathPeers.forEach(p => path.addInitialPeer(p)) + const returnPeers = sortedPeers.slice(16, 20) + // When the second query happens, which is a further peer, + // return peers 16 - 19 + querySpy.onCall(1).callsFake(async () => { + // this delay ensures the queries finish in serial + // see https://github.com/libp2p/js-libp2p-kad-dht/pull/121#discussion_r286437978 + await delay(10) + return { closerPeers: returnPeers } }) + + await Promise.all(queriedPeers.map((peerId) => run.peersQueried.add(peerId))) + + await run.executePaths([path]) + + // Querying will stop after the first ALPHA peers are queried + expect(querySpy.callCount).to.eql(c.ALPHA) + + // We'll only get the 1 closest peer from `pathPeers`. + // The worker will be stopped before the `returnedPeers` + // are processed and queried. + expect(run.peersQueried.peers).to.eql([ + ...sortedPeerIds.slice(0, 16), + ...sortedPeerIds.slice(20, 24) + ]) }) }) }) diff --git a/test/random-walk.spec.js b/test/random-walk.spec.js index 57a62da6..f0ebea58 100644 --- a/test/random-walk.spec.js +++ b/test/random-walk.spec.js @@ -17,7 +17,10 @@ describe('Random Walk', () => { toB58String: () => 'QmRLoXS3E73psYaUsma1VSbboTa2J8Z9kso1tpiGLk9WQ4' } }, - findPeer: () => {} + findPeer: () => {}, + _log: { + error: () => {} + } } afterEach(() => { @@ -65,8 +68,7 @@ describe('Random Walk', () => { const queries = 5 const error = new Error('ERR_BOOM') const findPeerStub = sinon.stub(randomWalk._kadDHT, 'findPeer') - findPeerStub.onCall(2).callsArgWith(2, error) - findPeerStub.callsArgWith(2, { code: 'ERR_NOT_FOUND' }) + findPeerStub.throws(error) let err try { @@ -89,7 +91,9 @@ describe('Random Walk', () => { }) it('should pass its timeout to the find peer query', async () => { - sinon.stub(randomWalk._kadDHT, 'findPeer').callsArgWith(2, { code: 'ERR_NOT_FOUND' }) + const error = new Error() + error.code = 'ERR_NOT_FOUND' + sinon.stub(randomWalk._kadDHT, 'findPeer').throws(error) await randomWalk._walk(1, 111) const mockCalls = randomWalk._kadDHT.findPeer.getCalls() @@ -192,9 +196,10 @@ describe('Random Walk', () => { } const error = { code: 'ERR_NOT_FOUND' } const randomWalk = new RandomWalk(mockDHT, options) - sinon.stub(randomWalk._kadDHT, 'findPeer').callsFake((_, opts, callback) => { + sinon.stub(randomWalk._kadDHT, 'findPeer').callsFake((_, opts) => { expect(opts.timeout).to.eql(options.timeout).mark() - setTimeout(() => callback(error), 100) + + throw error }) expect(3).checks(() => { diff --git a/test/routing.spec.js b/test/routing.spec.js index 1da7f622..c1227654 100644 --- a/test/routing.spec.js +++ b/test/routing.spec.js @@ -5,119 +5,74 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect const PeerId = require('peer-id') -const map = require('async/map') -const each = require('async/each') -const series = require('async/series') -const range = require('lodash.range') const random = require('lodash.random') const RoutingTable = require('../src/routing') const kadUtils = require('../src/utils') - -function createPeerId (n, callback) { - map(range(n), (i, cb) => PeerId.create({ bits: 512 }, cb), callback) -} +const createPeerId = require('./utils/create-peer-id') describe('Routing Table', () => { let table - beforeEach(function (done) { + beforeEach(async function () { this.timeout(20 * 1000) - PeerId.create({ bits: 512 }, (err, id) => { - expect(err).to.not.exist() - table = new RoutingTable(id, 20) - done() - }) + const id = await PeerId.create({ bits: 512 }) + table = new RoutingTable(id, 20) }) - it('add', function (done) { + it('add', async function () { this.timeout(20 * 1000) - createPeerId(20, (err, ids) => { - expect(err).to.not.exist() - - series([ - (cb) => each(range(1000), (n, cb) => { - table.add(ids[random(ids.length - 1)], cb) - }, cb), - (cb) => each(range(20), (n, cb) => { - const id = ids[random(ids.length - 1)] - - kadUtils.convertPeerId(id, (err, key) => { - expect(err).to.not.exist() - expect(table.closestPeers(key, 5).length) - .to.be.above(0) - cb() - }) - }, cb) - ], done) - }) + const ids = await createPeerId(20) + + await Promise.all( + Array.from({ length: 1000 }).map(() => table.add(ids[random(ids.length - 1)])) + ) + + await Promise.all( + Array.from({ length: 20 }).map(async () => { + const id = ids[random(ids.length - 1)] + const key = await kadUtils.convertPeerId(id) + + expect(table.closestPeers(key, 5).length) + .to.be.above(0) + }) + ) }) - it('remove', function (done) { + it('remove', async function () { this.timeout(20 * 1000) - createPeerId(10, (err, peers) => { - expect(err).to.not.exist() - - let k - series([ - (cb) => each(peers, (peer, cbEach) => table.add(peer, cbEach), cb), - (cb) => { - const id = peers[2] - kadUtils.convertPeerId(id, (err, key) => { - expect(err).to.not.exist() - k = key - expect(table.closestPeers(key, 10)).to.have.length(10) - cb() - }) - }, - (cb) => table.remove(peers[5], cb), - (cb) => { - expect(table.closestPeers(k, 10)).to.have.length(9) - expect(table.size).to.be.eql(9) - cb() - } - ], done) - }) + const peers = await createPeerId(10) + await Promise.all(peers.map((peer) => table.add(peer))) + + const key = await kadUtils.convertPeerId(peers[2]) + expect(table.closestPeers(key, 10)).to.have.length(10) + + await table.remove(peers[5]) + expect(table.closestPeers(key, 10)).to.have.length(9) + expect(table.size).to.be.eql(9) }) - it('closestPeer', function (done) { + it('closestPeer', async function () { this.timeout(10 * 1000) - createPeerId(4, (err, peers) => { - expect(err).to.not.exist() - series([ - (cb) => each(peers, (peer, cb) => table.add(peer, cb), cb), - (cb) => { - const id = peers[2] - kadUtils.convertPeerId(id, (err, key) => { - expect(err).to.not.exist() - expect(table.closestPeer(key)).to.eql(id) - cb() - }) - } - ], done) - }) + const peers = await createPeerId(4) + await Promise.all(peers.map((peer) => table.add(peer))) + + const id = peers[2] + const key = await kadUtils.convertPeerId(id) + expect(table.closestPeer(key)).to.eql(id) }) - it('closestPeers', function (done) { + it('closestPeers', async function () { this.timeout(20 * 1000) - createPeerId(18, (err, peers) => { - expect(err).to.not.exist() - series([ - (cb) => each(peers, (peer, cb) => table.add(peer, cb), cb), - (cb) => { - const id = peers[2] - kadUtils.convertPeerId(id, (err, key) => { - expect(err).to.not.exist() - expect(table.closestPeers(key, 15)).to.have.length(15) - cb() - }) - } - ], done) - }) + const peers = await createPeerId(18) + await Promise.all(peers.map((peer) => table.add(peer))) + + const key = await kadUtils.convertPeerId(peers[2]) + expect(table.closestPeers(key, 15)).to.have.length(15) }) }) diff --git a/test/rpc/handlers/add-provider.spec.js b/test/rpc/handlers/add-provider.spec.js index 1219265f..4070dbd1 100644 --- a/test/rpc/handlers/add-provider.spec.js +++ b/test/rpc/handlers/add-provider.spec.js @@ -5,10 +5,7 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect -const parallel = require('async/parallel') -const waterfall = require('async/waterfall') const _ = require('lodash') -const promiseToCallback = require('promise-to-callback') const Message = require('../../../src/message') const handler = require('../../../src/rpc/handlers/add-provider') @@ -23,33 +20,23 @@ describe('rpc - handlers - AddProvider', () => { let tdht let dht - before((done) => { - parallel([ - (cb) => createPeerInfo(3, cb), - (cb) => createValues(2, cb) - ], (err, res) => { - expect(err).to.not.exist() - peers = res[0] - values = res[1] - done() - }) + before(async () => { + [peers, values] = await Promise.all([ + createPeerInfo(3), + createValues(2) + ]) }) - beforeEach((done) => { + beforeEach(async () => { tdht = new TestDHT() - tdht.spawn(1, (err, dhts) => { - expect(err).to.not.exist() - dht = dhts[0] - done() - }) + const dhts = await tdht.spawn(1) + dht = dhts[0] }) - afterEach((done) => { - tdht.teardown(done) - }) + afterEach(() => tdht.teardown()) - describe('invalid messages', () => { + describe('invalid messages', async () => { const tests = [{ message: new Message(Message.TYPES.ADD_PROVIDER, Buffer.alloc(0), 0), error: 'ERR_MISSING_KEY' @@ -58,22 +45,28 @@ describe('rpc - handlers - AddProvider', () => { error: 'ERR_INVALID_CID' }] - tests.forEach((t) => it(t.error.toString(), (done) => { - handler(dht)(peers[0], t.message, (err) => { - expect(err).to.exist() - expect(err.code).to.eql(t.error) - done() + await Promise.all(tests.map((t) => { + it(t.error.toString(), async () => { + try { + await handler(dht)(peers[0], t.message) + } catch (err) { + expect(err).to.exist() + expect(err.code).to.eql(t.error) + return + } + throw new Error() }) })) }) - it('ignore providers that do not match the sender', (done) => { + it('ignore providers that do not match the sender', async () => { const cid = values[0].cid const msg = new Message(Message.TYPES.ADD_PROVIDER, cid.buffer, 0) const sender = _.cloneDeep(peers[0]) const provider = _.cloneDeep(peers[0]) provider.multiaddrs.add('/ip4/127.0.0.1/tcp/1234') + const other = _.cloneDeep(peers[1]) other.multiaddrs.add('/ip4/127.0.0.1/tcp/2345') msg.providerPeers = [ @@ -81,23 +74,21 @@ describe('rpc - handlers - AddProvider', () => { other ] - waterfall([ - (cb) => handler(dht)(sender, msg, cb), - (cb) => promiseToCallback(dht.providers.getProviders(cid))(cb), - (provs, cb) => { - expect(provs).to.have.length(1) - expect(provs[0].id).to.eql(provider.id.id) - const bookEntry = dht.peerBook.get(provider.id) - // Favour peerInfo from payload over peerInfo from sender - expect(bookEntry.multiaddrs.toArray()).to.eql( - provider.multiaddrs.toArray() - ) - cb() - } - ], done) + await handler(dht)(sender, msg) + + const provs = await dht.providers.getProviders(cid) + + expect(provs).to.have.length(1) + expect(provs[0].id).to.eql(provider.id.id) + const bookEntry = dht.peerBook.get(provider.id) + + // Favour peerInfo from payload over peerInfo from sender + expect(bookEntry.multiaddrs.toArray()).to.eql( + provider.multiaddrs.toArray() + ) }) - it('fall back to sender if providers have no multiaddrs', (done) => { + it('fall back to sender if providers have no multiaddrs', async () => { const cid = values[0].cid const msg = new Message(Message.TYPES.ADD_PROVIDER, cid.buffer, 0) const sender = _.cloneDeep(peers[0]) @@ -105,15 +96,12 @@ describe('rpc - handlers - AddProvider', () => { provider.multiaddrs.clear() msg.providerPeers = [provider] - waterfall([ - (cb) => handler(dht)(sender, msg, cb), - (cb) => promiseToCallback(dht.providers.getProviders(cid))(cb), - (provs, cb) => { - expect(dht.peerBook.has(provider.id)).to.equal(false) - expect(provs).to.have.length(1) - expect(provs[0].id).to.eql(provider.id.id) - cb() - } - ], done) + await handler(dht)(sender, msg) + + const provs = await dht.providers.getProviders(cid) + + expect(dht.peerBook.has(provider.id)).to.equal(false) + expect(provs).to.have.length(1) + expect(provs[0].id).to.eql(provider.id.id) }) }) diff --git a/test/rpc/handlers/find-node.spec.js b/test/rpc/handlers/find-node.spec.js index 985a794b..99b52c61 100644 --- a/test/rpc/handlers/find-node.spec.js +++ b/test/rpc/handlers/find-node.spec.js @@ -4,7 +4,6 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect -const waterfall = require('async/waterfall') const Message = require('../../../src/message') const handler = require('../../../src/rpc/handlers/find-node') @@ -12,7 +11,6 @@ const handler = require('../../../src/rpc/handlers/find-node') const T = Message.TYPES.FIND_NODE const createPeerInfo = require('../../utils/create-peer-info') -// const createValues = require('../../utils/create-values') const TestDHT = require('../../utils/test-dht') describe('rpc - handlers - FindNode', () => { @@ -20,71 +18,52 @@ describe('rpc - handlers - FindNode', () => { let tdht let dht - before((done) => { - createPeerInfo(3, (err, res) => { - expect(err).to.not.exist() - peers = res - done() - }) + before(async () => { + peers = await createPeerInfo(3) }) - beforeEach((done) => { + beforeEach(async () => { tdht = new TestDHT() - tdht.spawn(1, (err, dhts) => { - expect(err).to.not.exist() - dht = dhts[0] - done() - }) + const dhts = await tdht.spawn(1) + dht = dhts[0] }) - afterEach((done) => { - tdht.teardown(done) - }) + afterEach(() => tdht.teardown()) - it('returns self, if asked for self', (done) => { + it('returns self, if asked for self', async () => { const msg = new Message(T, dht.peerInfo.id.id, 0) - handler(dht)(peers[1], msg, (err, response) => { - expect(err).to.not.exist() - expect(response.closerPeers).to.have.length(1) - const peer = response.closerPeers[0] + const response = await handler(dht)(peers[1], msg) + + expect(response.closerPeers).to.have.length(1) + const peer = response.closerPeers[0] - expect(peer.id.id).to.be.eql(dht.peerInfo.id.id) - done() - }) + expect(peer.id.id).to.be.eql(dht.peerInfo.id.id) }) - it('returns closer peers', (done) => { + it('returns closer peers', async () => { const msg = new Message(T, Buffer.from('hello'), 0) const other = peers[1] - waterfall([ - (cb) => dht._add(other, cb), - (cb) => handler(dht)(peers[2], msg, cb) - ], (err, response) => { - expect(err).to.not.exist() - expect(response.closerPeers).to.have.length(1) - const peer = response.closerPeers[0] - - expect(peer.id.id).to.be.eql(peers[1].id.id) - expect( - peer.multiaddrs.toArray() - ).to.be.eql( - peers[1].multiaddrs.toArray() - ) - - done() - }) + await dht._add(other) + const response = await handler(dht)(peers[2], msg) + + expect(response.closerPeers).to.have.length(1) + const peer = response.closerPeers[0] + + expect(peer.id.id).to.be.eql(peers[1].id.id) + expect( + peer.multiaddrs.toArray() + ).to.be.eql( + peers[1].multiaddrs.toArray() + ) }) - it('handles no peers found', (done) => { + it('handles no peers found', async () => { const msg = new Message(T, Buffer.from('hello'), 0) + const response = await handler(dht)(peers[2], msg) - handler(dht)(peers[2], msg, (err, response) => { - expect(err).to.not.exist() - expect(response.closerPeers).to.have.length(0) - done() - }) + expect(response.closerPeers).to.have.length(0) }) }) diff --git a/test/rpc/handlers/get-providers.spec.js b/test/rpc/handlers/get-providers.spec.js index e2f5442d..5684989a 100644 --- a/test/rpc/handlers/get-providers.spec.js +++ b/test/rpc/handlers/get-providers.spec.js @@ -4,9 +4,6 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect -const parallel = require('async/parallel') -const waterfall = require('async/waterfall') -const promiseToCallback = require('promise-to-callback') const Message = require('../../../src/message') const utils = require('../../../src/utils') @@ -24,86 +21,65 @@ describe('rpc - handlers - GetProviders', () => { let tdht let dht - before((done) => { - parallel([ - (cb) => createPeerInfo(3, cb), - (cb) => createValues(2, cb) - ], (err, res) => { - expect(err).to.not.exist() - peers = res[0] - values = res[1] - done() - }) + before(async () => { + [peers, values] = await Promise.all([ + createPeerInfo(3), + createValues(2) + ]) }) - beforeEach((done) => { + beforeEach(async () => { tdht = new TestDHT() - tdht.spawn(1, (err, dhts) => { - expect(err).to.not.exist() - dht = dhts[0] - done() - }) + const dhts = await tdht.spawn(1) + dht = dhts[0] }) - afterEach((done) => { - tdht.teardown(done) - }) + afterEach(() => tdht.teardown()) - it('errors with an invalid key ', (done) => { + it('errors with an invalid key ', async () => { const msg = new Message(T, Buffer.from('hello'), 0) - handler(dht)(peers[0], msg, (err, response) => { + try { + await handler(dht)(peers[0], msg) + } catch (err) { expect(err.code).to.eql('ERR_INVALID_CID') - expect(response).to.not.exist() - done() - }) + } }) - it('responds with self if the value is in the datastore', (done) => { + it('responds with self if the value is in the datastore', async () => { const v = values[0] const msg = new Message(T, v.cid.buffer, 0) const dsKey = utils.bufferToKey(v.cid.buffer) - waterfall([ - (cb) => promiseToCallback(dht.datastore.put(dsKey, v.value))(cb), - (_, cb) => handler(dht)(peers[0], msg, cb) - ], (err, response) => { - expect(err).to.not.exist() - - expect(response.key).to.be.eql(v.cid.buffer) - expect(response.providerPeers).to.have.length(1) - expect(response.providerPeers[0].id.toB58String()) - .to.eql(dht.peerInfo.id.toB58String()) + await dht.datastore.put(dsKey, v.value) + const response = await handler(dht)(peers[0], msg) - done() - }) + expect(response.key).to.be.eql(v.cid.buffer) + expect(response.providerPeers).to.have.length(1) + expect(response.providerPeers[0].id.toB58String()) + .to.eql(dht.peerInfo.id.toB58String()) }) - it('responds with listed providers and closer peers', (done) => { + it('responds with listed providers and closer peers', async () => { const v = values[0] const msg = new Message(T, v.cid.buffer, 0) const prov = peers[1].id const closer = peers[2] - waterfall([ - (cb) => dht._add(closer, cb), - (cb) => promiseToCallback(dht.providers.addProvider(v.cid, prov))(err => cb(err)), - (cb) => handler(dht)(peers[0], msg, cb) - ], (err, response) => { - expect(err).to.not.exist() - - expect(response.key).to.be.eql(v.cid.buffer) - expect(response.providerPeers).to.have.length(1) - expect(response.providerPeers[0].id.toB58String()) - .to.eql(prov.toB58String()) - - expect(response.closerPeers).to.have.length(1) - expect(response.closerPeers[0].id.toB58String()) - .to.eql(closer.id.toB58String()) - done() - }) + await dht._add(closer) + await dht.providers.addProvider(v.cid, prov) + const response = await handler(dht)(peers[0], msg) + + expect(response.key).to.be.eql(v.cid.buffer) + expect(response.providerPeers).to.have.length(1) + expect(response.providerPeers[0].id.toB58String()) + .to.eql(prov.toB58String()) + + expect(response.closerPeers).to.have.length(1) + expect(response.closerPeers[0].id.toB58String()) + .to.eql(closer.id.toB58String()) }) }) diff --git a/test/rpc/handlers/get-value.spec.js b/test/rpc/handlers/get-value.spec.js index 660cfcdf..b0ad3cd4 100644 --- a/test/rpc/handlers/get-value.spec.js +++ b/test/rpc/handlers/get-value.spec.js @@ -4,7 +4,7 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect -const waterfall = require('async/waterfall') + const Message = require('../../../src/message') const handler = require('../../../src/rpc/handlers/get-value') const utils = require('../../../src/utils') @@ -12,7 +12,6 @@ const utils = require('../../../src/utils') const T = Message.TYPES.GET_VALUE const createPeerInfo = require('../../utils/create-peer-info') -// const createValues = require('../../utils/create-values') const TestDHT = require('../../utils/test-dht') describe('rpc - handlers - GetValue', () => { @@ -20,123 +19,87 @@ describe('rpc - handlers - GetValue', () => { let tdht let dht - before((done) => { - createPeerInfo(2, (err, res) => { - expect(err).to.not.exist() - peers = res - done() - }) + before(async () => { + peers = await createPeerInfo(2) }) - beforeEach((done) => { + beforeEach(async () => { tdht = new TestDHT() - tdht.spawn(1, (err, dhts) => { - expect(err).to.not.exist() - dht = dhts[0] - done() - }) + const dhts = await tdht.spawn(1) + dht = dhts[0] }) - afterEach((done) => { - tdht.teardown(done) - }) + afterEach(() => tdht.teardown()) - it('errors when missing key', (done) => { + it('errors when missing key', async () => { const msg = new Message(T, Buffer.alloc(0), 0) - handler(dht)(peers[0], msg, (err, response) => { + try { + await handler(dht)(peers[0], msg) + } catch (err) { expect(err.code).to.eql('ERR_INVALID_KEY') - expect(response).to.not.exist() - done() - }) + return + } + + throw new Error('should error when missing key') }) - it('responds with a local value', (done) => { + it('responds with a local value', async () => { const key = Buffer.from('hello') const value = Buffer.from('world') const msg = new Message(T, key, 0) - waterfall([ - (cb) => dht.put(key, value, cb), - (cb) => handler(dht)(peers[0], msg, cb) - ], (err, response) => { - expect(err).to.not.exist() - expect(response.record).to.exist() - expect(response.record.key).to.eql(key) - expect(response.record.value).to.eql(value) - done() - }) + + await dht.put(key, value) + const response = await handler(dht)(peers[0], msg) + + expect(response.record).to.exist() + expect(response.record.key).to.eql(key) + expect(response.record.value).to.eql(value) }) - it('responds with closerPeers returned from the dht', (done) => { + it('responds with closerPeers returned from the dht', async () => { const key = Buffer.from('hello') const msg = new Message(T, key, 0) const other = peers[1] - waterfall([ - (cb) => dht._add(other, cb), - (cb) => handler(dht)(peers[0], msg, cb) - ], (err, response) => { - expect(err).to.not.exist() - expect(response.closerPeers).to.have.length(1) - expect( - response.closerPeers[0].id.toB58String() - ).to.be.eql(other.id.toB58String()) - done() - }) + await dht._add(other) + const response = await handler(dht)(peers[0], msg) + + expect(response.closerPeers).to.have.length(1) + expect(response.closerPeers[0].id.toB58String()).to.be.eql(other.id.toB58String()) }) describe('public key', () => { - it('self', (done) => { + it('self', async () => { const key = utils.keyForPublicKey(dht.peerInfo.id) const msg = new Message(T, key, 0) + const response = await handler(dht)(peers[0], msg) - waterfall([ - (cb) => handler(dht)(peers[0], msg, cb) - ], (err, response) => { - expect(err).to.not.exist() - expect(response.record).to.exist() - expect(response.record.value).to.eql( - dht.peerInfo.id.pubKey.bytes - ) - done() - }) + expect(response.record).to.exist() + expect(response.record.value).to.eql(dht.peerInfo.id.pubKey.bytes) }) - it('other in peerstore', (done) => { + it('other in peerstore', async () => { const other = peers[1] const key = utils.keyForPublicKey(other.id) const msg = new Message(T, key, 0) - waterfall([ - (cb) => dht._add(other, cb), - (cb) => handler(dht)(peers[0], msg, cb) - ], (err, response) => { - expect(err).to.not.exist() - expect(response.record).to.exist() - expect(response.record.value).to.eql( - other.id.pubKey.bytes - ) - done() - }) + await dht._add(other) + const response = await handler(dht)(peers[0], msg) + expect(response.record).to.exist() + expect(response.record.value).to.eql(other.id.pubKey.bytes) }) - it('other unkown', (done) => { + it('other unkown', async () => { const other = peers[1] const key = utils.keyForPublicKey(other.id) const msg = new Message(T, key, 0) - - waterfall([ - (cb) => handler(dht)(peers[0], msg, cb) - ], (err, response) => { - expect(err).to.not.exist() - expect(response.record).to.not.exist() - - done() - }) + const response = await handler(dht)(peers[0], msg) + expect(response.record).to.not.exist() }) }) }) diff --git a/test/rpc/handlers/ping.spec.js b/test/rpc/handlers/ping.spec.js index f8c7d45f..7f03f326 100644 --- a/test/rpc/handlers/ping.spec.js +++ b/test/rpc/handlers/ping.spec.js @@ -4,6 +4,7 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect + const Message = require('../../../src/message') const handler = require('../../../src/rpc/handlers/ping') @@ -17,35 +18,23 @@ describe('rpc - handlers - Ping', () => { let tdht let dht - before((done) => { - createPeerInfo(2, (err, res) => { - expect(err).to.not.exist() - peers = res - done() - }) + before(async () => { + peers = await createPeerInfo(2) }) - beforeEach((done) => { + beforeEach(async () => { tdht = new TestDHT() - tdht.spawn(1, (err, dhts) => { - expect(err).to.not.exist() - dht = dhts[0] - done() - }) + const dhts = await tdht.spawn(1) + dht = dhts[0] }) - afterEach((done) => { - tdht.teardown(done) - }) + afterEach(() => tdht.teardown()) - it('replies with the same message', (done) => { + it('replies with the same message', async () => { const msg = new Message(T, Buffer.from('hello'), 5) + const response = await handler(dht)(peers[0], msg) - handler(dht)(peers[0], msg, (err, response) => { - expect(err).to.not.exist() - expect(response).to.be.eql(msg) - done() - }) + expect(response).to.be.eql(msg) }) }) diff --git a/test/rpc/handlers/put-value.spec.js b/test/rpc/handlers/put-value.spec.js index accd51e5..f46913fe 100644 --- a/test/rpc/handlers/put-value.spec.js +++ b/test/rpc/handlers/put-value.spec.js @@ -6,14 +6,13 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect const Record = require('libp2p-record').Record -const promiseToCallback = require('promise-to-callback') +const delay = require('delay') const Message = require('../../../src/message') const handler = require('../../../src/rpc/handlers/put-value') const utils = require('../../../src/utils') const createPeerInfo = require('../../utils/create-peer-info') -// const createValues = require('../../utils/create-values') const TestDHT = require('../../utils/test-dht') const T = Message.TYPES.PUT_VALUE @@ -23,37 +22,33 @@ describe('rpc - handlers - PutValue', () => { let tdht let dht - before((done) => { - createPeerInfo(2, (err, res) => { - expect(err).to.not.exist() - peers = res - done() - }) + before(async () => { + peers = await createPeerInfo(2) }) - beforeEach((done) => { + beforeEach(async () => { tdht = new TestDHT() - tdht.spawn(1, (err, dhts) => { - expect(err).to.not.exist() - dht = dhts[0] - done() - }) + const dhts = await tdht.spawn(1) + dht = dhts[0] }) - afterEach((done) => { - tdht.teardown(done) - }) + afterEach(() => tdht.teardown()) - it('errors on missing record', (done) => { + it('errors on missing record', async () => { const msg = new Message(T, Buffer.from('hello'), 5) - handler(dht)(peers[0], msg, (err) => { + + try { + await handler(dht)(peers[0], msg) + } catch (err) { expect(err.code).to.eql('ERR_EMPTY_RECORD') - done() - }) + return + } + + throw new Error('should error on missing record') }) - it('stores the record in the datastore', (done) => { + it('stores the record in the datastore', async () => { const msg = new Message(T, Buffer.from('hello'), 5) const record = new Record( Buffer.from('hello'), @@ -61,23 +56,18 @@ describe('rpc - handlers - PutValue', () => { ) msg.record = record - handler(dht)(peers[1], msg, (err, response) => { - expect(err).to.not.exist() - expect(response).to.be.eql(msg) + const response = await handler(dht)(peers[1], msg) + expect(response).to.be.eql(msg) + + const key = utils.bufferToKey(Buffer.from('hello')) + const res = await dht.datastore.get(key) - const key = utils.bufferToKey(Buffer.from('hello')) - promiseToCallback(dht.datastore.get(key))((err, res) => { - expect(err).to.not.exist() - const rec = Record.deserialize(res) + const rec = Record.deserialize(res) - expect(rec).to.have.property('key').eql(Buffer.from('hello')) + expect(rec).to.have.property('key').eql(Buffer.from('hello')) - // make sure some time has passed - setTimeout(() => { - expect(rec.timeReceived < new Date()).to.be.eql(true) - done() - }, 10) - }) - }) + // make sure some time has passed + await delay(10) + expect(rec.timeReceived < new Date()).to.be.eql(true) }) }) diff --git a/test/rpc/index.spec.js b/test/rpc/index.spec.js index 16627267..74c56ade 100644 --- a/test/rpc/index.spec.js +++ b/test/rpc/index.spec.js @@ -21,15 +21,8 @@ const createPeerInfo = require('../utils/create-peer-info') describe('rpc', () => { let peerInfos - before((done) => { - createPeerInfo(2, (err, peers) => { - if (err) { - return done(err) - } - - peerInfos = peers - done() - }) + before(async () => { + peerInfos = await createPeerInfo(2) }) describe('protocolHandler', () => { diff --git a/test/simulation/index.js b/test/simulation/index.js index f4ccba4d..eb7487bf 100644 --- a/test/simulation/index.js +++ b/test/simulation/index.js @@ -3,7 +3,6 @@ /* eslint-disable no-console */ 'use strict' -const { promisify } = require('util') const PeerBook = require('peer-book') const PeerId = require('peer-id') const PeerInfo = require('peer-info') @@ -11,13 +10,10 @@ const multihashes = require('multihashes') const RoutingTable = require('../../src/routing') const Message = require('../../src/message') -const utils = require('../../src/utils') -const testUtils = require('../../test/utils') +const { convertBuffer } = require('../../src/utils') +const { sortClosestPeerInfos } = require('../../test/utils') const DHT = require('../../src') -const convertBuffer = promisify(utils.convertBuffer) -const sortClosestPeerInfos = promisify(testUtils.sortClosestPeerInfos) - const NUM_PEERS = 10e3 // Peers to create, not including us const LATENCY_DEAD_NODE = 120e3 // How long dead nodes should take before erroring const NUM_DEAD_NODES = Math.floor(NUM_PEERS * 0.3) // 30% undialable @@ -103,7 +99,7 @@ async function GetClosestPeersSimulation () { // Add random peers to our table const ourPeers = randomMembers(peers, randomInteger(MIN_PEERS_KNOWN, MAX_PEERS_KNOWN)) for (const peer of ourPeers) { - await promisify((peer, callback) => dht._add(peer, callback))(peer) + await dht._add(peer) } dht.network.sendRequest = (to, message, callback) => { @@ -128,15 +124,10 @@ async function GetClosestPeersSimulation () { } // Start the dht - await promisify((callback) => dht.start(callback))() + await dht.start() const startTime = Date.now() - const closestPeers = await new Promise((resolve, reject) => { - dht.getClosestPeers(QUERY_KEY, (err, res) => { - if (err) return reject(err) - resolve(res) - }) - }) + const closestPeers = await dht.getClosestPeers(QUERY_KEY) const runTime = Date.now() - startTime return { closestPeers, runTime } @@ -187,7 +178,7 @@ async function MockNetwork (peers) { } const siblings = randomMembers(peers, randomInteger(MIN_PEERS_KNOWN, MAX_PEERS_KNOWN)) for (const peer of siblings) { - await promisify((callback) => netPeer.routingTable.add(peer.id, callback))() + await netPeer.routingTable.add(peer.id) } } diff --git a/test/utils/create-disjoint-tracks.js b/test/utils/create-disjoint-tracks.js index fa578f0a..7e8f72cc 100644 --- a/test/utils/create-disjoint-tracks.js +++ b/test/utils/create-disjoint-tracks.js @@ -1,6 +1,5 @@ 'use strict' -const waterfall = require('async/waterfall') const { convertPeerId, sortClosestPeers @@ -11,59 +10,58 @@ const { * "next", a successor function for the query to use. See comment * where this is called for details. */ -function createDisjointTracks (peerInfos, goodLength, callback) { +async function createDisjointTracks (peerInfos, goodLength) { const ids = peerInfos.map((info) => info.id) const us = ids[0] - let target - waterfall([ - (cb) => convertPeerId(us, cb), - (ourId, cb) => { - sortClosestPeers(ids, ourId, cb) - }, - (sorted, cb) => { - target = sorted[sorted.length - 1] - sorted = sorted.slice(1) // remove our id - const goodTrack = sorted.slice(0, goodLength) - goodTrack.push(target) // push on target - const badTrack = sorted.slice(goodLength, -1) - if (badTrack.length <= goodTrack.length) { - return cb(new Error(`insufficient number of peers; good length: ${goodTrack.length}, bad length: ${badTrack.length}`)) - } - const tracks = [goodTrack, badTrack] // array of arrays of nodes + const ourId = await convertPeerId(us) + let sorted = await sortClosestPeers(ids, ourId) - const next = (peer, trackNum) => { - const track = tracks[trackNum] - const pos = track.indexOf(peer) - if (pos < 0) { - return null // peer not on expected track - } + const target = sorted[sorted.length - 1] + sorted = sorted.slice(1) // remove our id + const goodTrack = sorted.slice(0, goodLength) + goodTrack.push(target) // push on target + const badTrack = sorted.slice(goodLength, -1) + + if (badTrack.length <= goodTrack.length) { + throw new Error(`insufficient number of peers; good length: ${goodTrack.length}, bad length: ${badTrack.length}`) + } + + const tracks = [goodTrack, badTrack] // array of arrays of nodes + const next = (peer, trackNum) => { + const track = tracks[trackNum] + const pos = track.indexOf(peer) + if (pos < 0) { + return null // peer not on expected track + } - const nextPos = pos + 1 - // if we're at the end of the track - if (nextPos === track.length) { - if (trackNum === 0) { // good track; pathComplete - return { - end: true, - pathComplete: true - } - } else { // bad track; dead end - return { - end: true, - closerPeers: [] - } - } - } else { - const infoIdx = ids.indexOf(track[nextPos]) - return { - closerPeers: [peerInfos[infoIdx]] - } + const nextPos = pos + 1 + // if we're at the end of the track + if (nextPos === track.length) { + if (trackNum === 0) { // good track; pathComplete + return { + end: true, + pathComplete: true + } + } else { // bad track; dead end + return { + end: true, + closerPeers: [] } } - - cb(null, target.id, [goodTrack[0], badTrack[0]], next) + } else { + const infoIdx = ids.indexOf(track[nextPos]) + return { + closerPeers: [peerInfos[infoIdx]] + } } - ], callback) + } + + return { + targetId: target.id, + starts: [goodTrack[0], badTrack[0]], + getResponse: next + } } module.exports = createDisjointTracks diff --git a/test/utils/create-peer-id.js b/test/utils/create-peer-id.js new file mode 100644 index 00000000..6057d4d6 --- /dev/null +++ b/test/utils/create-peer-id.js @@ -0,0 +1,19 @@ +'use strict' + +const PeerId = require('peer-id') + +/** + * Creates multiple PeerIds + * @param {number} length The number of `PeerId` to create + * @returns {Promise>} + */ +function createPeerId (length) { + return Promise.all( + Array.from({ length }).map(async () => { + const id = await PeerId.create({ bits: 512 }) + return id + }) + ) +} + +module.exports = createPeerId diff --git a/test/utils/index.js b/test/utils/index.js index af96067c..56efb6f4 100644 --- a/test/utils/index.js +++ b/test/utils/index.js @@ -7,19 +7,14 @@ const { sortClosestPeers } = require('../../src/utils') * * @param {Array} peers * @param {Buffer} target - * @param {function(Error, Array)} callback - * @returns {void} + * @returns {Array} */ -exports.sortClosestPeerInfos = (peers, target, callback) => { - sortClosestPeers(peers.map(peerInfo => peerInfo.id), target, (err, sortedPeerIds) => { - if (err) return callback(err) +exports.sortClosestPeerInfos = async (peers, target) => { + const sortedPeerIds = await sortClosestPeers(peers.map(peerInfo => peerInfo.id), target) - const sortedPeerInfos = sortedPeerIds.map((peerId) => { - return peers.find((peerInfo) => { - return peerInfo.id.isEqual(peerId) - }) + return sortedPeerIds.map((peerId) => { + return peers.find((peerInfo) => { + return peerInfo.id.isEqual(peerId) }) - - callback(null, sortedPeerInfos) }) }