diff --git a/package.json b/package.json index b2db184b..f5eb00a5 100644 --- a/package.json +++ b/package.json @@ -40,7 +40,6 @@ "abort-controller": "^3.0.0", "async": "^2.6.2", "base32.js": "~0.1.0", - "chai-checkmark": "^1.0.1", "cids": "~0.7.1", "debug": "^4.1.1", "err-code": "^2.0.0", @@ -60,11 +59,11 @@ "p-queue": "^6.2.1", "p-timeout": "^3.2.0", "p-times": "^2.1.0", - "paramap-it": "^0.1.1", "peer-id": "~0.13.5", "peer-info": "~0.17.0", "promise-to-callback": "^1.0.0", "protons": "^1.0.1", + "streaming-iterables": "^4.1.1", "varint": "^5.0.0", "xor-distance": "^2.0.0" }, @@ -72,6 +71,7 @@ "aegir": "^20.4.1", "async-iterator-all": "^1.0.0", "chai": "^4.2.0", + "chai-checkmark": "^1.0.1", "datastore-level": "~0.12.1", "delay": "^4.3.0", "dirty-chai": "^2.0.1", diff --git a/src/index.js b/src/index.js index fca9e5b6..b4de0a6d 100644 --- a/src/index.js +++ b/src/index.js @@ -67,7 +67,7 @@ class KadDHT extends EventEmitter { randomWalk = {} }) { super() - assert(dialer, 'libp2p-kad-dht requires a instance of Dialer') + assert(dialer, 'libp2p-kad-dht requires an instance of Dialer') /** * Local reference to the libp2p dialer instance @@ -82,7 +82,7 @@ class KadDHT extends EventEmitter { this.peerInfo = peerInfo /** - * Local peer info + * Local PeerStore * @type {PeerStore} */ this.peerStore = peerStore diff --git a/src/network.js b/src/network.js index 9a1020c5..a76b7965 100644 --- a/src/network.js +++ b/src/network.js @@ -40,7 +40,6 @@ class Network { return } - // TODO remove: add a way to check if switch has started or not if (!this.dht.isStarted) { throw errcode(new Error('Can not start network'), 'ERR_CANNOT_START_NETWORK') } @@ -52,7 +51,7 @@ class Network { // register protocol with topology const topology = new MulticodecTopology({ - multicodecs: c.PROTOCOL_DHT, + multicodecs: [c.PROTOCOL_DHT], handlers: { onConnect: this._onPeerConnected, onDisconnect: () => {} @@ -98,15 +97,11 @@ class Network { * Registrar notifies a connection successfully with dht protocol. * @private * @param {PeerInfo} peerInfo remote peer info - * @param {Connection} conn connection to the peer * @returns {Promise} */ - async _onPeerConnected (peerInfo, conn) { + async _onPeerConnected (peerInfo) { await this.dht._add(peerInfo) this._log('added to the routing table: %s', peerInfo.id.toB58String()) - - // Open a stream with the connected peer - await conn.newStream(c.PROTOCOL_DHT) } /** @@ -152,40 +147,40 @@ class Network { * If no response is received after the specified timeout * this will error out. * - * @param {Connection} conn - the connection to use + * @param {DuplexIterable} stream - the stream to use * @param {Buffer} msg - the message to send * @returns {Promise} * @private */ - async _writeReadMessage (conn, msg) { // eslint-disable-line require-await + async _writeReadMessage (stream, msg) { // eslint-disable-line require-await return pTimeout( - writeReadMessage(conn, msg), + writeReadMessage(stream, msg), this.readMessageTimeout ) } /** - * Write a message to the given connection. + * Write a message to the given stream. * - * @param {Connection} conn - the connection to use + * @param {DuplexIterable} stream - the stream to use * @param {Buffer} msg - the message to send * @returns {Promise} * @private */ - _writeMessage (conn, msg) { + _writeMessage (stream, msg) { return pipe( [msg], lp.encode(), - conn + stream ) } } -async function writeReadMessage (conn, msg) { +async function writeReadMessage (stream, msg) { const res = await pipe( [msg], lp.encode(), - conn, + stream, utils.itFilter( (msg) => msg.length < c.maxMessageSize ), diff --git a/src/rpc/index.js b/src/rpc/index.js index cdac664c..c494877e 100644 --- a/src/rpc/index.js +++ b/src/rpc/index.js @@ -2,7 +2,6 @@ const pipe = require('it-pipe') const lp = require('it-length-prefixed') -const paramap = require('paramap-it') const PeerInfo = require('peer-info') const Message = require('../message') @@ -22,15 +21,14 @@ module.exports = (dht) => { * * @private */ - async function handleMessage (peer, msg) { // eslint-disable-line - // get handler & exectue it + async function handleMessage (peer, msg) { + // get handler & execute it const handler = getMessageHandler(msg.type) try { await dht._add(peer) } catch (err) { - log.error('Failed to update the kbucket store') - log.error(err) + log.error('Failed to update the kbucket store', err) } if (!handler) { @@ -44,14 +42,12 @@ module.exports = (dht) => { /** * Handle incoming streams on the dht protocol. * @param {Object} props - * @param {string} props.protocol * @param {DuplexStream} props.stream * @param {Connection} props.connection connection * @returns {Promise} */ - return async function onIncomingStream ({ protocol, stream, connection }) { + return async function onIncomingStream ({ stream, connection }) { const peerInfo = await PeerInfo.create(connection.remotePeer) - peerInfo.protocols.add(protocol) try { await dht._add(peerInfo) @@ -65,18 +61,21 @@ module.exports = (dht) => { await pipe( stream.source, lp.decode(), - utils.itFilter( - (msg) => msg.length < c.maxMessageSize - ), - source => paramap(source, rawMsg => { - const msg = Message.deserialize(rawMsg.slice()) - return handleMessage(peerInfo, msg) - }), - // Not all handlers will return a response - utils.itFilter(Boolean), - source => paramap(source, response => { - return response.serialize() - }), + source => (async function * () { + for await (const msg of source) { + // Check message size + if (msg.length < c.maxMessageSize) { + // handle the message + const desMessage = Message.deserialize(msg.slice()) + const res = await handleMessage(peerInfo, desMessage) + + // Not all handlers will return a response + if (res) { + yield res.serialize() + } + } + } + })(), lp.encode(), stream.sink ) diff --git a/test/query/index.spec.js b/test/query/index.spec.js index 7d178315..aa92f699 100644 --- a/test/query/index.spec.js +++ b/test/query/index.spec.js @@ -48,10 +48,7 @@ describe('Query', () => { before('create a dht', () => { const peerStore = new PeerBook() dht = new DHT({ - dialer: { - _peerInfo: ourPeerInfo, - _peerBook: peerStore - }, + dialer: {}, peerStore, peerInfo: ourPeerInfo }) diff --git a/test/rpc/index.spec.js b/test/rpc/index.spec.js index 97cc2b6a..d4e59a28 100644 --- a/test/rpc/index.spec.js +++ b/test/rpc/index.spec.js @@ -7,12 +7,14 @@ const expect = chai.expect const pDefer = require('p-defer') const pipe = require('it-pipe') const lp = require('it-length-prefixed') +const { collect } = require('streaming-iterables') const Message = require('../../src/message') const rpc = require('../../src/rpc') const createPeerInfo = require('../utils/create-peer-info') const TestDHT = require('../utils/test-dht') +const toBuffer = require('../utils/to-buffer') describe('rpc', () => { let peerInfos @@ -38,35 +40,20 @@ describe('rpc', () => { defer.resolve() } - const data = [] - await pipe( + const source = await pipe( [msg.serialize()], lp.encode(), - async source => { - for await (const chunk of source) { - data.push(chunk.slice()) - } - } + collect ) const duplexStream = { - source: function * () { - const array = data - - while (array.length) { - yield array.shift() - } - }, + source, sink: async (source) => { - const res = [] - await pipe( + const res = await pipe( source, lp.decode(), - async source => { - for await (const chunk of source) { - res.push(chunk.slice()) - } - } + toBuffer, // Ensure we have buffers here for validateMessage to consume + collect ) validateMessage(res) } diff --git a/test/utils/index.js b/test/utils/index.js index 6d9b6f00..1350af3e 100644 --- a/test/utils/index.js +++ b/test/utils/index.js @@ -3,7 +3,7 @@ const delay = require('delay') const pRetry = require('p-retry') const pTimeout = require('p-timeout') -const DuplexPair = require('it-pair/duplex') +const duplexPair = require('it-pair/duplex') const { sortClosestPeers } = require('../../src/utils') @@ -35,7 +35,7 @@ const createMockRegistrar = (registrarRecord) => ({ exports.createMockRegistrar = createMockRegistrar const ConnectionPair = () => { - const [d0, d1] = DuplexPair() + const [d0, d1] = duplexPair() return [ { diff --git a/test/utils/to-buffer.js b/test/utils/to-buffer.js new file mode 100644 index 00000000..7865c0ba --- /dev/null +++ b/test/utils/to-buffer.js @@ -0,0 +1,15 @@ +'use strict' +/** + * Converts BufferList messages to Buffers + * @param {*} source + * @returns {AsyncGenerator} + */ +const toBuffer = (source) => { + return (async function * () { + for await (const chunk of source) { + yield Buffer.isBuffer(chunk) ? chunk : chunk.slice() + } + })() +} + +module.exports = toBuffer