diff --git a/package.json b/package.json index 5cd7fb3ab7..39f0b88313 100644 --- a/package.json +++ b/package.json @@ -54,7 +54,7 @@ "peer-book": "~0.9.1", "portfinder": "^1.0.20", "pull-length-prefixed": "^1.3.1", - "pull-mplex": "~0.1.0", + "pull-mplex": "~0.1.2", "pull-pair": "^1.1.0", "sinon": "^7.2.3", "webrtcsupport": "^2.2.0" diff --git a/src/connection/base.js b/src/connection/base.js index 087c58200e..cb3d5a59bb 100644 --- a/src/connection/base.js +++ b/src/connection/base.js @@ -92,7 +92,6 @@ class BaseConnection extends EventEmitter { * @returns {void} */ _onPrivatized () { - this.log('successfully privatized incoming connection') this.emit('private', this.conn) } diff --git a/src/connection/handler.js b/src/connection/handler.js index 9443167f2d..abed6126cb 100644 --- a/src/connection/handler.js +++ b/src/connection/handler.js @@ -14,31 +14,32 @@ function listener (_switch) { * @param {function} handler A custom handler to use * @returns {function(Connection)} A connection handler function */ - return (transportKey, handler) => { + return function (transportKey, handler) { /** * Takes a base connection and manages listening behavior * * @param {Connection} conn The connection to manage * @returns {void} */ - return (conn) => { - // Add a transport level observer, if needed - const connection = transportKey ? observeConn(transportKey, null, conn, _switch.observer) : conn + return function (conn) { + log('received incoming connection for transport %s', transportKey) + conn.getPeerInfo((_, peerInfo) => { + // Add a transport level observer, if needed + const connection = transportKey ? observeConn(transportKey, null, conn, _switch.observer) : conn + const connFSM = new IncomingConnection({ connection, _switch, transportKey, peerInfo }) - log('received incoming connection') - const connFSM = new IncomingConnection({ connection, _switch, transportKey }) + connFSM.once('error', (err) => log(err)) + connFSM.once('private', (_conn) => { + // Use the custom handler, if it was provided + if (handler) { + return handler(_conn) + } + connFSM.encrypt() + }) + connFSM.once('encrypted', () => connFSM.upgrade()) - connFSM.once('error', (err) => log(err)) - connFSM.once('private', (_conn) => { - // Use the custom handler, if it was provided - if (handler) { - return handler(_conn) - } - connFSM.encrypt() + connFSM.protect() }) - connFSM.once('encrypted', () => connFSM.upgrade()) - - connFSM.protect() } } } diff --git a/src/connection/incoming.js b/src/connection/incoming.js index df9cef967b..f66332a046 100644 --- a/src/connection/incoming.js +++ b/src/connection/incoming.js @@ -7,13 +7,14 @@ const withIs = require('class-is') const BaseConnection = require('./base') class IncomingConnectionFSM extends BaseConnection { - constructor ({ connection, _switch, transportKey }) { + constructor ({ connection, _switch, transportKey, peerInfo }) { super({ _switch, name: `inc:${_switch._peerInfo.id.toB58String().slice(0, 8)}` }) this.conn = connection - this.theirPeerInfo = null + this.theirPeerInfo = peerInfo || null + this.theirB58Id = this.theirPeerInfo ? this.theirPeerInfo.id.toB58String() : null this.ourPeerInfo = this.switch._peerInfo this.transportKey = transportKey this.protocolMuxer = this.switch.protocolMuxer(this.transportKey) diff --git a/src/connection/index.js b/src/connection/index.js index bbcfef2c64..a7b8b05293 100644 --- a/src/connection/index.js +++ b/src/connection/index.js @@ -5,6 +5,7 @@ const Circuit = require('libp2p-circuit') const multistream = require('multistream-select') const withIs = require('class-is') const BaseConnection = require('./base') +const parallel = require('async/parallel') const observeConnection = require('../observe-connection') const { @@ -33,7 +34,7 @@ const { */ class ConnectionFSM extends BaseConnection { /** - * @param {ConnectionOptions} param0 + * @param {ConnectionOptions} connectionOptions * @constructor */ constructor ({ _switch, peerInfo, muxer, conn, type = 'out' }) { @@ -261,7 +262,7 @@ class ConnectionFSM extends BaseConnection { * @returns {void} */ _onDisconnecting () { - this.log('disconnecting from %s', this.theirB58Id) + this.log('disconnecting from %s', this.theirB58Id, Boolean(this.muxer)) // Issue disconnects on both Peers if (this.theirPeerInfo) { @@ -272,22 +273,31 @@ class ConnectionFSM extends BaseConnection { delete this.switch.conns[this.theirB58Id] + let tasks = [] + // Clean up stored connections if (this.muxer) { - this.muxer.end() - delete this.muxer - this.switch.emit('peer-mux-closed', this.theirPeerInfo) + tasks.push((cb) => { + this.muxer.end(() => { + delete this.muxer + this.switch.emit('peer-mux-closed', this.theirPeerInfo) + cb() + }) + }) } // If we have the base connection, abort it + // Ignore abort errors, since we're closing if (this.conn) { - this.conn.source(true, () => { - this._state('done') - delete this.conn - }) - } else { - this._state('done') + try { + this.conn.source.abort() + } catch (_) { } + delete this.conn } + + parallel(tasks, () => { + this._state('done') + }) } /** @@ -366,8 +376,6 @@ class ConnectionFSM extends BaseConnection { const conn = observeConnection(null, key, _conn, this.switch.observer) this.muxer = this.switch.muxers[key].dialer(conn) - // this.switch.muxedConns[this.theirB58Id] = this - this.switch.connection.add(this) this.muxer.once('close', () => { this.close() diff --git a/src/connection/manager.js b/src/connection/manager.js index 7b5d698eb0..5874d7688b 100644 --- a/src/connection/manager.js +++ b/src/connection/manager.js @@ -61,8 +61,12 @@ class ConnectionManager { */ getOne (peerId) { if (this.connections[peerId]) { - // TODO: Maybe select the best? - return this.connections[peerId][0] + // Only return muxed connections + for (var i = 0; i < this.connections[peerId].length; i++) { + if (this.connections[peerId][i].getState() === 'MUXED') { + return this.connections[peerId][i] + } + } } return null } diff --git a/src/dialer.js b/src/dialer.js deleted file mode 100644 index 4d324b0a83..0000000000 --- a/src/dialer.js +++ /dev/null @@ -1,110 +0,0 @@ -'use strict' - -const Connection = require('interface-connection').Connection -const ConnectionFSM = require('./connection') -const getPeerInfo = require('./get-peer-info') -const once = require('once') -const nextTick = require('async/nextTick') - -const debug = require('debug') -const log = debug('libp2p:switch:dial') - -function maybePerformHandshake ({ protocol, proxyConnection, connection, callback }) { - if (protocol) { - return connection.shake(protocol, (err, conn) => { - if (!conn) { - return callback(err) - } - - proxyConnection.setPeerInfo(connection.theirPeerInfo) - proxyConnection.setInnerConn(conn) - callback(null, proxyConnection) - }) - } - - nextTick(callback) -} - -/** - * Returns a Dialer generator that when called, will immediately begin dialing - * to the given `peer`. - * - * @param {Switch} _switch - * @param {Boolean} returnFSM Whether or not to return an fsm instead of a Connection - * @returns {function(PeerInfo, string, function(Error, Connection))} - */ -function dial (_switch, returnFSM) { - /** - * Creates a new dialer and immediately begins dialing to the given `peer` - * - * @param {PeerInfo} peer - * @param {string} protocol - * @param {function(Error, Connection)} callback - * @returns {Connection} - */ - return (peer, protocol, callback) => { - if (typeof protocol === 'function') { - callback = protocol - protocol = null - } - - callback = once(callback || function noop () {}) - - const peerInfo = getPeerInfo(peer, _switch._peerBook) - const b58Id = peerInfo.id.toB58String() - - log('dialing to %s with protocol %s', b58Id, protocol || 'unknown') - - let connection = _switch.connection.getOne(b58Id) - - if (!ConnectionFSM.isConnectionFSM(connection)) { - connection = new ConnectionFSM({ - _switch, - peerInfo, - muxer: null, - conn: null - }) - connection.once('error', (err) => callback(err)) - connection.once('connected', () => connection.protect()) - connection.once('private', () => connection.encrypt()) - connection.once('encrypted', () => connection.upgrade()) - connection.once('muxed', () => { - maybePerformHandshake({ - protocol, - proxyConnection, - connection, - callback - }) - }) - connection.once('unmuxed', () => { - maybePerformHandshake({ - protocol, - proxyConnection, - connection, - callback - }) - }) - } - - const proxyConnection = new Connection() - proxyConnection.setPeerInfo(peerInfo) - - nextTick(() => { - // If we have a muxed connection, attempt the protocol handshake - if (connection.getState() === 'MUXED') { - maybePerformHandshake({ - protocol, - proxyConnection, - connection, - callback - }) - } else { - connection.dial() - } - }) - - return returnFSM ? connection : proxyConnection - } -} - -module.exports = dial diff --git a/src/dialer/index.js b/src/dialer/index.js new file mode 100644 index 0000000000..8b4c8761f9 --- /dev/null +++ b/src/dialer/index.js @@ -0,0 +1,68 @@ +'use strict' + +const DialQueueManager = require('./queueManager') +const getPeerInfo = require('../get-peer-info') + +module.exports = function (_switch) { + const dialQueueManager = new DialQueueManager(_switch) + + _switch.state.on('STOPPING:enter', abort) + + /** + * @param {DialRequest} dialRequest + * @returns {void} + */ + function _dial ({ peerInfo, protocol, useFSM, callback }) { + if (typeof protocol === 'function') { + callback = protocol + protocol = null + } + + try { + peerInfo = getPeerInfo(peerInfo, _switch._peerBook) + } catch (err) { + return callback(err) + } + + // Add it to the queue, it will automatically get executed + dialQueueManager.add({ peerInfo, protocol, useFSM, callback }) + } + + /** + * Aborts all dials that are queued. This should + * only be used when the Switch is being stopped + * + * @param {function} callback + */ + function abort (callback) { + dialQueueManager.abort() + callback() + } + + /** + * Adds the dial request to the queue for the given `peerInfo` + * @param {PeerInfo} peerInfo + * @param {string} protocol + * @param {function(Error, Connection)} callback + */ + function dial (peerInfo, protocol, callback) { + _dial({ peerInfo, protocol, useFSM: false, callback }) + } + + /** + * Behaves like dial, except it calls back with a ConnectionFSM + * + * @param {PeerInfo} peerInfo + * @param {string} protocol + * @param {function(Error, ConnectionFSM)} callback + */ + function dialFSM (peerInfo, protocol, callback) { + _dial({ peerInfo, protocol, useFSM: true, callback }) + } + + return { + dial, + dialFSM, + abort + } +} diff --git a/src/dialer/queue.js b/src/dialer/queue.js new file mode 100644 index 0000000000..b1c6cf1436 --- /dev/null +++ b/src/dialer/queue.js @@ -0,0 +1,219 @@ +'use strict' + +const ConnectionFSM = require('../connection') +const { DIAL_ABORTED } = require('../errors') +const Connection = require('interface-connection').Connection +const nextTick = require('async/nextTick') +const once = require('once') +const debug = require('debug') +const log = debug('libp2p:switch:dial') +log.error = debug('libp2p:switch:dial:error') + +/** + * Components required to execute a dial + * @typedef {Object} DialRequest + * @property {PeerInfo} peerInfo - The peer to dial to + * @property {string} [protocol] - The protocol to create a stream for + * @property {boolean} useFSM - If `callback` should return a ConnectionFSM + * @property {function(Error, Connection|ConnectionFSM)} callback + */ + +/** + * @typedef {Object} NewConnection + * @property {ConnectionFSM} connectionFSM + * @property {boolean} didCreate + */ + +/** + * Attempts to create a new connection or stream (when muxed), + * via negotiation of the given `protocol`. If no `protocol` is + * provided, no action will be taken and `callback` will be called + * immediately with no error or values. + * + * @param {object} options + * @param {string} options.protocol + * @param {ConnectionFSM} options.connection + * @param {function(Error, Connection)} options.callback + * @returns {void} + */ +function createConnectionWithProtocol ({ protocol, connection, callback }) { + if (!protocol) { + return callback() + } + connection.shake(protocol, (err, conn) => { + if (!conn) { + return callback(err) + } + + const proxyConnection = new Connection() + proxyConnection.setPeerInfo(connection.theirPeerInfo) + proxyConnection.setInnerConn(conn) + callback(null, proxyConnection) + }) +} + +/** + * A convenience array wrapper for controlling + * a per peer queue + * + * @returns {Queue} + */ +class Queue { + /** + * @constructor + * @param {PeerInfo} peerInfo + * @param {Switch} _switch + */ + constructor (peerInfo, _switch) { + this.peerInfo = peerInfo + this.id = peerInfo.id.toB58String() + this.switch = _switch + this._queue = [] + this.isRunning = false + } + get length () { + return this._queue.length + } + + /** + * Adds the dial request to the queue and starts the + * queue if it is stopped + * @param {string} protocol + * @param {boolean} useFSM If callback should use a ConnectionFSM instead + * @param {function(Error, Connection)} callback + */ + add (protocol, useFSM, callback) { + this._queue.push({ protocol, useFSM, callback }) + if (!this.isRunning) { + log('starting dial queue to %s', this.id) + this.start() + } + } + + /** + * Starts the queue + */ + start () { + this.isRunning = true + this._run() + } + + /** + * Stops the queue + */ + stop () { + this.isRunning = false + } + + /** + * Stops the queue and errors the callback for each dial request + */ + abort () { + this.stop() + while (this.length > 0) { + let dial = this._queue.shift() + dial.callback(DIAL_ABORTED()) + } + } + + /** + * Attempts to find a muxed connection for the given peer. If one + * isn't found, a new one will be created. + * + * Returns an array containing two items. The ConnectionFSM and wether + * or not the ConnectionFSM was just created. The latter can be used + * to determine dialing needs. + * + * @private + * @param {PeerInfo} peerInfo + * @returns {NewConnection} + */ + _getOrCreateConnection (peerInfo) { + let connectionFSM = this.switch.connection.getOne(this.id) + let didCreate = false + + if (!connectionFSM) { + connectionFSM = new ConnectionFSM({ + _switch: this.switch, + peerInfo, + muxer: null, + conn: null + }) + + // Add control events and start the dialer + connectionFSM.once('connected', () => connectionFSM.protect()) + connectionFSM.once('private', () => connectionFSM.encrypt()) + connectionFSM.once('encrypted', () => connectionFSM.upgrade()) + + didCreate = true + } + + return { connectionFSM, didCreate } + } + + /** + * Executes the next dial in the queue for the given peer + * @private + * @returns {void} + */ + _run () { + // If we have no items in the queue or we're stopped, exit + if (this.length < 1 || !this.isRunning) { + log('stopping the queue for %s', this.id) + return this.stop() + } + + const next = once(() => { + log('starting next dial to %s', this.id) + this._run() + }) + + let queuedDial = this._queue.shift() + let { connectionFSM, didCreate } = this._getOrCreateConnection(this.peerInfo) + + // If the dial expects a ConnectionFSM, we can provide that back now + if (queuedDial.useFSM) { + nextTick(queuedDial.callback, null, connectionFSM) + } + + // If we can handshake protocols, get a new stream and call run again + if (['MUXED', 'CONNECTED'].includes(connectionFSM.getState())) { + queuedDial.connection = connectionFSM + createConnectionWithProtocol(queuedDial) + next() + return + } + + // If we error, error the queued dial + // In the future, it may be desired to error the other queued dials, + // depending on the error. + connectionFSM.once('error', (err) => { + queuedDial.callback(err) + }) + + connectionFSM.once('close', () => { + next() + }) + + // If we're not muxed yet, add listeners + connectionFSM.once('muxed', () => { + this.switch.connection.add(connectionFSM) + queuedDial.connection = connectionFSM + createConnectionWithProtocol(queuedDial) + next() + }) + + connectionFSM.once('unmuxed', () => { + queuedDial.connection = connectionFSM + createConnectionWithProtocol(queuedDial) + next() + }) + + // If we have a new connection, start dialing + if (didCreate) { + connectionFSM.dial() + } + } +} + +module.exports = Queue diff --git a/src/dialer/queueManager.js b/src/dialer/queueManager.js new file mode 100644 index 0000000000..775dc5287e --- /dev/null +++ b/src/dialer/queueManager.js @@ -0,0 +1,56 @@ +'use strict' + +const once = require('once') +const Queue = require('./queue') + +const noop = () => {} + +class DialQueueManager { + /** + * @constructor + * @param {Switch} _switch + */ + constructor (_switch) { + this._queue = {} + this.switch = _switch + } + + /** + * Iterates over all items in the DialerQueue + * and executes there callback with an error. + * + * This causes the entire DialerQueue to be drained + */ + abort () { + const queues = Object.values(this._queue) + queues.forEach(dialQueue => { + dialQueue.abort() + }) + } + + /** + * Adds the `dialRequest` to the queue and ensures the queue is running + * + * @param {DialRequest} dialRequest + */ + add ({ peerInfo, protocol, useFSM, callback }) { + callback = callback ? once(callback) : noop + + let dialQueue = this.getQueue(peerInfo) + dialQueue.add(protocol, useFSM, callback) + } + + /** + * Returns the `Queue` for the given `peerInfo` + * @param {PeerInfo} peerInfo + * @returns {Queue} + */ + getQueue (peerInfo) { + const id = peerInfo.id.toB58String() + + this._queue[id] = this._queue[id] || new Queue(peerInfo, this.switch) + return this._queue[id] + } +} + +module.exports = DialQueueManager diff --git a/src/errors.js b/src/errors.js index 22833bd238..28beb62ec8 100644 --- a/src/errors.js +++ b/src/errors.js @@ -2,16 +2,18 @@ const errCode = require('err-code') -module.exports.PROTECTOR_REQUIRED = 'No protector provided with private network enforced' -module.exports.CONNECTION_FAILED = (err) => errCode(err, 'CONNECTION_FAILED') -module.exports.DIAL_SELF = () => errCode(new Error('A node cannot dial itself'), 'DIAL_SELF') -module.exports.NO_TRANSPORTS_REGISTERED = () => errCode(new Error('No transports registered, dial not possible'), 'NO_TRANSPORTS_REGISTERED') -module.exports.UNEXPECTED_END = () => errCode(new Error('Unexpected end of input from reader.'), 'UNEXPECTED_END') -module.exports.INVALID_STATE_TRANSITION = (err) => errCode(err, 'INVALID_STATE_TRANSITION') - -module.exports.maybeUnexpectedEnd = (err) => { - if (err === true) { - return module.exports.UNEXPECTED_END() +module.exports = { + CONNECTION_FAILED: (err) => errCode(err, 'CONNECTION_FAILED'), + DIAL_ABORTED: () => errCode('Dial was aborted', 'DIAL_ABORTED'), + DIAL_SELF: () => errCode('A node cannot dial itself', 'DIAL_SELF'), + INVALID_STATE_TRANSITION: (err) => errCode(err, 'INVALID_STATE_TRANSITION'), + NO_TRANSPORTS_REGISTERED: () => errCode('No transports registered, dial not possible', 'NO_TRANSPORTS_REGISTERED'), + PROTECTOR_REQUIRED: () => errCode('No protector provided with private network enforced', 'PROTECTOR_REQUIRED'), + UNEXPECTED_END: () => errCode('Unexpected end of input from reader.', 'UNEXPECTED_END'), + maybeUnexpectedEnd: (err) => { + if (err === true) { + return module.exports.UNEXPECTED_END() + } + return err } - return err } diff --git a/src/get-peer-info.js b/src/get-peer-info.js index 671f4fdba3..7e3a13f886 100644 --- a/src/get-peer-info.js +++ b/src/get-peer-info.js @@ -38,7 +38,7 @@ function getPeerInfo (peer, peerBook) { try { return peerBook.get(peerIdB58Str) } catch (err) { - throw new Error('Couldnt get PeerInfo') + throw new Error(`Couldnt get PeerInfo for ${peerIdB58Str}`) } } diff --git a/src/index.js b/src/index.js index 1ec69d6a67..4658b3efe2 100644 --- a/src/index.js +++ b/src/index.js @@ -8,7 +8,7 @@ const series = require('async/series') const TransportManager = require('./transport') const ConnectionManager = require('./connection/manager') const getPeerInfo = require('./get-peer-info') -const dial = require('./dialer') +const getDialer = require('./dialer') const connectionHandler = require('./connection/handler') const ProtocolMuxer = require('./protocol-muxer') const plaintext = require('./plaintext') @@ -65,10 +65,6 @@ class Switch extends EventEmitter { this.stats = Stats(this.observer, this._options.stats) this.protocolMuxer = ProtocolMuxer(this.protocols, this.observer) - // higher level (public) API - this.dial = dial(this) - this.dialFSM = dial(this, true) - // All purpose connection handler for managing incoming connections this._connectionHandler = connectionHandler(this) @@ -111,6 +107,11 @@ class Switch extends EventEmitter { log.error(err) this.emit('error', err) }) + + // higher level (public) API + const dialer = getDialer(this) + this.dial = dialer.dial + this.dialFSM = dialer.dialFSM } /** @@ -250,7 +251,7 @@ class Switch extends EventEmitter { }, cb) }, cb) }, - (cb) => each([...this.connection.getAll()], (conn, cb) => { + (cb) => each(this.connection.getAll(), (conn, cb) => { conn.once('close', cb) conn.close() }, cb) diff --git a/src/transport.js b/src/transport.js index 4af627688c..b0aac97118 100644 --- a/src/transport.js +++ b/src/transport.js @@ -211,9 +211,10 @@ class TransportManager { return transportAddrs } + const ourAddrs = peerInfo.multiaddrs.toArray() return transportAddrs.filter((addr) => { // If our address is in the destination address, filter it out - return !peerInfo.multiaddrs.toArray().find((pAddr) => { + return !ourAddrs.find((pAddr) => { try { addr.decapsulate(pAddr) } catch (err) { diff --git a/test/dial-fsm.node.js b/test/dial-fsm.node.js index 7acf810007..8ab4d8e96b 100644 --- a/test/dial-fsm.node.js +++ b/test/dial-fsm.node.js @@ -22,6 +22,8 @@ describe('dialFSM', () => { let switchA let switchB let switchC + let peerAId + let peerBId before((done) => createInfos(3, (err, infos) => { expect(err).to.not.exist() @@ -30,6 +32,9 @@ describe('dialFSM', () => { const peerB = infos[1] const peerC = infos[2] + peerAId = peerA.id.toB58String() + peerBId = peerB.id.toB58String() + peerA.multiaddrs.add('/ip4/0.0.0.0/tcp/0') peerB.multiaddrs.add('/ip4/0.0.0.0/tcp/0') peerC.multiaddrs.add('/ip4/0.0.0.0/tcp/0/ws') @@ -57,9 +62,9 @@ describe('dialFSM', () => { switchC.connection.reuse() parallel([ - (cb) => switchA.transport.listen('tcp', {}, null, cb), - (cb) => switchB.transport.listen('tcp', {}, null, cb), - (cb) => switchC.transport.listen('ws', {}, null, cb) + (cb) => switchA.start(cb), + (cb) => switchB.start(cb), + (cb) => switchC.start(cb) ], done) })) @@ -74,72 +79,94 @@ describe('dialFSM', () => { it('should emit `error:connection_attempt_failed` when a transport fails to dial', (done) => { switchC.handle('/warn/1.0.0', () => { }) - const connFSM = switchA.dialFSM(switchC._peerInfo, '/warn/1.0.0', () => { }) - - connFSM.once('error:connection_attempt_failed', (errors) => { - expect(errors).to.be.an('array') - expect(errors).to.have.length(1) - done() + switchA.dialFSM(switchC._peerInfo, '/warn/1.0.0', (err, connFSM) => { + expect(err).to.not.exist() + connFSM.once('error:connection_attempt_failed', (errors) => { + expect(errors).to.be.an('array') + expect(errors).to.have.length(1) + done() + }) }) }) it('should emit an `error` event when a it cannot dial a peer', (done) => { switchC.handle('/error/1.0.0', () => { }) - const connFSM = switchA.dialFSM(switchC._peerInfo, '/error/1.0.0', () => { }) - - connFSM.once('error', (err) => { - expect(err).to.be.exist() - expect(err).to.have.property('code', 'CONNECTION_FAILED') - done() + switchA.dialFSM(switchC._peerInfo, '/error/1.0.0', (err, connFSM) => { + expect(err).to.not.exist() + connFSM.once('error', (err) => { + expect(err).to.be.exist() + expect(err).to.have.property('code', 'CONNECTION_FAILED') + done() + }) }) }) it('should emit a `closed` event when closed', (done) => { switchB.handle('/closed/1.0.0', () => { }) - const connFSM = switchA.dialFSM(switchB._peerInfo, '/closed/1.0.0', (err) => { + switchA.dialFSM(switchB._peerInfo, '/closed/1.0.0', (err, connFSM) => { expect(err).to.not.exist() - expect(switchA.connection.getAllById(switchB._peerInfo.id.toB58String())).to.have.length(1) - connFSM.close() - }) - connFSM.once('close', () => { - expect(switchA.connection.getAllById(switchB._peerInfo.id.toB58String())).to.have.length(0) - done() + connFSM.once('close', () => { + expect(switchA.connection.getAllById(peerBId)).to.have.length(0) + done() + }) + + connFSM.once('muxed', () => { + expect(switchA.connection.getAllById(peerBId)).to.have.length(1) + connFSM.close() + }) }) }) it('should close when the receiver closes', (done) => { - const peerIdA = switchA._peerInfo.id.toB58String() - // wait for the expects to happen expect(2).checks(done) switchB.handle('/closed/1.0.0', () => { }) switchB.on('peer-mux-established', (peerInfo) => { - if (peerInfo.id.toB58String() === peerIdA) { + if (peerInfo.id.toB58String() === peerAId) { switchB.removeAllListeners('peer-mux-established') - expect(switchB.connection.getAllById(peerIdA)).to.have.length(1).mark() - switchB.connection.getOne(peerIdA).close() + expect(switchB.connection.getAllById(peerAId)).to.have.length(1).mark() + switchB.connection.getOne(peerAId).close() } }) - const connFSM = switchA.dialFSM(switchB._peerInfo, '/closed/1.0.0', (err) => { + switchA.dialFSM(switchB._peerInfo, '/closed/1.0.0', (err, connFSM) => { expect(err).to.not.exist() + + connFSM.once('close', () => { + expect(switchA.connection.getAllById(peerBId)).to.have.length(0).mark() + }) }) - connFSM.once('close', () => { - expect(switchA.connection.getAllById(switchB._peerInfo.id.toB58String())).to.have.length(0).mark() + }) + + it('parallel dials to the same peer should not create new connections', (done) => { + switchB.handle('/parallel/2.0.0', (_, conn) => { pull(conn, conn) }) + + parallel([ + (cb) => switchA.dialFSM(switchB._peerInfo, '/parallel/2.0.0', cb), + (cb) => switchA.dialFSM(switchB._peerInfo, '/parallel/2.0.0', cb) + ], (err, results) => { + expect(err).to.not.exist() + expect(results).to.have.length(2) + expect(switchA.connection.getAllById(peerBId)).to.have.length(1) + + switchA.hangUp(switchB._peerInfo, () => { + expect(switchA.connection.getAllById(peerBId)).to.have.length(0) + done() + }) }) }) it('parallel dials to one another should disconnect on hangup', function (done) { - this.timeout(10e3) - switchA.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) }) switchB.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) }) - // 4 close checks and 1 hangup check + expect(switchA.connection.getAllById(peerBId)).to.have.length(0) + + // 4 close checks (1 inc and 1 out for each node) and 1 hangup check expect(5).checks(() => { switchA.removeAllListeners('peer-mux-closed') switchB.removeAllListeners('peer-mux-closed') @@ -147,24 +174,28 @@ describe('dialFSM', () => { }) switchA.on('peer-mux-closed', (peerInfo) => { - expect(peerInfo.id.toB58String()).to.eql(switchB._peerInfo.id.toB58String()).mark() + expect(peerInfo.id.toB58String()).to.eql(peerBId).mark() }) switchB.on('peer-mux-closed', (peerInfo) => { - expect(peerInfo.id.toB58String()).to.eql(switchA._peerInfo.id.toB58String()).mark() + expect(peerInfo.id.toB58String()).to.eql(peerAId).mark() }) - const conn = switchA.dialFSM(switchB._peerInfo, '/parallel/1.0.0', () => { - // Hangup and verify the connections are closed - switchA.hangUp(switchB._peerInfo, (err) => { - expect(err).to.not.exist().mark() + switchA.dialFSM(switchB._peerInfo, '/parallel/1.0.0', (err, connFSM) => { + expect(err).to.not.exist() + // Hold the dial from A, until switch B is done dialing to ensure + // we have both incoming and outgoing connections + connFSM._state.on('DIALING:leave', (cb) => { + switchB.dialFSM(switchA._peerInfo, '/parallel/1.0.0', (err, connB) => { + expect(err).to.not.exist() + connB.on('muxed', cb) + }) }) - }) - // Hold the dial from A, until switch B is done dialing to ensure - // we have both incoming and outgoing connections - conn._state.on('DIALING:enter', (cb) => { - switchB.dialFSM(switchA._peerInfo, '/parallel/1.0.0', () => { - cb() + connFSM.on('connection', () => { + // Hangup and verify the connections are closed + switchA.hangUp(switchB._peerInfo, (err) => { + expect(err).to.not.exist().mark() + }) }) }) }) @@ -177,28 +208,52 @@ describe('dialFSM', () => { expect(5).checks(() => { switchA.removeAllListeners('peer-mux-closed') switchB.removeAllListeners('peer-mux-closed') - done() + // restart the node for subsequent tests + switchA.start(done) }) switchA.on('peer-mux-closed', (peerInfo) => { - expect(peerInfo.id.toB58String()).to.eql(switchB._peerInfo.id.toB58String()).mark() + expect(peerInfo.id.toB58String()).to.eql(peerBId).mark() }) switchB.on('peer-mux-closed', (peerInfo) => { - expect(peerInfo.id.toB58String()).to.eql(switchA._peerInfo.id.toB58String()).mark() + expect(peerInfo.id.toB58String()).to.eql(peerAId).mark() }) - const conn = switchA.dialFSM(switchB._peerInfo, '/parallel/1.0.0', () => { - // Hangup and verify the connections are closed - switchA.stop((err) => { - expect(err).to.not.exist().mark() + switchA.dialFSM(switchB._peerInfo, '/parallel/1.0.0', (err, connFSM) => { + expect(err).to.not.exist() + // Hold the dial from A, until switch B is done dialing to ensure + // we have both incoming and outgoing connections + connFSM._state.on('DIALING:leave', (cb) => { + switchB.dialFSM(switchA._peerInfo, '/parallel/1.0.0', (err, connB) => { + expect(err).to.not.exist() + connB.on('muxed', cb) + }) + }) + + connFSM.on('connection', () => { + // Hangup and verify the connections are closed + switchA.stop((err) => { + expect(err).to.not.exist().mark() + }) }) }) + }) - // Hold the dial from A, until switch B is done dialing to ensure - // we have both incoming and outgoing connections - conn._state.on('DIALING:enter', (cb) => { - switchB.dialFSM(switchA._peerInfo, '/parallel/1.0.0', () => { - cb() + it('queued dials should be aborted on node stop', (done) => { + switchB.handle('/abort-queue/1.0.0', (_, conn) => { pull(conn, conn) }) + + switchA.dialFSM(switchB._peerInfo, '/abort-queue/1.0.0', (err, connFSM) => { + expect(err).to.not.exist() + connFSM._state.on('UPGRADING:enter', (cb) => { + expect(2).checks(done) + switchA.dialFSM(switchB._peerInfo, '/abort-queue/1.0.0', (err) => { + expect(err).to.exist().mark() + }) + switchA.dialFSM(switchB._peerInfo, '/abort-queue/1.0.0', (err) => { + expect(err).to.exist().mark() + }) + + switchA.stop(cb) }) }) }) diff --git a/test/identify.node.js b/test/identify.node.js index b999cbca7b..e454d473ef 100644 --- a/test/identify.node.js +++ b/test/identify.node.js @@ -128,11 +128,11 @@ describe('Identify', () => { expect(2).check(done) switchA.handle('/id-test/1.0.0', (protocol, conn) => pull(conn, conn)) - const connFSM = switchB.dialFSM(switchA._peerInfo, '/id-test/1.0.0', (err) => { + switchB.dialFSM(switchA._peerInfo, '/id-test/1.0.0', (err, connFSM) => { expect(err).to.not.exist().mark() - }) - connFSM.once('close', () => { - expect(stub.called).to.eql(true).mark() + connFSM.once('close', () => { + expect(stub.called).to.eql(true).mark() + }) }) }) }) diff --git a/test/pnet.node.js b/test/pnet.node.js index a0b31cea5d..e362653acb 100644 --- a/test/pnet.node.js +++ b/test/pnet.node.js @@ -26,8 +26,6 @@ generatePSK(psk) generatePSK(psk2) describe('Private Network', function () { - this.timeout(20 * 1000) - let switchA let switchB let switchC @@ -84,7 +82,6 @@ describe('Private Network', function () { })) after(function (done) { - this.timeout(3 * 1000) parallel([ (cb) => switchA.stop(cb), (cb) => switchB.stop(cb), diff --git a/test/swarm-muxing.node.js b/test/swarm-muxing.node.js index 8a32b9d6fc..4c0fdd3c04 100644 --- a/test/swarm-muxing.node.js +++ b/test/swarm-muxing.node.js @@ -10,7 +10,8 @@ chai.use(dirtyChai) const parallel = require('async/parallel') const TCP = require('libp2p-tcp') const WebSockets = require('libp2p-websockets') -const mplex = require('pull-mplex') +const mplex = require('libp2p-mplex') +const pMplex = require('pull-mplex') const spdy = require('libp2p-spdy') const pull = require('pull-stream') const PeerBook = require('peer-book') @@ -21,7 +22,7 @@ const tryEcho = utils.tryEcho const Switch = require('../src') describe('Switch (everything all together)', () => { - [mplex, spdy].forEach(muxer => { + [pMplex, spdy, mplex].forEach(muxer => { describe(muxer.multicodec, () => { let switchA // tcp let switchB // tcp+ws @@ -48,8 +49,6 @@ describe('Switch (everything all together)', () => { })) after(function (done) { - this.timeout(3 * 1000) - parallel([ (cb) => switchA.stop(cb), (cb) => switchB.stop(cb), @@ -168,15 +167,15 @@ describe('Switch (everything all together)', () => { }) }) - it('dial from tcp to tcp+ws (returned conn)', (done) => { + it('dial from tcp to tcp+ws', (done) => { switchB.handle('/grapes/1.0.0', (protocol, conn) => pull(conn, conn)) - const conn = switchA.dial(switchB._peerInfo, '/grapes/1.0.0', (err, conn) => { + switchA.dial(switchB._peerInfo, '/grapes/1.0.0', (err, conn) => { expect(err).to.not.exist() expect(switchA.connection.getAll()).to.have.length(1) - }) - tryEcho(conn, done) + tryEcho(conn, done) + }) }) it('dial from tcp+ws to tcp+ws', (done) => {