From 6a94d9a062eeff2e315fa77034ffff49bd89ff0f Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Wed, 20 Mar 2019 18:01:08 +0100 Subject: [PATCH] feat: add basic dial queue to avoid many connections to peer (#310) BREAKING CHANGE: This adds a very basic dial queue peer peer. This will prevent multiple, simultaneous dial requests to the same peer from creating multiple connections. The requests will be queued per peer, and will leverage the same connection when possible. The breaking change here is that `.dial`, will no longer return a connection. js-libp2p, circuit relay, and kad-dht, which use `.dial` were not using the returned connection. So while this is a breaking change it should not break the existing libp2p stack. If custom applications are leveraging the returned connection, they will need to convert to only using the connection returned via the callback. * chore: dont log priviatized unless it actually happened * refactor: only get our addresses for filtering once --- package.json | 2 +- src/connection/base.js | 1 - src/connection/handler.js | 33 +++--- src/connection/incoming.js | 5 +- src/connection/index.js | 34 +++--- src/connection/manager.js | 8 +- src/dialer.js | 110 ------------------- src/dialer/index.js | 68 ++++++++++++ src/dialer/queue.js | 219 +++++++++++++++++++++++++++++++++++++ src/dialer/queueManager.js | 56 ++++++++++ src/errors.js | 24 ++-- src/get-peer-info.js | 2 +- src/index.js | 13 ++- src/transport.js | 3 +- test/dial-fsm.node.js | 169 ++++++++++++++++++---------- test/identify.node.js | 8 +- test/pnet.node.js | 3 - test/swarm-muxing.node.js | 15 ++- 18 files changed, 537 insertions(+), 236 deletions(-) delete mode 100644 src/dialer.js create mode 100644 src/dialer/index.js create mode 100644 src/dialer/queue.js create mode 100644 src/dialer/queueManager.js diff --git a/package.json b/package.json index 5cd7fb3ab7..39f0b88313 100644 --- a/package.json +++ b/package.json @@ -54,7 +54,7 @@ "peer-book": "~0.9.1", "portfinder": "^1.0.20", "pull-length-prefixed": "^1.3.1", - "pull-mplex": "~0.1.0", + "pull-mplex": "~0.1.2", "pull-pair": "^1.1.0", "sinon": "^7.2.3", "webrtcsupport": "^2.2.0" diff --git a/src/connection/base.js b/src/connection/base.js index 087c58200e..cb3d5a59bb 100644 --- a/src/connection/base.js +++ b/src/connection/base.js @@ -92,7 +92,6 @@ class BaseConnection extends EventEmitter { * @returns {void} */ _onPrivatized () { - this.log('successfully privatized incoming connection') this.emit('private', this.conn) } diff --git a/src/connection/handler.js b/src/connection/handler.js index 9443167f2d..abed6126cb 100644 --- a/src/connection/handler.js +++ b/src/connection/handler.js @@ -14,31 +14,32 @@ function listener (_switch) { * @param {function} handler A custom handler to use * @returns {function(Connection)} A connection handler function */ - return (transportKey, handler) => { + return function (transportKey, handler) { /** * Takes a base connection and manages listening behavior * * @param {Connection} conn The connection to manage * @returns {void} */ - return (conn) => { - // Add a transport level observer, if needed - const connection = transportKey ? observeConn(transportKey, null, conn, _switch.observer) : conn + return function (conn) { + log('received incoming connection for transport %s', transportKey) + conn.getPeerInfo((_, peerInfo) => { + // Add a transport level observer, if needed + const connection = transportKey ? observeConn(transportKey, null, conn, _switch.observer) : conn + const connFSM = new IncomingConnection({ connection, _switch, transportKey, peerInfo }) - log('received incoming connection') - const connFSM = new IncomingConnection({ connection, _switch, transportKey }) + connFSM.once('error', (err) => log(err)) + connFSM.once('private', (_conn) => { + // Use the custom handler, if it was provided + if (handler) { + return handler(_conn) + } + connFSM.encrypt() + }) + connFSM.once('encrypted', () => connFSM.upgrade()) - connFSM.once('error', (err) => log(err)) - connFSM.once('private', (_conn) => { - // Use the custom handler, if it was provided - if (handler) { - return handler(_conn) - } - connFSM.encrypt() + connFSM.protect() }) - connFSM.once('encrypted', () => connFSM.upgrade()) - - connFSM.protect() } } } diff --git a/src/connection/incoming.js b/src/connection/incoming.js index df9cef967b..f66332a046 100644 --- a/src/connection/incoming.js +++ b/src/connection/incoming.js @@ -7,13 +7,14 @@ const withIs = require('class-is') const BaseConnection = require('./base') class IncomingConnectionFSM extends BaseConnection { - constructor ({ connection, _switch, transportKey }) { + constructor ({ connection, _switch, transportKey, peerInfo }) { super({ _switch, name: `inc:${_switch._peerInfo.id.toB58String().slice(0, 8)}` }) this.conn = connection - this.theirPeerInfo = null + this.theirPeerInfo = peerInfo || null + this.theirB58Id = this.theirPeerInfo ? this.theirPeerInfo.id.toB58String() : null this.ourPeerInfo = this.switch._peerInfo this.transportKey = transportKey this.protocolMuxer = this.switch.protocolMuxer(this.transportKey) diff --git a/src/connection/index.js b/src/connection/index.js index bbcfef2c64..a7b8b05293 100644 --- a/src/connection/index.js +++ b/src/connection/index.js @@ -5,6 +5,7 @@ const Circuit = require('libp2p-circuit') const multistream = require('multistream-select') const withIs = require('class-is') const BaseConnection = require('./base') +const parallel = require('async/parallel') const observeConnection = require('../observe-connection') const { @@ -33,7 +34,7 @@ const { */ class ConnectionFSM extends BaseConnection { /** - * @param {ConnectionOptions} param0 + * @param {ConnectionOptions} connectionOptions * @constructor */ constructor ({ _switch, peerInfo, muxer, conn, type = 'out' }) { @@ -261,7 +262,7 @@ class ConnectionFSM extends BaseConnection { * @returns {void} */ _onDisconnecting () { - this.log('disconnecting from %s', this.theirB58Id) + this.log('disconnecting from %s', this.theirB58Id, Boolean(this.muxer)) // Issue disconnects on both Peers if (this.theirPeerInfo) { @@ -272,22 +273,31 @@ class ConnectionFSM extends BaseConnection { delete this.switch.conns[this.theirB58Id] + let tasks = [] + // Clean up stored connections if (this.muxer) { - this.muxer.end() - delete this.muxer - this.switch.emit('peer-mux-closed', this.theirPeerInfo) + tasks.push((cb) => { + this.muxer.end(() => { + delete this.muxer + this.switch.emit('peer-mux-closed', this.theirPeerInfo) + cb() + }) + }) } // If we have the base connection, abort it + // Ignore abort errors, since we're closing if (this.conn) { - this.conn.source(true, () => { - this._state('done') - delete this.conn - }) - } else { - this._state('done') + try { + this.conn.source.abort() + } catch (_) { } + delete this.conn } + + parallel(tasks, () => { + this._state('done') + }) } /** @@ -366,8 +376,6 @@ class ConnectionFSM extends BaseConnection { const conn = observeConnection(null, key, _conn, this.switch.observer) this.muxer = this.switch.muxers[key].dialer(conn) - // this.switch.muxedConns[this.theirB58Id] = this - this.switch.connection.add(this) this.muxer.once('close', () => { this.close() diff --git a/src/connection/manager.js b/src/connection/manager.js index 7b5d698eb0..5874d7688b 100644 --- a/src/connection/manager.js +++ b/src/connection/manager.js @@ -61,8 +61,12 @@ class ConnectionManager { */ getOne (peerId) { if (this.connections[peerId]) { - // TODO: Maybe select the best? - return this.connections[peerId][0] + // Only return muxed connections + for (var i = 0; i < this.connections[peerId].length; i++) { + if (this.connections[peerId][i].getState() === 'MUXED') { + return this.connections[peerId][i] + } + } } return null } diff --git a/src/dialer.js b/src/dialer.js deleted file mode 100644 index 4d324b0a83..0000000000 --- a/src/dialer.js +++ /dev/null @@ -1,110 +0,0 @@ -'use strict' - -const Connection = require('interface-connection').Connection -const ConnectionFSM = require('./connection') -const getPeerInfo = require('./get-peer-info') -const once = require('once') -const nextTick = require('async/nextTick') - -const debug = require('debug') -const log = debug('libp2p:switch:dial') - -function maybePerformHandshake ({ protocol, proxyConnection, connection, callback }) { - if (protocol) { - return connection.shake(protocol, (err, conn) => { - if (!conn) { - return callback(err) - } - - proxyConnection.setPeerInfo(connection.theirPeerInfo) - proxyConnection.setInnerConn(conn) - callback(null, proxyConnection) - }) - } - - nextTick(callback) -} - -/** - * Returns a Dialer generator that when called, will immediately begin dialing - * to the given `peer`. - * - * @param {Switch} _switch - * @param {Boolean} returnFSM Whether or not to return an fsm instead of a Connection - * @returns {function(PeerInfo, string, function(Error, Connection))} - */ -function dial (_switch, returnFSM) { - /** - * Creates a new dialer and immediately begins dialing to the given `peer` - * - * @param {PeerInfo} peer - * @param {string} protocol - * @param {function(Error, Connection)} callback - * @returns {Connection} - */ - return (peer, protocol, callback) => { - if (typeof protocol === 'function') { - callback = protocol - protocol = null - } - - callback = once(callback || function noop () {}) - - const peerInfo = getPeerInfo(peer, _switch._peerBook) - const b58Id = peerInfo.id.toB58String() - - log('dialing to %s with protocol %s', b58Id, protocol || 'unknown') - - let connection = _switch.connection.getOne(b58Id) - - if (!ConnectionFSM.isConnectionFSM(connection)) { - connection = new ConnectionFSM({ - _switch, - peerInfo, - muxer: null, - conn: null - }) - connection.once('error', (err) => callback(err)) - connection.once('connected', () => connection.protect()) - connection.once('private', () => connection.encrypt()) - connection.once('encrypted', () => connection.upgrade()) - connection.once('muxed', () => { - maybePerformHandshake({ - protocol, - proxyConnection, - connection, - callback - }) - }) - connection.once('unmuxed', () => { - maybePerformHandshake({ - protocol, - proxyConnection, - connection, - callback - }) - }) - } - - const proxyConnection = new Connection() - proxyConnection.setPeerInfo(peerInfo) - - nextTick(() => { - // If we have a muxed connection, attempt the protocol handshake - if (connection.getState() === 'MUXED') { - maybePerformHandshake({ - protocol, - proxyConnection, - connection, - callback - }) - } else { - connection.dial() - } - }) - - return returnFSM ? connection : proxyConnection - } -} - -module.exports = dial diff --git a/src/dialer/index.js b/src/dialer/index.js new file mode 100644 index 0000000000..8b4c8761f9 --- /dev/null +++ b/src/dialer/index.js @@ -0,0 +1,68 @@ +'use strict' + +const DialQueueManager = require('./queueManager') +const getPeerInfo = require('../get-peer-info') + +module.exports = function (_switch) { + const dialQueueManager = new DialQueueManager(_switch) + + _switch.state.on('STOPPING:enter', abort) + + /** + * @param {DialRequest} dialRequest + * @returns {void} + */ + function _dial ({ peerInfo, protocol, useFSM, callback }) { + if (typeof protocol === 'function') { + callback = protocol + protocol = null + } + + try { + peerInfo = getPeerInfo(peerInfo, _switch._peerBook) + } catch (err) { + return callback(err) + } + + // Add it to the queue, it will automatically get executed + dialQueueManager.add({ peerInfo, protocol, useFSM, callback }) + } + + /** + * Aborts all dials that are queued. This should + * only be used when the Switch is being stopped + * + * @param {function} callback + */ + function abort (callback) { + dialQueueManager.abort() + callback() + } + + /** + * Adds the dial request to the queue for the given `peerInfo` + * @param {PeerInfo} peerInfo + * @param {string} protocol + * @param {function(Error, Connection)} callback + */ + function dial (peerInfo, protocol, callback) { + _dial({ peerInfo, protocol, useFSM: false, callback }) + } + + /** + * Behaves like dial, except it calls back with a ConnectionFSM + * + * @param {PeerInfo} peerInfo + * @param {string} protocol + * @param {function(Error, ConnectionFSM)} callback + */ + function dialFSM (peerInfo, protocol, callback) { + _dial({ peerInfo, protocol, useFSM: true, callback }) + } + + return { + dial, + dialFSM, + abort + } +} diff --git a/src/dialer/queue.js b/src/dialer/queue.js new file mode 100644 index 0000000000..b1c6cf1436 --- /dev/null +++ b/src/dialer/queue.js @@ -0,0 +1,219 @@ +'use strict' + +const ConnectionFSM = require('../connection') +const { DIAL_ABORTED } = require('../errors') +const Connection = require('interface-connection').Connection +const nextTick = require('async/nextTick') +const once = require('once') +const debug = require('debug') +const log = debug('libp2p:switch:dial') +log.error = debug('libp2p:switch:dial:error') + +/** + * Components required to execute a dial + * @typedef {Object} DialRequest + * @property {PeerInfo} peerInfo - The peer to dial to + * @property {string} [protocol] - The protocol to create a stream for + * @property {boolean} useFSM - If `callback` should return a ConnectionFSM + * @property {function(Error, Connection|ConnectionFSM)} callback + */ + +/** + * @typedef {Object} NewConnection + * @property {ConnectionFSM} connectionFSM + * @property {boolean} didCreate + */ + +/** + * Attempts to create a new connection or stream (when muxed), + * via negotiation of the given `protocol`. If no `protocol` is + * provided, no action will be taken and `callback` will be called + * immediately with no error or values. + * + * @param {object} options + * @param {string} options.protocol + * @param {ConnectionFSM} options.connection + * @param {function(Error, Connection)} options.callback + * @returns {void} + */ +function createConnectionWithProtocol ({ protocol, connection, callback }) { + if (!protocol) { + return callback() + } + connection.shake(protocol, (err, conn) => { + if (!conn) { + return callback(err) + } + + const proxyConnection = new Connection() + proxyConnection.setPeerInfo(connection.theirPeerInfo) + proxyConnection.setInnerConn(conn) + callback(null, proxyConnection) + }) +} + +/** + * A convenience array wrapper for controlling + * a per peer queue + * + * @returns {Queue} + */ +class Queue { + /** + * @constructor + * @param {PeerInfo} peerInfo + * @param {Switch} _switch + */ + constructor (peerInfo, _switch) { + this.peerInfo = peerInfo + this.id = peerInfo.id.toB58String() + this.switch = _switch + this._queue = [] + this.isRunning = false + } + get length () { + return this._queue.length + } + + /** + * Adds the dial request to the queue and starts the + * queue if it is stopped + * @param {string} protocol + * @param {boolean} useFSM If callback should use a ConnectionFSM instead + * @param {function(Error, Connection)} callback + */ + add (protocol, useFSM, callback) { + this._queue.push({ protocol, useFSM, callback }) + if (!this.isRunning) { + log('starting dial queue to %s', this.id) + this.start() + } + } + + /** + * Starts the queue + */ + start () { + this.isRunning = true + this._run() + } + + /** + * Stops the queue + */ + stop () { + this.isRunning = false + } + + /** + * Stops the queue and errors the callback for each dial request + */ + abort () { + this.stop() + while (this.length > 0) { + let dial = this._queue.shift() + dial.callback(DIAL_ABORTED()) + } + } + + /** + * Attempts to find a muxed connection for the given peer. If one + * isn't found, a new one will be created. + * + * Returns an array containing two items. The ConnectionFSM and wether + * or not the ConnectionFSM was just created. The latter can be used + * to determine dialing needs. + * + * @private + * @param {PeerInfo} peerInfo + * @returns {NewConnection} + */ + _getOrCreateConnection (peerInfo) { + let connectionFSM = this.switch.connection.getOne(this.id) + let didCreate = false + + if (!connectionFSM) { + connectionFSM = new ConnectionFSM({ + _switch: this.switch, + peerInfo, + muxer: null, + conn: null + }) + + // Add control events and start the dialer + connectionFSM.once('connected', () => connectionFSM.protect()) + connectionFSM.once('private', () => connectionFSM.encrypt()) + connectionFSM.once('encrypted', () => connectionFSM.upgrade()) + + didCreate = true + } + + return { connectionFSM, didCreate } + } + + /** + * Executes the next dial in the queue for the given peer + * @private + * @returns {void} + */ + _run () { + // If we have no items in the queue or we're stopped, exit + if (this.length < 1 || !this.isRunning) { + log('stopping the queue for %s', this.id) + return this.stop() + } + + const next = once(() => { + log('starting next dial to %s', this.id) + this._run() + }) + + let queuedDial = this._queue.shift() + let { connectionFSM, didCreate } = this._getOrCreateConnection(this.peerInfo) + + // If the dial expects a ConnectionFSM, we can provide that back now + if (queuedDial.useFSM) { + nextTick(queuedDial.callback, null, connectionFSM) + } + + // If we can handshake protocols, get a new stream and call run again + if (['MUXED', 'CONNECTED'].includes(connectionFSM.getState())) { + queuedDial.connection = connectionFSM + createConnectionWithProtocol(queuedDial) + next() + return + } + + // If we error, error the queued dial + // In the future, it may be desired to error the other queued dials, + // depending on the error. + connectionFSM.once('error', (err) => { + queuedDial.callback(err) + }) + + connectionFSM.once('close', () => { + next() + }) + + // If we're not muxed yet, add listeners + connectionFSM.once('muxed', () => { + this.switch.connection.add(connectionFSM) + queuedDial.connection = connectionFSM + createConnectionWithProtocol(queuedDial) + next() + }) + + connectionFSM.once('unmuxed', () => { + queuedDial.connection = connectionFSM + createConnectionWithProtocol(queuedDial) + next() + }) + + // If we have a new connection, start dialing + if (didCreate) { + connectionFSM.dial() + } + } +} + +module.exports = Queue diff --git a/src/dialer/queueManager.js b/src/dialer/queueManager.js new file mode 100644 index 0000000000..775dc5287e --- /dev/null +++ b/src/dialer/queueManager.js @@ -0,0 +1,56 @@ +'use strict' + +const once = require('once') +const Queue = require('./queue') + +const noop = () => {} + +class DialQueueManager { + /** + * @constructor + * @param {Switch} _switch + */ + constructor (_switch) { + this._queue = {} + this.switch = _switch + } + + /** + * Iterates over all items in the DialerQueue + * and executes there callback with an error. + * + * This causes the entire DialerQueue to be drained + */ + abort () { + const queues = Object.values(this._queue) + queues.forEach(dialQueue => { + dialQueue.abort() + }) + } + + /** + * Adds the `dialRequest` to the queue and ensures the queue is running + * + * @param {DialRequest} dialRequest + */ + add ({ peerInfo, protocol, useFSM, callback }) { + callback = callback ? once(callback) : noop + + let dialQueue = this.getQueue(peerInfo) + dialQueue.add(protocol, useFSM, callback) + } + + /** + * Returns the `Queue` for the given `peerInfo` + * @param {PeerInfo} peerInfo + * @returns {Queue} + */ + getQueue (peerInfo) { + const id = peerInfo.id.toB58String() + + this._queue[id] = this._queue[id] || new Queue(peerInfo, this.switch) + return this._queue[id] + } +} + +module.exports = DialQueueManager diff --git a/src/errors.js b/src/errors.js index 22833bd238..28beb62ec8 100644 --- a/src/errors.js +++ b/src/errors.js @@ -2,16 +2,18 @@ const errCode = require('err-code') -module.exports.PROTECTOR_REQUIRED = 'No protector provided with private network enforced' -module.exports.CONNECTION_FAILED = (err) => errCode(err, 'CONNECTION_FAILED') -module.exports.DIAL_SELF = () => errCode(new Error('A node cannot dial itself'), 'DIAL_SELF') -module.exports.NO_TRANSPORTS_REGISTERED = () => errCode(new Error('No transports registered, dial not possible'), 'NO_TRANSPORTS_REGISTERED') -module.exports.UNEXPECTED_END = () => errCode(new Error('Unexpected end of input from reader.'), 'UNEXPECTED_END') -module.exports.INVALID_STATE_TRANSITION = (err) => errCode(err, 'INVALID_STATE_TRANSITION') - -module.exports.maybeUnexpectedEnd = (err) => { - if (err === true) { - return module.exports.UNEXPECTED_END() +module.exports = { + CONNECTION_FAILED: (err) => errCode(err, 'CONNECTION_FAILED'), + DIAL_ABORTED: () => errCode('Dial was aborted', 'DIAL_ABORTED'), + DIAL_SELF: () => errCode('A node cannot dial itself', 'DIAL_SELF'), + INVALID_STATE_TRANSITION: (err) => errCode(err, 'INVALID_STATE_TRANSITION'), + NO_TRANSPORTS_REGISTERED: () => errCode('No transports registered, dial not possible', 'NO_TRANSPORTS_REGISTERED'), + PROTECTOR_REQUIRED: () => errCode('No protector provided with private network enforced', 'PROTECTOR_REQUIRED'), + UNEXPECTED_END: () => errCode('Unexpected end of input from reader.', 'UNEXPECTED_END'), + maybeUnexpectedEnd: (err) => { + if (err === true) { + return module.exports.UNEXPECTED_END() + } + return err } - return err } diff --git a/src/get-peer-info.js b/src/get-peer-info.js index 671f4fdba3..7e3a13f886 100644 --- a/src/get-peer-info.js +++ b/src/get-peer-info.js @@ -38,7 +38,7 @@ function getPeerInfo (peer, peerBook) { try { return peerBook.get(peerIdB58Str) } catch (err) { - throw new Error('Couldnt get PeerInfo') + throw new Error(`Couldnt get PeerInfo for ${peerIdB58Str}`) } } diff --git a/src/index.js b/src/index.js index 1ec69d6a67..4658b3efe2 100644 --- a/src/index.js +++ b/src/index.js @@ -8,7 +8,7 @@ const series = require('async/series') const TransportManager = require('./transport') const ConnectionManager = require('./connection/manager') const getPeerInfo = require('./get-peer-info') -const dial = require('./dialer') +const getDialer = require('./dialer') const connectionHandler = require('./connection/handler') const ProtocolMuxer = require('./protocol-muxer') const plaintext = require('./plaintext') @@ -65,10 +65,6 @@ class Switch extends EventEmitter { this.stats = Stats(this.observer, this._options.stats) this.protocolMuxer = ProtocolMuxer(this.protocols, this.observer) - // higher level (public) API - this.dial = dial(this) - this.dialFSM = dial(this, true) - // All purpose connection handler for managing incoming connections this._connectionHandler = connectionHandler(this) @@ -111,6 +107,11 @@ class Switch extends EventEmitter { log.error(err) this.emit('error', err) }) + + // higher level (public) API + const dialer = getDialer(this) + this.dial = dialer.dial + this.dialFSM = dialer.dialFSM } /** @@ -250,7 +251,7 @@ class Switch extends EventEmitter { }, cb) }, cb) }, - (cb) => each([...this.connection.getAll()], (conn, cb) => { + (cb) => each(this.connection.getAll(), (conn, cb) => { conn.once('close', cb) conn.close() }, cb) diff --git a/src/transport.js b/src/transport.js index 4af627688c..b0aac97118 100644 --- a/src/transport.js +++ b/src/transport.js @@ -211,9 +211,10 @@ class TransportManager { return transportAddrs } + const ourAddrs = peerInfo.multiaddrs.toArray() return transportAddrs.filter((addr) => { // If our address is in the destination address, filter it out - return !peerInfo.multiaddrs.toArray().find((pAddr) => { + return !ourAddrs.find((pAddr) => { try { addr.decapsulate(pAddr) } catch (err) { diff --git a/test/dial-fsm.node.js b/test/dial-fsm.node.js index 7acf810007..8ab4d8e96b 100644 --- a/test/dial-fsm.node.js +++ b/test/dial-fsm.node.js @@ -22,6 +22,8 @@ describe('dialFSM', () => { let switchA let switchB let switchC + let peerAId + let peerBId before((done) => createInfos(3, (err, infos) => { expect(err).to.not.exist() @@ -30,6 +32,9 @@ describe('dialFSM', () => { const peerB = infos[1] const peerC = infos[2] + peerAId = peerA.id.toB58String() + peerBId = peerB.id.toB58String() + peerA.multiaddrs.add('/ip4/0.0.0.0/tcp/0') peerB.multiaddrs.add('/ip4/0.0.0.0/tcp/0') peerC.multiaddrs.add('/ip4/0.0.0.0/tcp/0/ws') @@ -57,9 +62,9 @@ describe('dialFSM', () => { switchC.connection.reuse() parallel([ - (cb) => switchA.transport.listen('tcp', {}, null, cb), - (cb) => switchB.transport.listen('tcp', {}, null, cb), - (cb) => switchC.transport.listen('ws', {}, null, cb) + (cb) => switchA.start(cb), + (cb) => switchB.start(cb), + (cb) => switchC.start(cb) ], done) })) @@ -74,72 +79,94 @@ describe('dialFSM', () => { it('should emit `error:connection_attempt_failed` when a transport fails to dial', (done) => { switchC.handle('/warn/1.0.0', () => { }) - const connFSM = switchA.dialFSM(switchC._peerInfo, '/warn/1.0.0', () => { }) - - connFSM.once('error:connection_attempt_failed', (errors) => { - expect(errors).to.be.an('array') - expect(errors).to.have.length(1) - done() + switchA.dialFSM(switchC._peerInfo, '/warn/1.0.0', (err, connFSM) => { + expect(err).to.not.exist() + connFSM.once('error:connection_attempt_failed', (errors) => { + expect(errors).to.be.an('array') + expect(errors).to.have.length(1) + done() + }) }) }) it('should emit an `error` event when a it cannot dial a peer', (done) => { switchC.handle('/error/1.0.0', () => { }) - const connFSM = switchA.dialFSM(switchC._peerInfo, '/error/1.0.0', () => { }) - - connFSM.once('error', (err) => { - expect(err).to.be.exist() - expect(err).to.have.property('code', 'CONNECTION_FAILED') - done() + switchA.dialFSM(switchC._peerInfo, '/error/1.0.0', (err, connFSM) => { + expect(err).to.not.exist() + connFSM.once('error', (err) => { + expect(err).to.be.exist() + expect(err).to.have.property('code', 'CONNECTION_FAILED') + done() + }) }) }) it('should emit a `closed` event when closed', (done) => { switchB.handle('/closed/1.0.0', () => { }) - const connFSM = switchA.dialFSM(switchB._peerInfo, '/closed/1.0.0', (err) => { + switchA.dialFSM(switchB._peerInfo, '/closed/1.0.0', (err, connFSM) => { expect(err).to.not.exist() - expect(switchA.connection.getAllById(switchB._peerInfo.id.toB58String())).to.have.length(1) - connFSM.close() - }) - connFSM.once('close', () => { - expect(switchA.connection.getAllById(switchB._peerInfo.id.toB58String())).to.have.length(0) - done() + connFSM.once('close', () => { + expect(switchA.connection.getAllById(peerBId)).to.have.length(0) + done() + }) + + connFSM.once('muxed', () => { + expect(switchA.connection.getAllById(peerBId)).to.have.length(1) + connFSM.close() + }) }) }) it('should close when the receiver closes', (done) => { - const peerIdA = switchA._peerInfo.id.toB58String() - // wait for the expects to happen expect(2).checks(done) switchB.handle('/closed/1.0.0', () => { }) switchB.on('peer-mux-established', (peerInfo) => { - if (peerInfo.id.toB58String() === peerIdA) { + if (peerInfo.id.toB58String() === peerAId) { switchB.removeAllListeners('peer-mux-established') - expect(switchB.connection.getAllById(peerIdA)).to.have.length(1).mark() - switchB.connection.getOne(peerIdA).close() + expect(switchB.connection.getAllById(peerAId)).to.have.length(1).mark() + switchB.connection.getOne(peerAId).close() } }) - const connFSM = switchA.dialFSM(switchB._peerInfo, '/closed/1.0.0', (err) => { + switchA.dialFSM(switchB._peerInfo, '/closed/1.0.0', (err, connFSM) => { expect(err).to.not.exist() + + connFSM.once('close', () => { + expect(switchA.connection.getAllById(peerBId)).to.have.length(0).mark() + }) }) - connFSM.once('close', () => { - expect(switchA.connection.getAllById(switchB._peerInfo.id.toB58String())).to.have.length(0).mark() + }) + + it('parallel dials to the same peer should not create new connections', (done) => { + switchB.handle('/parallel/2.0.0', (_, conn) => { pull(conn, conn) }) + + parallel([ + (cb) => switchA.dialFSM(switchB._peerInfo, '/parallel/2.0.0', cb), + (cb) => switchA.dialFSM(switchB._peerInfo, '/parallel/2.0.0', cb) + ], (err, results) => { + expect(err).to.not.exist() + expect(results).to.have.length(2) + expect(switchA.connection.getAllById(peerBId)).to.have.length(1) + + switchA.hangUp(switchB._peerInfo, () => { + expect(switchA.connection.getAllById(peerBId)).to.have.length(0) + done() + }) }) }) it('parallel dials to one another should disconnect on hangup', function (done) { - this.timeout(10e3) - switchA.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) }) switchB.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) }) - // 4 close checks and 1 hangup check + expect(switchA.connection.getAllById(peerBId)).to.have.length(0) + + // 4 close checks (1 inc and 1 out for each node) and 1 hangup check expect(5).checks(() => { switchA.removeAllListeners('peer-mux-closed') switchB.removeAllListeners('peer-mux-closed') @@ -147,24 +174,28 @@ describe('dialFSM', () => { }) switchA.on('peer-mux-closed', (peerInfo) => { - expect(peerInfo.id.toB58String()).to.eql(switchB._peerInfo.id.toB58String()).mark() + expect(peerInfo.id.toB58String()).to.eql(peerBId).mark() }) switchB.on('peer-mux-closed', (peerInfo) => { - expect(peerInfo.id.toB58String()).to.eql(switchA._peerInfo.id.toB58String()).mark() + expect(peerInfo.id.toB58String()).to.eql(peerAId).mark() }) - const conn = switchA.dialFSM(switchB._peerInfo, '/parallel/1.0.0', () => { - // Hangup and verify the connections are closed - switchA.hangUp(switchB._peerInfo, (err) => { - expect(err).to.not.exist().mark() + switchA.dialFSM(switchB._peerInfo, '/parallel/1.0.0', (err, connFSM) => { + expect(err).to.not.exist() + // Hold the dial from A, until switch B is done dialing to ensure + // we have both incoming and outgoing connections + connFSM._state.on('DIALING:leave', (cb) => { + switchB.dialFSM(switchA._peerInfo, '/parallel/1.0.0', (err, connB) => { + expect(err).to.not.exist() + connB.on('muxed', cb) + }) }) - }) - // Hold the dial from A, until switch B is done dialing to ensure - // we have both incoming and outgoing connections - conn._state.on('DIALING:enter', (cb) => { - switchB.dialFSM(switchA._peerInfo, '/parallel/1.0.0', () => { - cb() + connFSM.on('connection', () => { + // Hangup and verify the connections are closed + switchA.hangUp(switchB._peerInfo, (err) => { + expect(err).to.not.exist().mark() + }) }) }) }) @@ -177,28 +208,52 @@ describe('dialFSM', () => { expect(5).checks(() => { switchA.removeAllListeners('peer-mux-closed') switchB.removeAllListeners('peer-mux-closed') - done() + // restart the node for subsequent tests + switchA.start(done) }) switchA.on('peer-mux-closed', (peerInfo) => { - expect(peerInfo.id.toB58String()).to.eql(switchB._peerInfo.id.toB58String()).mark() + expect(peerInfo.id.toB58String()).to.eql(peerBId).mark() }) switchB.on('peer-mux-closed', (peerInfo) => { - expect(peerInfo.id.toB58String()).to.eql(switchA._peerInfo.id.toB58String()).mark() + expect(peerInfo.id.toB58String()).to.eql(peerAId).mark() }) - const conn = switchA.dialFSM(switchB._peerInfo, '/parallel/1.0.0', () => { - // Hangup and verify the connections are closed - switchA.stop((err) => { - expect(err).to.not.exist().mark() + switchA.dialFSM(switchB._peerInfo, '/parallel/1.0.0', (err, connFSM) => { + expect(err).to.not.exist() + // Hold the dial from A, until switch B is done dialing to ensure + // we have both incoming and outgoing connections + connFSM._state.on('DIALING:leave', (cb) => { + switchB.dialFSM(switchA._peerInfo, '/parallel/1.0.0', (err, connB) => { + expect(err).to.not.exist() + connB.on('muxed', cb) + }) + }) + + connFSM.on('connection', () => { + // Hangup and verify the connections are closed + switchA.stop((err) => { + expect(err).to.not.exist().mark() + }) }) }) + }) - // Hold the dial from A, until switch B is done dialing to ensure - // we have both incoming and outgoing connections - conn._state.on('DIALING:enter', (cb) => { - switchB.dialFSM(switchA._peerInfo, '/parallel/1.0.0', () => { - cb() + it('queued dials should be aborted on node stop', (done) => { + switchB.handle('/abort-queue/1.0.0', (_, conn) => { pull(conn, conn) }) + + switchA.dialFSM(switchB._peerInfo, '/abort-queue/1.0.0', (err, connFSM) => { + expect(err).to.not.exist() + connFSM._state.on('UPGRADING:enter', (cb) => { + expect(2).checks(done) + switchA.dialFSM(switchB._peerInfo, '/abort-queue/1.0.0', (err) => { + expect(err).to.exist().mark() + }) + switchA.dialFSM(switchB._peerInfo, '/abort-queue/1.0.0', (err) => { + expect(err).to.exist().mark() + }) + + switchA.stop(cb) }) }) }) diff --git a/test/identify.node.js b/test/identify.node.js index b999cbca7b..e454d473ef 100644 --- a/test/identify.node.js +++ b/test/identify.node.js @@ -128,11 +128,11 @@ describe('Identify', () => { expect(2).check(done) switchA.handle('/id-test/1.0.0', (protocol, conn) => pull(conn, conn)) - const connFSM = switchB.dialFSM(switchA._peerInfo, '/id-test/1.0.0', (err) => { + switchB.dialFSM(switchA._peerInfo, '/id-test/1.0.0', (err, connFSM) => { expect(err).to.not.exist().mark() - }) - connFSM.once('close', () => { - expect(stub.called).to.eql(true).mark() + connFSM.once('close', () => { + expect(stub.called).to.eql(true).mark() + }) }) }) }) diff --git a/test/pnet.node.js b/test/pnet.node.js index a0b31cea5d..e362653acb 100644 --- a/test/pnet.node.js +++ b/test/pnet.node.js @@ -26,8 +26,6 @@ generatePSK(psk) generatePSK(psk2) describe('Private Network', function () { - this.timeout(20 * 1000) - let switchA let switchB let switchC @@ -84,7 +82,6 @@ describe('Private Network', function () { })) after(function (done) { - this.timeout(3 * 1000) parallel([ (cb) => switchA.stop(cb), (cb) => switchB.stop(cb), diff --git a/test/swarm-muxing.node.js b/test/swarm-muxing.node.js index 8a32b9d6fc..4c0fdd3c04 100644 --- a/test/swarm-muxing.node.js +++ b/test/swarm-muxing.node.js @@ -10,7 +10,8 @@ chai.use(dirtyChai) const parallel = require('async/parallel') const TCP = require('libp2p-tcp') const WebSockets = require('libp2p-websockets') -const mplex = require('pull-mplex') +const mplex = require('libp2p-mplex') +const pMplex = require('pull-mplex') const spdy = require('libp2p-spdy') const pull = require('pull-stream') const PeerBook = require('peer-book') @@ -21,7 +22,7 @@ const tryEcho = utils.tryEcho const Switch = require('../src') describe('Switch (everything all together)', () => { - [mplex, spdy].forEach(muxer => { + [pMplex, spdy, mplex].forEach(muxer => { describe(muxer.multicodec, () => { let switchA // tcp let switchB // tcp+ws @@ -48,8 +49,6 @@ describe('Switch (everything all together)', () => { })) after(function (done) { - this.timeout(3 * 1000) - parallel([ (cb) => switchA.stop(cb), (cb) => switchB.stop(cb), @@ -168,15 +167,15 @@ describe('Switch (everything all together)', () => { }) }) - it('dial from tcp to tcp+ws (returned conn)', (done) => { + it('dial from tcp to tcp+ws', (done) => { switchB.handle('/grapes/1.0.0', (protocol, conn) => pull(conn, conn)) - const conn = switchA.dial(switchB._peerInfo, '/grapes/1.0.0', (err, conn) => { + switchA.dial(switchB._peerInfo, '/grapes/1.0.0', (err, conn) => { expect(err).to.not.exist() expect(switchA.connection.getAll()).to.have.length(1) - }) - tryEcho(conn, done) + tryEcho(conn, done) + }) }) it('dial from tcp+ws to tcp+ws', (done) => {