Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
chore: refactor tests
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Nov 14, 2019
1 parent 06f9b02 commit 52d056e
Show file tree
Hide file tree
Showing 36 changed files with 1,150 additions and 1,552 deletions.
29 changes: 15 additions & 14 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,43 +32,43 @@
"url": "https://github.com/libp2p/js-libp2p-kad-dht/issues"
},
"engines": {
"node": ">=6.0.0",
"npm": ">=3.0.0"
"node": ">=10.0.0",
"npm": ">=6.0.0"
},
"homepage": "https://github.com/libp2p/js-libp2p-kad-dht",
"dependencies": {
"abort-controller": "^3.0.0",
"async": "^2.6.2",
"base32.js": "~0.1.0",
"chai-checkmark": "^1.0.1",
"cids": "~0.7.0",
"cids": "~0.7.1",
"debug": "^4.1.1",
"err-code": "^1.1.2",
"err-code": "^2.0.0",
"hashlru": "^2.3.0",
"heap": "~0.2.6",
"interface-datastore": "~0.7.0",
"interface-datastore": "~0.8.0",
"k-bucket": "^5.0.0",
"libp2p-crypto": "~0.16.1",
"libp2p-crypto": "~0.17.1",
"libp2p-record": "~0.7.0",
"multihashes": "~0.4.14",
"multihashes": "~0.4.15",
"multihashing-async": "~0.8.0",
"p-filter": "^2.1.0",
"p-map": "^3.0.0",
"p-queue": "^6.0.0",
"p-queue": "^6.2.1",
"p-timeout": "^3.2.0",
"p-times": "^2.1.0",
"peer-id": "~0.13.3",
"peer-id": "~0.13.5",
"peer-info": "~0.17.0",
"promise-to-callback": "^1.0.0",
"promisify-es6": "^1.0.3",
"protons": "^1.0.1",
"pull-length-prefixed": "^1.3.2",
"pull-stream": "^3.6.9",
"pull-length-prefixed": "^1.3.3",
"pull-stream": "^3.6.14",
"varint": "^5.0.0",
"xor-distance": "^2.0.0"
},
"devDependencies": {
"aegir": "^20.0.0",
"aegir": "^20.4.1",
"chai": "^4.2.0",
"datastore-level": "~0.12.1",
"delay": "^4.3.0",
Expand All @@ -80,11 +80,12 @@
"lodash": "^4.17.11",
"lodash.random": "^3.2.0",
"lodash.range": "^3.2.0",
"p-defer": "^3.0.0",
"p-each-series": "^2.1.0",
"p-map-series": "^2.1.0",
"p-retry": "^4.2.0",
"peer-book": "~0.9.1",
"sinon": "^7.3.1"
"peer-book": "~0.9.2",
"sinon": "^7.5.0"
},
"contributors": [
"Alan Shaw <alan.shaw@protocol.ai>",
Expand Down
9 changes: 4 additions & 5 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ const PeerId = require('peer-id')
const PeerInfo = require('peer-info')
const crypto = require('libp2p-crypto')

const promisify = require('promisify-es6')
const pFilter = require('p-filter')
const pTimeout = require('p-timeout')

Expand Down Expand Up @@ -241,7 +240,7 @@ class KadDHT extends EventEmitter {
* @param {Buffer} key
* @param {Object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<void>}
* @returns {Promise<{from: PeerId, val: Buffer}>}
*/
get (key, options = {}) {
options.timeout = options.timeout || c.minute
Expand Down Expand Up @@ -425,7 +424,7 @@ class KadDHT extends EventEmitter {
// try dht directly
const pkKey = utils.keyForPublicKey(peer)
const value = await this.get(pkKey)
pk = crypto.unmarshalPublicKey(value)
pk = crypto.keys.unmarshalPublicKey(value)
}

info.id = new PeerId(peer.id, null, pk)
Expand Down Expand Up @@ -468,15 +467,15 @@ class KadDHT extends EventEmitter {
// Add peer as provider
await this.providers.addProvider(key, this.peerInfo.id)

// Notice closest peers
// Notify closest peers
const peers = await this.getClosestPeers(key.buffer)
const msg = new Message(Message.TYPES.ADD_PROVIDER, key.buffer, 0)
msg.providerPeers = [this.peerInfo]

await Promise.all(peers.map(async (peer) => {
this._log('putProvider %s to %s', key.toBaseEncodedString(), peer.toB58String())
try {
await promisify(cb => this.network.sendMessage(peer, msg, cb))()
await this.network.sendMessage(peer, msg)
} catch (err) {
errors.push(err)
}
Expand Down
119 changes: 56 additions & 63 deletions src/network.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
'use strict'

const pull = require('pull-stream')
const timeout = require('async/timeout')
const pTimeout = require('p-timeout')
const lp = require('pull-length-prefixed')
const setImmediate = require('async/setImmediate')
const promisify = require('promisify-es6')

const errcode = require('err-code')
Expand Down Expand Up @@ -33,7 +32,7 @@ class Network {

/**
* Start the network.
* @async
* @returns {void}
*/
start () {
if (this._running) {
Expand All @@ -56,7 +55,6 @@ class Network {

/**
* Stop all network activity.
*
* @returns {void}
*/
stop () {
Expand Down Expand Up @@ -111,50 +109,40 @@ class Network {

/**
* Send a request and record RTT for latency measurements.
*
* @async
* @param {PeerId} to - The peer that should receive a message
* @param {Message} msg - The message to send.
* @param {function(Error, Message)} callback
* @returns {void}
* @returns {Promise<Message>}
*/
sendRequest (to, msg, callback) {
async sendRequest (to, msg) {
// TODO: record latency
if (!this.isConnected) {
return callback(errcode(new Error('Network is offline'), 'ERR_NETWORK_OFFLINE'))
throw errcode(new Error('Network is offline'), 'ERR_NETWORK_OFFLINE')
}

this._log('sending to: %s', to.toB58String())
this.dht.switch.dial(to, c.PROTOCOL_DHT, (err, conn) => {
if (err) {
return callback(err)
}

this._writeReadMessage(conn, msg.serialize(), callback)
})
const conn = await promisify(cb => this.dht.switch.dial(to, c.PROTOCOL_DHT, cb))()
return this._writeReadMessage(conn, msg.serialize())
}

/**
* Sends a message without expecting an answer.
*
* @param {PeerId} to
* @param {Message} msg
* @param {function(Error)} callback
* @returns {void}
* @returns {Promise<void>}
*/
sendMessage (to, msg, callback) {
async sendMessage (to, msg) {
if (!this.isConnected) {
return setImmediate(() => callback(errcode(new Error('Network is offline'), 'ERR_NETWORK_OFFLINE')))
throw errcode(new Error('Network is offline'), 'ERR_NETWORK_OFFLINE')
}

this._log('sending to: %s', to.toB58String())

this.dht.switch.dial(to, c.PROTOCOL_DHT, (err, conn) => {
if (err) {
return callback(err)
}

this._writeMessage(conn, msg.serialize(), callback)
})
const conn = await promisify(cb => this.dht.switch.dial(to, c.PROTOCOL_DHT, cb))()
return this._writeMessage(conn, msg.serialize())
}

/**
Expand All @@ -164,61 +152,66 @@ class Network {
*
* @param {Connection} conn - the connection to use
* @param {Buffer} msg - the message to send
* @param {function(Error, Message)} callback
* @returns {void}
* @returns {Message}
* @private
*/
_writeReadMessage (conn, msg, callback) {
timeout(
writeReadMessage,
_writeReadMessage (conn, msg) {
return pTimeout(
writeReadMessage(conn, msg),
this.readMessageTimeout
)(conn, msg, callback)
)
}

/**
* Write a message to the given connection.
*
* @param {Connection} conn - the connection to use
* @param {Buffer} msg - the message to send
* @param {function(Error)} callback
* @returns {void}
* @returns {Promise<void>}
* @private
*/
_writeMessage (conn, msg, callback) {
_writeMessage (conn, msg) {
return new Promise((resolve, reject) => {
pull(
pull.values([msg]),
lp.encode(),
conn,
pull.onEnd((err) => {
if (err) return reject(err)
resolve()
})
)
})
}
}

function writeReadMessage (conn, msg) {
return new Promise((resolve, reject) => {
pull(
pull.values([msg]),
lp.encode(),
conn,
pull.onEnd(callback)
pull.filter((msg) => msg.length < c.maxMessageSize),
lp.decode(),
pull.collect((err, res) => {
if (err) {
return reject(err)
}
if (res.length === 0) {
return reject(errcode(new Error('No message received'), 'ERR_NO_MESSAGE_RECEIVED'))
}

let response
try {
response = Message.deserialize(res[0])
} catch (err) {
return reject(errcode(err, 'ERR_FAILED_DESERIALIZE_RESPONSE'))
}

resolve(response)
})
)
}
}

function writeReadMessage (conn, msg, callback) {
pull(
pull.values([msg]),
lp.encode(),
conn,
pull.filter((msg) => msg.length < c.maxMessageSize),
lp.decode(),
pull.collect((err, res) => {
if (err) {
return callback(err)
}
if (res.length === 0) {
return callback(errcode(new Error('No message received'), 'ERR_NO_MESSAGE_RECEIVED'))
}

let response
try {
response = Message.deserialize(res[0])
} catch (err) {
return callback(errcode(err, 'ERR_FAILED_DESERIALIZE_RESPONSE'))
}

callback(null, response)
})
)
})
}

module.exports = Network
9 changes: 4 additions & 5 deletions src/private.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
const errcode = require('err-code')
const pTimeout = require('p-timeout')

const promisify = require('promisify-es6')
const PeerId = require('peer-id')
const libp2pRecord = require('libp2p-record')
const PeerInfo = require('peer-info')
Expand Down Expand Up @@ -179,7 +178,7 @@ module.exports = (dht) => ({
dht._log('_findPeerSingle %s', peer.toB58String())
const msg = new Message(Message.TYPES.FIND_NODE, target.id, 0)

return promisify(cb => dht.network.sendRequest(peer, msg, cb))()
return dht.network.sendRequest(peer, msg)
},

/**
Expand All @@ -197,7 +196,7 @@ module.exports = (dht) => ({
const msg = new Message(Message.TYPES.PUT_VALUE, key, 0)
msg.record = rec

const resp = await promisify(cb => dht.network.sendRequest(target, msg, cb))()
const resp = await dht.network.sendRequest(target, msg)

if (!resp.record.value.equals(Record.deserialize(rec).value)) {
throw errcode(new Error('value not put correctly'), 'ERR_PUT_VALUE_INVALID')
Expand Down Expand Up @@ -362,7 +361,7 @@ module.exports = (dht) => ({

async _getValueSingle (peer, key) { // eslint-disable-line require-await
const msg = new Message(Message.TYPES.GET_VALUE, key, 0)
return promisify(cb => dht.network.sendRequest(peer, msg, cb))()
return dht.network.sendRequest(peer, msg)
},

/**
Expand Down Expand Up @@ -500,6 +499,6 @@ module.exports = (dht) => ({
*/
async _findProvidersSingle (peer, key) { // eslint-disable-line require-await
const msg = new Message(Message.TYPES.GET_PROVIDERS, key.buffer, 0)
return promisify(cb => dht.network.sendRequest(peer, msg, cb))()
return dht.network.sendRequest(peer, msg)
}
})
9 changes: 4 additions & 5 deletions src/providers.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class Providers {
/**
* Release any resources.
*
* @returns {undefined}
* @returns {void}
*/
stop () {
if (this._cleaner) {
Expand All @@ -73,8 +73,7 @@ class Providers {
/**
* Check all providers if they are still valid, and if not delete them.
*
* @returns {Promise}
*
* @returns {Promise<void>}
* @private
*/
_cleanup () {
Expand Down Expand Up @@ -178,7 +177,7 @@ class Providers {
*
* @param {CID} cid
* @param {PeerId} provider
* @returns {Promise}
* @returns {Promise<void>}
*/
async addProvider (cid, provider) { // eslint-disable-line require-await
return this.syncQueue.add(async () => {
Expand Down Expand Up @@ -232,7 +231,7 @@ function makeProviderKey (cid) {
* @param {CID} cid
* @param {PeerId} peer
* @param {number} time
* @returns {Promise}
* @returns {Promise<void>}
*
* @private
*/
Expand Down
2 changes: 1 addition & 1 deletion src/routing.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class RoutingTable {
* Remove a given peer from the table.
*
* @param {PeerId} peer
* @returns {Promose<void>}
* @returns {Promise<void>}
*/
async remove (peer) {
const id = await utils.convertPeerId(peer)
Expand Down
Loading

0 comments on commit 52d056e

Please sign in to comment.