From 4c6af135cb3077b095fe8269115f5a8863842ee3 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Tue, 24 Apr 2018 00:24:10 +0200 Subject: [PATCH] refactor: update files and add jsdocs to improve readability refactor: initial refactor of dial.js refactor: add more jsdocs to dial and clean up some code refactor: make get-peer-info more readable fix: jsdocs in dial docs: update some jsdocs refactor: make dial.js a bit easier to consume fix: fix linting --- src/connection.js | 236 ++++++++++-------- src/dial.js | 477 ++++++++++++++++++++++++------------- src/get-peer-info.js | 30 +-- src/index.js | 53 +++-- src/observe-connection.js | 29 ++- src/observer.js | 15 +- src/transport.js | 216 ++++++++++------- test/circuit-relay.node.js | 4 +- 8 files changed, 662 insertions(+), 398 deletions(-) diff --git a/src/connection.js b/src/connection.js index bbedeaf..1447c4d 100644 --- a/src/connection.js +++ b/src/connection.js @@ -12,121 +12,155 @@ const Circuit = require('libp2p-circuit') const plaintext = require('./plaintext') -module.exports = function connection (swtch) { - return { - addUpgrade () {}, - - addStreamMuxer (muxer) { - // for dialing - swtch.muxers[muxer.multicodec] = muxer - - // for listening - swtch.handle(muxer.multicodec, (protocol, conn) => { - const muxedConn = muxer.listener(conn) - - muxedConn.on('stream', swtch.protocolMuxer(null)) - - // If identify is enabled - // 1. overload getPeerInfo - // 2. call getPeerInfo - // 3. add this conn to the pool - if (swtch.identify) { - // overload peerInfo to use Identify instead - conn.getPeerInfo = (cb) => { - const conn = muxedConn.newStream() - const ms = new multistream.Dialer() - cb = once(cb) - - waterfall([ - (cb) => ms.handle(conn, cb), - (cb) => ms.select(identify.multicodec, cb), - (conn, cb) => identify.dialer(conn, cb), - (peerInfo, observedAddrs, cb) => { - observedAddrs.forEach((oa) => { - swtch._peerInfo.multiaddrs.addSafe(oa) - }) - cb(null, peerInfo) - } - ], (err, pi) => { - if (pi) { - conn.setPeerInfo(pi) - } - cb(err, pi) - }) - } +/** + * Contains methods for binding handlers to the Switch + * in order to better manage its connections. + */ +class ConnectionManager { + constructor (_switch) { + this.switch = _switch + } - conn.getPeerInfo((err, peerInfo) => { - if (err) { - return log('Identify not successful') + /** + * Adds a listener for the given `muxer` and creates a handler for it + * leveraging the Switch.protocolMuxer handler factory + * + * @param {Muxer} muxer + * @returns {void} + */ + addStreamMuxer (muxer) { + // for dialing + this.switch.muxers[muxer.multicodec] = muxer + + // for listening + this.switch.handle(muxer.multicodec, (protocol, conn) => { + const muxedConn = muxer.listener(conn) + + muxedConn.on('stream', this.switch.protocolMuxer(null)) + + // If identify is enabled + // 1. overload getPeerInfo + // 2. call getPeerInfo + // 3. add this conn to the pool + if (this.switch.identify) { + // overload peerInfo to use Identify instead + conn.getPeerInfo = (callback) => { + const conn = muxedConn.newStream() + const ms = new multistream.Dialer() + callback = once(callback) + + waterfall([ + (cb) => ms.handle(conn, cb), + (cb) => ms.select(identify.multicodec, cb), + (conn, cb) => identify.dialer(conn, cb), + (peerInfo, observedAddrs, cb) => { + observedAddrs.forEach((oa) => { + this.switch._peerInfo.multiaddrs.addSafe(oa) + }) + cb(null, peerInfo) } - const b58Str = peerInfo.id.toB58String() - - swtch.muxedConns[b58Str] = { muxer: muxedConn } - - if (peerInfo.multiaddrs.size > 0) { - // with incomming conn and through identify, going to pick one - // of the available multiaddrs from the other peer as the one - // I'm connected to as we really can't be sure at the moment - // TODO add this consideration to the connection abstraction! - peerInfo.connect(peerInfo.multiaddrs.toArray()[0]) - } else { - // for the case of websockets in the browser, where peers have - // no addr, use just their IPFS id - peerInfo.connect(`/ipfs/${b58Str}`) + ], (err, peerInfo) => { + if (peerInfo) { + conn.setPeerInfo(peerInfo) } - peerInfo = swtch._peerBook.put(peerInfo) + callback(err, peerInfo) + }) + } - muxedConn.on('close', () => { - delete swtch.muxedConns[b58Str] - peerInfo.disconnect() - peerInfo = swtch._peerBook.put(peerInfo) - setImmediate(() => swtch.emit('peer-mux-closed', peerInfo)) - }) + conn.getPeerInfo((err, peerInfo) => { + if (err) { + return log('Identify not successful') + } + const b58Str = peerInfo.id.toB58String() + + this.switch.muxedConns[b58Str] = { muxer: muxedConn } + + if (peerInfo.multiaddrs.size > 0) { + // with incomming conn and through identify, going to pick one + // of the available multiaddrs from the other peer as the one + // I'm connected to as we really can't be sure at the moment + // TODO add this consideration to the connection abstraction! + peerInfo.connect(peerInfo.multiaddrs.toArray()[0]) + } else { + // for the case of websockets in the browser, where peers have + // no addr, use just their IPFS id + peerInfo.connect(`/ipfs/${b58Str}`) + } + peerInfo = this.switch._peerBook.put(peerInfo) - setImmediate(() => swtch.emit('peer-mux-established', peerInfo)) + muxedConn.on('close', () => { + delete this.switch.muxedConns[b58Str] + peerInfo.disconnect() + peerInfo = this.switch._peerBook.put(peerInfo) + setImmediate(() => this.switch.emit('peer-mux-closed', peerInfo)) }) - } - return conn - }) - }, + setImmediate(() => this.switch.emit('peer-mux-established', peerInfo)) + }) + } - reuse () { - swtch.identify = true - swtch.handle(identify.multicodec, (protocol, conn) => { - identify.listener(conn, swtch._peerInfo) - }) - }, + return conn + }) + } - enableCircuitRelay (config) { - config = config || {} + /** + * Adds the `encrypt` handler for the given `tag` and also sets the + * Switch's crypto to past `encrypt` function + * + * @param {String} tag + * @param {function(PeerID, Connection, PeerId, Callback)} encrypt + * @returns {void} + */ + crypto (tag, encrypt) { + if (!tag && !encrypt) { + tag = plaintext.tag + encrypt = plaintext.encrypt + } - if (config.enabled) { - if (!config.hop) { - Object.assign(config, { hop: { enabled: false, active: false } }) - } + this.switch.unhandle(this.switch.crypto.tag) + this.switch.handle(tag, (protocol, conn) => { + const myId = this.switch._peerInfo.id + const secure = encrypt(myId, conn, undefined, () => { + this.switch.protocolMuxer(null)(secure) + }) + }) - // TODO: (dryajov) should we enable circuit listener and - // dialer by default? - swtch.transport.add(Circuit.tag, new Circuit(swtch, config)) - } - }, + this.switch.crypto = {tag, encrypt} + } - crypto (tag, encrypt) { - if (!tag && !encrypt) { - tag = plaintext.tag - encrypt = plaintext.encrypt + /** + * If config.enabled is true, a Circuit relay will be added to the + * available Switch transports. + * + * @param {any} config + * @returns {void} + */ + enableCircuitRelay (config) { + config = config || {} + + if (config.enabled) { + if (!config.hop) { + Object.assign(config, { hop: { enabled: false, active: false } }) } - swtch.unhandle(swtch.crypto.tag) - swtch.handle(tag, (protocol, conn) => { - const myId = swtch._peerInfo.id - const secure = encrypt(myId, conn, undefined, () => { - swtch.protocolMuxer(null)(secure) - }) - }) - - swtch.crypto = {tag, encrypt} + // TODO: (dryajov) should we enable circuit listener and + // dialer by default? + this.switch.transport.add(Circuit.tag, new Circuit(this.switch, config)) } } + + /** + * Sets identify to true on the Switch and performs handshakes + * for libp2p-identify leveraging the Switch's muxer. + * + * @returns {void} + */ + reuse () { + this.switch.identify = true + this.switch.handle(identify.multicodec, (protocol, conn) => { + identify.listener(conn, this.switch._peerInfo) + }) + } } + +module.exports = ConnectionManager diff --git a/src/dial.js b/src/dial.js index 07c050a..49cee11 100644 --- a/src/dial.js +++ b/src/dial.js @@ -4,6 +4,7 @@ const multistream = require('multistream-select') const Connection = require('interface-connection').Connection const setImmediate = require('async/setImmediate') const Circuit = require('libp2p-circuit') +const waterfall = require('async/waterfall') const debug = require('debug') const log = debug('libp2p:switch:dial') @@ -11,222 +12,368 @@ const log = debug('libp2p:switch:dial') const getPeerInfo = require('./get-peer-info') const observeConnection = require('./observe-connection') -function dial (swtch) { - return (peer, protocol, callback) => { - if (typeof protocol === 'function') { - callback = protocol - protocol = null - } - - callback = callback || function noop () {} - const pi = getPeerInfo(peer, swtch._peerBook) - - const proxyConn = new Connection() - proxyConn.setPeerInfo(pi) - - const b58Id = pi.id.toB58String() - log('dialing %s', b58Id) +/** + * Manages dialing to another peer, including muxer upgrades + * and crypto management. The main entry point for dialing is + * Dialer.dial + */ +class Dialer { + /** + * Constructor + * + * @param {Switch} _switch + * @param {PeerInfo} peerInfo + * @param {string} protocol + * @param {function(Error, Connection)} callback + */ + constructor (_switch, peerInfo, protocol, callback) { + this.switch = _switch + this.peerInfo = peerInfo + this.protocol = protocol + this.callback = callback + } - if (!swtch.muxedConns[b58Id]) { - if (!swtch.conns[b58Id]) { - attemptDial(pi, (err, conn) => { - if (err) { - return callback(err) - } - gotWarmedUpConn(conn) - }) - } else { - const conn = swtch.conns[b58Id] - swtch.conns[b58Id] = undefined - gotWarmedUpConn(conn) - } - } else { - if (!protocol) { - return callback() + /** + * Initializes a proxy connection and returns it. The connection is also immediately + * dialed. This will include establishing the base connection, crypto, muxing and the + * protocol handshake if all needed components have already been set. + * + * @returns {Connection} + */ + dial () { + const proxyConnection = new Connection() + proxyConnection.setPeerInfo(this.peerInfo) + + waterfall([ + (cb) => { + this.establishConnection(cb) + }, + (connection, cb) => { + if (connection) { + proxyConnection.setPeerInfo(this.peerInfo) + proxyConnection.setInnerConn(connection) + return cb(null, proxyConnection) + } + cb(null) } - gotMuxer(swtch.muxedConns[b58Id].muxer) - } + ], (err, connection) => { + this.callback(err, connection) + }) - return proxyConn + return proxyConnection + } - function gotWarmedUpConn (conn) { - conn.setPeerInfo(pi) + /** + * Establishes a base connection and then continues to upgrade that connection + * including: crypto, muxing and the protocol handshake. If any upgrade is not + * yet available, or already exists, the upgrade will continue where it left off. + * + * @param {function(Error, Connection)} callback + * @returns {void} + */ + establishConnection (callback) { + const b58Id = this.peerInfo.id.toB58String() + log('dialing %s', b58Id) - attemptMuxerUpgrade(conn, (err, muxer) => { - if (!protocol) { - if (err) { - swtch.conns[b58Id] = conn - } - return callback() + waterfall([ + (cb) => { + // Start with a base connection + this.createBaseConnection(b58Id, cb) + }, + (baseConnection, cb) => { + // Add the Switch's crypt encryption to the connection + this.encryptConnection(baseConnection, cb) + }, + (encryptedConnection, cb) => { + // Upgrade the connection with a muxer + this.createMuxedConnection(encryptedConnection, b58Id, cb) + }, + (muxer, cb) => { + // If we have no protocol, dont continue with the handshake + if (!this.protocol) { + return cb() } + // If we have a muxer, create a new stream, otherwise it's a standard connection + const connection = muxer.newStream ? muxer.newStream() : muxer + this.performProtocolHandshake(connection, cb) + } + ], (err, connection) => { + callback(err, connection) + }) + } - if (err) { - // couldn't upgrade to Muxer, it is ok - protocolHandshake(conn, protocol, callback) - } else { - gotMuxer(muxer) - } - }) + /** + * If the base connection already exists to the PeerId key, `b58Id`, + * it will be returned in the callback. If no connection exists, one will + * be attempted via Dialer.attemptDial. + * + * @param {string} b58Id + * @param {function(Error, Connection)} callback + * @returns {void} + */ + createBaseConnection (b58Id, callback) { + const baseConnection = this.switch.conns[b58Id] + if (baseConnection) { + this.switch.conns[b58Id] = undefined + return callback(null, baseConnection) } - function gotMuxer (muxer) { - if (swtch.identify) { - // TODO: Consider: - // 1. overload getPeerInfo - // 2. exec identify (through getPeerInfo) - // 3. update the peerInfo that is already stored in the conn + waterfall([ + (cb) => { + this.attemptDial(cb) + } + ], (err, connection) => { + if (err) { + return callback(err) } - openConnInMuxedConn(muxer, (conn) => { - protocolHandshake(conn, protocol, callback) - }) + callback(null, connection) + }) + } + + /** + * If the given PeerId key, `b58Id`, has an existing muxed connection + * it will be returned via the callback, otherwise the connection + * upgrade will be initiated via Dialer.attemptMuxerUpgrade. + * + * @param {Connection} connection + * @param {string} b58Id + * @param {function(Error, Connection)} callback + * @returns {void} + */ + createMuxedConnection (connection, b58Id, callback) { + const muxedConnection = this.switch.muxedConns[b58Id] + if (muxedConnection) { + return callback(null, muxedConnection.muxer) } - function attemptDial (pi, cb) { - if (!swtch.hasTransports()) { - return cb(new Error('No transports registered, dial not possible')) + connection.setPeerInfo(this.peerInfo) + + waterfall([ + (cb) => { + this.attemptMuxerUpgrade(connection, b58Id, cb) + } + ], (err, muxer) => { + if (err && !this.protocol) { + this.switch.conns[b58Id] = connection + return callback(null, null) } - const tKeys = swtch.availableTransports(pi) + if (err) { + // couldn't upgrade to Muxer, it is ok, use the existing connection + return callback(null, connection) + } - let circuitTried = false - nextTransport(tKeys.shift()) + callback(null, muxer) + }) + } - function nextTransport (key) { - let transport = key - if (!transport) { - if (circuitTried) { - return cb(new Error(`Circuit already tried!`)) - } + /** + * Iterates over each Muxer on the Switch and attempts to upgrade + * the given `connection`. Successful muxed connections will be stored + * on the Switch.muxedConns with `b58Id` as their key for future reference. + * + * @param {Connection} connection + * @param {string} b58Id + * @param {function(Error, Connection)} callback + * @returns {void} + */ + attemptMuxerUpgrade (connection, b58Id, callback) { + const muxers = Object.keys(this.switch.muxers) + if (muxers.length === 0) { + return callback(new Error('no muxers available')) + } - if (!swtch.transports[Circuit.tag]) { - return cb(new Error(`Circuit not enabled!`)) + // 1. try to handshake in one of the muxers available + // 2. if succeeds + // - add the muxedConn to the list of muxedConns + // - add incomming new streams to connHandler + const nextMuxer = (key) => { + log('selecting %s', key) + msDialer.select(key, (err, conn) => { + if (err) { + if (muxers.length === 0) { + return callback(new Error('could not upgrade to stream muxing')) } - log(`Falling back to dialing over circuit`) - pi.multiaddrs.add(`/p2p-circuit/ipfs/${pi.id.toB58String()}`) - circuitTried = true - transport = Circuit.tag + nextMuxer(muxers.shift()) } - log(`dialing transport ${transport}`) - swtch.transport.dial(transport, pi, (err, _conn) => { - if (err) { - log(err) - return nextTransport(tKeys.shift()) - } + const muxedConn = this.switch.muxers[key].dialer(conn) + this.switch.muxedConns[b58Id] = {} + this.switch.muxedConns[b58Id].muxer = muxedConn - const conn = observeConnection(transport, null, _conn, swtch.observer) + muxedConn.once('close', () => { + delete this.switch.muxedConns[b58Id] + this.peerInfo.disconnect() + this.switch._peerInfo.disconnect() + setImmediate(() => this.switch.emit('peer-mux-closed', this.peerInfo)) + }) - cryptoDial() + // For incoming streams, in case identify is on + muxedConn.on('stream', (conn) => { + conn.setPeerInfo(this.peerInfo) + this.switch.protocolMuxer(null)(conn) + }) - function cryptoDial () { - const ms = new multistream.Dialer() - ms.handle(conn, (err) => { - if (err) { - return cb(err) - } + setImmediate(() => this.switch.emit('peer-mux-established', this.peerInfo)) - const myId = swtch._peerInfo.id - log('selecting crypto: %s', swtch.crypto.tag) - ms.select(swtch.crypto.tag, (err, _conn) => { - if (err) { return cb(err) } + callback(null, muxedConn) + }) + } - const conn = observeConnection(null, swtch.crypto.tag, _conn, swtch.observer) + const msDialer = new multistream.Dialer() + msDialer.handle(connection, (err) => { + if (err) { + return callback(new Error('multistream not supported')) + } - const wrapped = swtch.crypto.encrypt(myId, conn, pi.id, (err) => { - if (err) { - return cb(err) - } + nextMuxer(muxers.shift()) + }) + } - wrapped.setPeerInfo(pi) - cb(null, wrapped) - }) - }) - }) - } - }) - } + /** + * Iterates over each Transport on the Switch and attempts to connect + * to the peer. Once a Transport succeeds, no additional Transports will + * be dialed. + * + * @param {function(Error, Connection)} callback + * @returns {void} + */ + attemptDial (callback) { + if (!this.switch.hasTransports()) { + return callback(new Error('No transports registered, dial not possible')) } - function attemptMuxerUpgrade (conn, cb) { - const muxers = Object.keys(swtch.muxers) - if (muxers.length === 0) { - return cb(new Error('no muxers available')) - } + const tKeys = this.switch.availableTransports(this.peerInfo) + + let circuitTried = false + + const nextTransport = (key) => { + let transport = key + if (!transport) { + if (circuitTried) { + return callback(new Error(`Circuit already tried!`)) + } + + if (!this.switch.transports[Circuit.tag]) { + return callback(new Error(`Circuit not enabled!`)) + } - // 1. try to handshake in one of the muxers available - // 2. if succeeds - // - add the muxedConn to the list of muxedConns - // - add incomming new streams to connHandler + log(`Falling back to dialing over circuit`) + this.peerInfo.multiaddrs.add(`/p2p-circuit/ipfs/${this.peerInfo.id.toB58String()}`) + circuitTried = true + transport = Circuit.tag + } - const ms = new multistream.Dialer() - ms.handle(conn, (err) => { + log(`dialing transport ${transport}`) + this.switch.transport.dial(transport, this.peerInfo, (err, _conn) => { if (err) { - return cb(new Error('multistream not supported')) + log(err) + return nextTransport(tKeys.shift()) } - nextMuxer(muxers.shift()) + const conn = observeConnection(transport, null, _conn, this.switch.observer) + callback(null, conn) }) + } - function nextMuxer (key) { - log('selecting %s', key) - ms.select(key, (err, conn) => { - if (err) { - if (muxers.length === 0) { - cb(new Error('could not upgrade to stream muxing')) - } else { - nextMuxer(muxers.shift()) - } - return - } + nextTransport(tKeys.shift()) + } - const muxedConn = swtch.muxers[key].dialer(conn) - swtch.muxedConns[b58Id] = {} - swtch.muxedConns[b58Id].muxer = muxedConn - // should not be needed anymore - swtch.muxedConns[b58Id].conn = conn + /** + * Attempts to encrypt the given `connection` with the Switch's crypto. + * + * @param {Connection} connection + * @param {function(Error, Connection)} callback + * @returns {void} + */ + encryptConnection (connection, callback) { + const msDialer = new multistream.Dialer() + + msDialer.handle(connection, (err) => { + if (err) { + return callback(err) + } - muxedConn.once('close', () => { - const b58Str = pi.id.toB58String() - delete swtch.muxedConns[b58Str] - pi.disconnect() - swtch._peerBook.get(b58Str).disconnect() - setImmediate(() => swtch.emit('peer-mux-closed', pi)) - }) + const myId = this.switch._peerInfo.id + log('selecting crypto: %s', this.switch.crypto.tag) - // For incoming streams, in case identify is on - muxedConn.on('stream', (conn) => { - conn.setPeerInfo(pi) - swtch.protocolMuxer(null)(conn) - }) + msDialer.select(this.switch.crypto.tag, (err, _conn) => { + if (err) { + return callback(err) + } - setImmediate(() => swtch.emit('peer-mux-established', pi)) + const conn = observeConnection(null, this.switch.crypto.tag, _conn, this.switch.observer) - cb(null, muxedConn) + const encryptedConnection = this.switch.crypto.encrypt(myId, conn, this.peerInfo.id, (err) => { + if (err) { + return callback(err) + } + + encryptedConnection.setPeerInfo(this.peerInfo) + callback(null, encryptedConnection) }) - } - } + }) + }) + } - function openConnInMuxedConn (muxer, cb) { - cb(muxer.newStream()) + /** + * Initiates a handshake for the Dialer's set protocol + * + * @param {Connection} connection + * @param {function(Error, Connection)} callback + * @returns {void} + */ + performProtocolHandshake (connection, callback) { + // If there is no protocol set yet, don't perform the handshake + if (!this.protocol) { + callback() } - function protocolHandshake (conn, protocol, cb) { - const ms = new multistream.Dialer() - ms.handle(conn, (err) => { + const msDialer = new multistream.Dialer() + msDialer.handle(connection, (err) => { + if (err) { + return callback(err) + } + msDialer.select(this.protocol, (err, conn) => { if (err) { - return cb(err) + return callback(err) } - ms.select(protocol, (err, conn) => { - if (err) { - return cb(err) - } - proxyConn.setPeerInfo(pi) - proxyConn.setInnerConn(conn) - cb(null, proxyConn) - }) + callback(null, conn) }) + }) + } +} + +/** + * Returns a Dialer generator that when called, will immediately begin dialing + * fo the given `peer`. + * + * @param {Switch} _switch + * @returns {function(PeerInfo, string, function(Error, Connection))} + */ +function dial (_switch) { + /** + * Creates a new dialer and immediately begins dialing to the given `peer` + * + * @param {PeerInfo} peer + * @param {string} protocol + * @param {function(Error, Connection)} callback + * @returns {Connection} + */ + return (peer, protocol, callback) => { + if (typeof protocol === 'function') { + callback = protocol + protocol = null } + + callback = callback || function noop () {} + + const peerInfo = getPeerInfo(peer, _switch._peerBook) + const dialer = new Dialer(_switch, peerInfo, protocol, callback) + + return dialer.dial() } } diff --git a/src/get-peer-info.js b/src/get-peer-info.js index b254208..4d7786a 100644 --- a/src/get-peer-info.js +++ b/src/get-peer-info.js @@ -8,34 +8,36 @@ const multiaddr = require('multiaddr') * Helper method to check the data type of peer and convert it to PeerInfo */ function getPeerInfo (peer, peerBook) { - let p + let peerInfo - // PeerInfo + // Already a PeerInfo instance if (PeerInfo.isPeerInfo(peer)) { - p = peer - // Multiaddr instance (not string) - } else if (multiaddr.isMultiaddr(peer)) { + return peer + } + + // Attempt to convert from Multiaddr instance (not string) + if (multiaddr.isMultiaddr(peer)) { const peerIdB58Str = peer.getPeerId() try { - p = peerBook.get(peerIdB58Str) + peerInfo = peerBook.get(peerIdB58Str) } catch (err) { - p = new PeerInfo(PeerId.createFromB58String(peerIdB58Str)) + peerInfo = new PeerInfo(PeerId.createFromB58String(peerIdB58Str)) } - p.multiaddrs.add(peer) + peerInfo.multiaddrs.add(peer) + return peerInfo + } - // PeerId - } else if (PeerId.isPeerId(peer)) { + // Attempt to convert from PeerId + if (PeerId.isPeerId(peer)) { const peerIdB58Str = peer.toB58String() try { - p = peerBook.get(peerIdB58Str) + return peerBook.get(peerIdB58Str) } catch (err) { throw new Error('Couldnt get PeerInfo') } - } else { - throw new Error('peer type not recognized') } - return p + throw new Error('peer type not recognized') } module.exports = getPeerInfo diff --git a/src/index.js b/src/index.js index d3ca855..e0188d3 100644 --- a/src/index.js +++ b/src/index.js @@ -3,8 +3,8 @@ const EE = require('events').EventEmitter const each = require('async/each') const series = require('async/series') -const transport = require('./transport') -const connection = require('./connection') +const TransportManager = require('./transport') +const ConnectionManager = require('./connection') const getPeerInfo = require('./get-peer-info') const dial = require('./dial') const ProtocolMuxer = require('./protocol-muxer') @@ -52,25 +52,8 @@ class Switch extends EE { // Crypto details this.crypto = plaintext - this.transport = transport(this) - this.connection = connection(this) - - this.hasTransports = () => { - const transports = Object.keys(this.transports).filter((t) => t !== 'Circuit') - return transports && transports.length > 0 - } - - this.availableTransports = (pi) => { - const myAddrs = pi.multiaddrs.toArray() - const myTransports = Object.keys(this.transports) - - // Only listen on transports we actually have addresses for - return myTransports.filter((ts) => this.transports[ts].filter(myAddrs).length > 0) - // push Circuit to be the last proto to be dialed - .sort((a) => { - return a === 'Circuit' ? 1 : 0 - }) - } + this.transport = new TransportManager(this) + this.connection = new ConnectionManager(this) this.observer = Observer(this) this.stats = Stats(this.observer, this._options.stats) @@ -86,6 +69,24 @@ class Switch extends EE { this.dial = dial(this) } + /** + * Returns a list of the transports peerInfo has addresses for + * + * @param {PeerInfo} peerInfo + * @returns {Array} + */ + availableTransports (peerInfo) { + const myAddrs = peerInfo.multiaddrs.toArray() + const myTransports = Object.keys(this.transports) + + // Only listen on transports we actually have addresses for + return myTransports.filter((ts) => this.transports[ts].filter(myAddrs).length > 0) + // push Circuit to be the last proto to be dialed + .sort((a) => { + return a === 'Circuit' ? 1 : 0 + }) + } + // Start listening on all available transports start (callback) { each(this.availableTransports(this._peerInfo), (ts, cb) => { @@ -143,6 +144,16 @@ class Switch extends EE { callback() } } + + /** + * Returns whether or not the switch has any transports + * + * @returns {boolean} + */ + hasTransports () { + const transports = Object.keys(this.transports).filter((t) => t !== 'Circuit') + return transports && transports.length > 0 + } } module.exports = Switch diff --git a/src/observe-connection.js b/src/observe-connection.js index 66896b6..ead6a23 100644 --- a/src/observe-connection.js +++ b/src/observe-connection.js @@ -3,17 +3,29 @@ const Connection = require('interface-connection').Connection const pull = require('pull-stream') -module.exports = (transport, protocol, _conn, observer) => { +/** + * Creates a pull stream to run the given Connection stream through + * the given Observer. This provides a way to more easily monitor connections + * and their metadata. A new Connection will be returned that contains + * has the attached Observer. + * + * @param {Transport} transport + * @param {string} protocol + * @param {Connection} connection + * @param {Observer} observer + * @returns {Connection} + */ +module.exports = (transport, protocol, connection, observer) => { const peerInfo = new Promise((resolve, reject) => { - _conn.getPeerInfo((err, peerInfo) => { + connection.getPeerInfo((err, peerInfo) => { if (!err && peerInfo) { resolve(peerInfo) return } - const setPeerInfo = _conn.setPeerInfo - _conn.setPeerInfo = (pi) => { - setPeerInfo.call(_conn, pi) + const setPeerInfo = connection.setPeerInfo + connection.setPeerInfo = (pi) => { + setPeerInfo.call(connection, pi) resolve(pi) } }) @@ -21,11 +33,12 @@ module.exports = (transport, protocol, _conn, observer) => { const stream = { source: pull( - _conn, + connection, observer.incoming(transport, protocol, peerInfo)), sink: pull( observer.outgoing(transport, protocol, peerInfo), - _conn) + connection) } - return new Connection(stream, _conn) + + return new Connection(stream, connection) } diff --git a/src/observer.js b/src/observer.js index 6d8b756..0504caa 100644 --- a/src/observer.js +++ b/src/observer.js @@ -3,6 +3,15 @@ const pull = require('pull-stream') const EventEmitter = require('events') +/** + * Takes a Switch and returns an Observer that can be used in conjunction with + * observe-connection.js. The returned Observer comes with `incoming` and + * `outgoing` properties that can be used in pull streams to emit all metadata + * for messages that pass through a Connection. + * + * @param {Switch} swtch + * @returns {EventEmitter} + */ module.exports = (swtch) => { const observer = Object.assign(new EventEmitter(), { incoming: observe('in'), @@ -29,9 +38,9 @@ module.exports = (swtch) => { } function willObserve (peerInfo, transport, protocol, direction, bufferLength) { - peerInfo.then((pi) => { - if (pi) { - const peerId = pi.id.toB58String() + peerInfo.then((_peerInfo) => { + if (_peerInfo) { + const peerId = _peerInfo.id.toB58String() setImmediate(() => observer.emit('message', peerId, transport, protocol, direction, bufferLength)) } }) diff --git a/src/transport.js b/src/transport.js index 7df462e..c9b7882 100644 --- a/src/transport.js +++ b/src/transport.js @@ -14,112 +14,160 @@ const defaultPerPeerRateLimit = 8 // TODO this should be exposed as a option const dialTimeout = 30 * 1000 -module.exports = function (swtch) { - const dialer = new LimitDialer(defaultPerPeerRateLimit, dialTimeout) +/** + * Manages the transports for the switch. This simplifies dialing and listening across + * multiple transports. + */ +class TransportManager { + constructor (_switch) { + this.switch = _switch + this.dialer = new LimitDialer(defaultPerPeerRateLimit, dialTimeout) + } - return { - add (key, transport, options) { - options = options || {} + /** + * Adds a `Transport` to the list of transports on the switch, and assigns it to the given key + * + * @param {String} key + * @param {Transport} transport + * @returns {void} + */ + add (key, transport) { + log('adding %s', key) + if (this.switch.transports[key]) { + throw new Error('There is already a transport with this key') + } - log('adding %s', key) - if (swtch.transports[key]) { - throw new Error('There is already a transport with this key') - } + this.switch.transports[key] = transport + if (!this.switch.transports[key].listeners) { + this.switch.transports[key].listeners = [] + } + } - swtch.transports[key] = transport - if (!swtch.transports[key].listeners) { - swtch.transports[key].listeners = [] - } - }, + /** + * For a given transport `key`, dial to all that transport multiaddrs + * + * @param {String} key Key of the `Transport` to dial + * @param {PeerInfo} peerInfo + * @param {function(Error, Connection)} callback + * @returns {void} + */ + dial (key, peerInfo, callback) { + const transport = this.switch.transports[key] + let multiaddrs = peerInfo.multiaddrs.toArray() + + if (!Array.isArray(multiaddrs)) { + multiaddrs = [multiaddrs] + } - dial (key, pi, callback) { - const t = swtch.transports[key] - let multiaddrs = pi.multiaddrs.toArray() + // filter the multiaddrs that are actually valid for this transport + multiaddrs = TransportManager.dialables(transport, multiaddrs) + log('dialing %s', key, multiaddrs.map((m) => m.toString())) - if (!Array.isArray(multiaddrs)) { - multiaddrs = [multiaddrs] - } - // filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) (maybe even make the transport do that) - multiaddrs = dialables(t, multiaddrs) - log('dialing %s', key, multiaddrs.map((m) => m.toString())) - - dialer.dialMany(pi.id, t, multiaddrs, (err, success) => { - if (err) { - return callback(err) - } - - pi.connect(success.multiaddr) - swtch._peerBook.put(pi) - callback(null, success.conn) - }) - }, - - listen (key, options, handler, callback) { - // if no handler is passed, we pass conns to protocolMuxer - if (!handler) { - handler = swtch.protocolMuxer(key) + // dial each of the multiaddrs with the given transport + this.dialer.dialMany(peerInfo.id, transport, multiaddrs, (err, success) => { + if (err) { + return callback(err) } - const multiaddrs = dialables(swtch.transports[key], swtch._peerInfo.multiaddrs.distinct()) + peerInfo.connect(success.multiaddr) + this.switch._peerBook.put(peerInfo) + callback(null, success.conn) + }) + } + + /** + * For a given Transport `key`, listen on all multiaddrs in the switch's `_peerInfo`. + * If a `handler` is not provided, the Switch's `protocolMuxer` will be used. + * + * @param {String} key + * @param {*} options + * @param {function(Connection)} handler + * @param {function(Error)} callback + * @returns {void} + */ + listen (key, options, handler, callback) { + // if no handler is passed, we pass conns to protocolMuxer + if (!handler) { + handler = this.switch.protocolMuxer(key) + } - const transport = swtch.transports[key] + const transport = this.switch.transports[key] + const multiaddrs = TransportManager.dialables( + transport, + this.switch._peerInfo.multiaddrs.distinct() + ) - if (!transport.listeners) { - transport.listeners = [] - } + if (!transport.listeners) { + transport.listeners = [] + } - let freshMultiaddrs = [] + let freshMultiaddrs = [] - const createListeners = multiaddrs.map((ma) => { - return (cb) => { - const done = once(cb) - const listener = transport.createListener(handler) - listener.once('error', done) + const createListeners = multiaddrs.map((ma) => { + return (cb) => { + const done = once(cb) + const listener = transport.createListener(handler) + listener.once('error', done) - listener.listen(ma, (err) => { + listener.listen(ma, (err) => { + if (err) { + return done(err) + } + listener.removeListener('error', done) + listener.getAddrs((err, addrs) => { if (err) { return done(err) } - listener.removeListener('error', done) - listener.getAddrs((err, addrs) => { - if (err) { - return done(err) - } - freshMultiaddrs = freshMultiaddrs.concat(addrs) - transport.listeners.push(listener) - done() - }) + freshMultiaddrs = freshMultiaddrs.concat(addrs) + transport.listeners.push(listener) + done() }) - } - }) + }) + } + }) - parallel(createListeners, (err) => { - if (err) { - return callback(err) - } + parallel(createListeners, (err) => { + if (err) { + return callback(err) + } - // cause we can listen on port 0 or 0.0.0.0 - swtch._peerInfo.multiaddrs.replace(multiaddrs, freshMultiaddrs) - callback() - }) - }, + // cause we can listen on port 0 or 0.0.0.0 + this.switch._peerInfo.multiaddrs.replace(multiaddrs, freshMultiaddrs) + callback() + }) + } - close (key, callback) { - const transport = swtch.transports[key] + /** + * Closes the transport with the given key, by closing all of its listeners + * + * @param {String} key + * @param {function(Error)} callback + * @returns {void} + */ + close (key, callback) { + const transport = this.switch.transports[key] + + if (!transport) { + return callback(new Error(`Trying to close non existing transport: ${key}`)) + } - if (!transport) { - return callback(new Error(`Trying to close non existing transport: ${key}`)) + parallel(transport.listeners.map((listener) => { + return (cb) => { + listener.close(cb) } + }), callback) + } - parallel(transport.listeners.map((listener) => { - return (cb) => { - listener.close(cb) - } - }), callback) - } + /** + * For a given transport, return its multiaddrs that match the given multiaddrs + * + * @param {Transport} transport + * @param {Array} multiaddrs + * @returns {Array} + */ + static dialables (transport, multiaddrs) { + return transport.filter(multiaddrs) } } -function dialables (tp, multiaddrs) { - return tp.filter(multiaddrs) -} +module.exports = TransportManager diff --git a/test/circuit-relay.node.js b/test/circuit-relay.node.js index 9c87146..03ce441 100644 --- a/test/circuit-relay.node.js +++ b/test/circuit-relay.node.js @@ -63,8 +63,8 @@ describe(`circuit`, function () { }) it('listed on the transports map', () => { - expect(swarmA.transports['Circuit']).to.exist() - expect(swarmB.transports['Circuit']).to.exist() + expect(swarmA.transports.Circuit).to.exist() + expect(swarmB.transports.Circuit).to.exist() }) it('add /p2p-curcuit addrs on start', (done) => {