From 66990237bbb38a52e5b09f697ed24605c947b8c2 Mon Sep 17 00:00:00 2001 From: David Dias Date: Wed, 2 Nov 2016 15:23:28 +0000 Subject: [PATCH] feat: refactor, simplify, make handling of logs nicer --- src/agreement.js | 101 ------------------------- src/constants.js | 5 +- src/{dialer.js => dialer/index.js} | 39 +++++----- src/{listener.js => listener/index.js} | 40 +++++----- src/listener/selectHandler.js | 43 +++++++++++ src/select.js | 35 +++++++++ src/util.js | 62 +++++++++++++++ test/index.spec.js | 26 +++---- 8 files changed, 189 insertions(+), 162 deletions(-) delete mode 100644 src/agreement.js rename src/{dialer.js => dialer/index.js} (66%) rename src/{listener.js => listener/index.js} (67%) create mode 100644 src/listener/selectHandler.js create mode 100644 src/select.js create mode 100644 src/util.js diff --git a/src/agreement.js b/src/agreement.js deleted file mode 100644 index 897debc..0000000 --- a/src/agreement.js +++ /dev/null @@ -1,101 +0,0 @@ -'use strict' - -const handshake = require('pull-handshake') -const lp = require('pull-length-prefixed') -const pull = require('pull-stream') -const Connection = require('interface-connection').Connection -const debug = require('debug') -const log = debug('multistream:agreement') -log.error = debug('multistream:agreement:error') - -exports.select = (multicodec, callback, msThreadId) => { - const stream = handshake({ - timeout: 60 * 1000 - }, callback) - - const shake = stream.handshake - - log('(%s) writing multicodec %s', msThreadId, multicodec) - writeEncoded(shake, new Buffer(multicodec + '\n'), callback) - - lp.decodeFromReader(shake, (err, data) => { - if (err) { - return callback(err) - } - const protocol = data.toString().slice(0, -1) - - if (protocol !== multicodec) { - return callback(new Error(`"${multicodec}" not supported`), shake.rest()) - } - - log('(%s) received ack: %s', msThreadId, protocol) - callback(null, shake.rest()) - }) - - return stream -} - -exports.handlerSelector = (rawConn, handlersMap, msThreadId) => { - const cb = (err) => { - // incoming errors are irrelevant for the app - log.error(err) - } - - const stream = handshake({ - timeout: 60 * 1000 - }, cb) - - const shake = stream.handshake - - next() - - function next () { - lp.decodeFromReader(shake, (err, data) => { - if (err) { - return cb(err) - } - log('(%s) received: %s', msThreadId, data.toString()) - const protocol = data.toString().slice(0, -1) - const result = Object.keys(handlersMap).filter((id) => id === protocol) - const key = result && result[0] - - if (key) { - log('(%s) send ack back of: %s', msThreadId, protocol) - writeEncoded(shake, data, cb) - handlersMap[key](new Connection(shake.rest(), rawConn)) - } else { - log('(%s) not supported protocol: %s', - msThreadId, protocol) - writeEncoded(shake, new Buffer('na\n')) - next() - } - }) - } - - return stream -} - -// prefixes a message with a varint -function encode (msg, cb) { - const values = Buffer.isBuffer(msg) ? [msg] : [new Buffer(msg)] - - pull( - pull.values(values), - lp.encode(), - pull.collect((err, encoded) => { - if (err) { - return cb(err) - } - cb(null, encoded[0]) - }) - ) -} - -function writeEncoded (writer, msg, cb) { - encode(msg, (err, msg) => { - if (err) { - return cb(err) - } - writer.write(msg) - }) -} diff --git a/src/constants.js b/src/constants.js index fc0d0a2..e0013db 100644 --- a/src/constants.js +++ b/src/constants.js @@ -1,5 +1,4 @@ 'use strict' -module.exports = { - PROTOCOL_ID: '/multistream/1.0.0' -} +exports = module.exports +exports.PROTOCOL_ID = '/multistream/1.0.0' diff --git a/src/dialer.js b/src/dialer/index.js similarity index 66% rename from src/dialer.js rename to src/dialer/index.js index b62754c..bf08f36 100644 --- a/src/dialer.js +++ b/src/dialer/index.js @@ -1,76 +1,71 @@ 'use strict' -const lp = require('pull-length-prefixed') const varint = require('varint') const pull = require('pull-stream') +const pullLP = require('pull-length-prefixed') const Connection = require('interface-connection').Connection -const debug = require('debug') -const log = debug('multistream:dialer') +const util = require('../util') +const select = require('../select') -const PROTOCOL_ID = require('./constants').PROTOCOL_ID -const agrmt = require('./agreement') - -function getRandomId () { - return ((~~(Math.random() * 1e9)).toString(36)) -} +const PROTOCOL_ID = require('./../constants').PROTOCOL_ID module.exports = class Dialer { constructor () { this.conn = null - this.msThreadId = getRandomId() + this.log = util.log.dialer() } // perform the multistream handshake handle (rawConn, callback) { - log('(%s) dialer handle conn', this.msThreadId) - const ms = agrmt.select(PROTOCOL_ID, (err, conn) => { + this.log('dialer handle conn') + const s = select(PROTOCOL_ID, (err, conn) => { if (err) { return callback(err) } - log('(%s) handshake success', this.msThreadId) + this.log('handshake success') this.conn = new Connection(conn, rawConn) callback() - }, this.msThreadId) + }, this.log) pull( rawConn, - ms, + s, rawConn ) } select (protocol, callback) { - log('(%s) dialer select %s', this.msThreadId, protocol) + this.log('dialer select ' + protocol) if (!this.conn) { return callback(new Error('multistream handshake has not finalized yet')) } - const selectStream = agrmt.select(protocol, (err, conn) => { + const s = select(protocol, (err, conn) => { if (err) { this.conn = new Connection(conn, this.conn) return callback(err) } callback(null, new Connection(conn, this.conn)) - }, this.msThreadId) + }, this.log) pull( this.conn, - selectStream, + s, this.conn ) } ls (callback) { - const lsStream = agrmt.select('ls', (err, conn) => { + const lsStream = select('ls', (err, conn) => { if (err) { return callback(err) } pull( conn, - lp.decode(), + pullLP.decode(), collectLs(conn), pull.map(stringify), pull.collect((err, list) => { @@ -80,7 +75,7 @@ module.exports = class Dialer { callback(null, list.slice(1)) }) ) - }) + }, this.log) pull( this.conn, diff --git a/src/listener.js b/src/listener/index.js similarity index 67% rename from src/listener.js rename to src/listener/index.js index 05875ef..a023376 100644 --- a/src/listener.js +++ b/src/listener/index.js @@ -1,47 +1,46 @@ 'use strict' -const lp = require('pull-length-prefixed') const pull = require('pull-stream') +const pullLP = require('pull-length-prefixed') const varint = require('varint') const isFunction = require('lodash.isfunction') const assert = require('assert') -const debug = require('debug') -const log = debug('multistream:listener') +const select = require('../select') +const selectHandler = require('./selectHandler') +const util = require('./../util') const Connection = require('interface-connection').Connection -const PROTOCOL_ID = require('./constants').PROTOCOL_ID -const agrmt = require('./agreement') +const PROTOCOL_ID = require('./../constants').PROTOCOL_ID module.exports = class Listener { constructor () { this.handlers = { ls: (conn) => this._ls(conn) } + this.log = util.log.listener() } // perform the multistream handshake handle (rawConn, callback) { - const msThreadId = getRandomId() - log('(%s) listener handle conn', msThreadId) + this.log('listener handle conn') - const selectStream = agrmt.select(PROTOCOL_ID, (err, conn) => { + const selectStream = select(PROTOCOL_ID, (err, conn) => { if (err) { return callback(err) } - const hsConn = new Connection(conn, rawConn) + const shConn = new Connection(conn, rawConn) - const handlerSelector = - agrmt.handlerSelector(hsConn, this.handlers, msThreadId) + const sh = selectHandler(shConn, this.handlers, this.log) pull( - hsConn, - handlerSelector, - hsConn + shConn, + sh, + shConn ) callback() - }, msThreadId) + }, this.log) pull( rawConn, @@ -52,12 +51,11 @@ module.exports = class Listener { // be ready for a given `protocol` addHandler (protocol, handler) { - log('adding handler: %s', protocol) - + this.log('adding handler: ' + protocol) assert(isFunction(handler), 'handler must be a function') if (this.handlers[protocol]) { - log('overwriting handler for %s', protocol) + this.log('overwriting handler for ' + protocol) } this.handlers[protocol] = handler @@ -88,12 +86,8 @@ module.exports = class Listener { pull( pull.values(values), - lp.encode(), + pullLP.encode(), conn ) } } - -function getRandomId () { - return ((~~(Math.random() * 1e9)).toString(36)) -} diff --git a/src/listener/selectHandler.js b/src/listener/selectHandler.js new file mode 100644 index 0000000..b9645b3 --- /dev/null +++ b/src/listener/selectHandler.js @@ -0,0 +1,43 @@ +'use strict' + +const handshake = require('pull-handshake') +const lp = require('pull-length-prefixed') +const Connection = require('interface-connection').Connection +const writeEncoded = require('../util.js').writeEncoded + +function selectHandler (rawConn, handlersMap, log) { + const cb = (err) => { + // incoming errors are irrelevant for the app + log.error(err) + } + + const stream = handshake({ timeout: 60 * 1000 }, cb) + const shake = stream.handshake + + next() + return stream + + function next () { + lp.decodeFromReader(shake, (err, data) => { + if (err) { + return cb(err) + } + log('received:', data.toString()) + const protocol = data.toString().slice(0, -1) + const result = Object.keys(handlersMap).filter((id) => id === protocol) + const key = result && result[0] + + if (key) { + log('send ack back of: ' + protocol) + writeEncoded(shake, data, cb) + handlersMap[key](new Connection(shake.rest(), rawConn)) + } else { + log('not supported protocol: ' + protocol) + writeEncoded(shake, new Buffer('na\n')) + next() + } + }) + } +} + +module.exports = selectHandler diff --git a/src/select.js b/src/select.js new file mode 100644 index 0000000..c57ec55 --- /dev/null +++ b/src/select.js @@ -0,0 +1,35 @@ +'use strict' + +const handshake = require('pull-handshake') +const pullLP = require('pull-length-prefixed') +const util = require('./util') +const writeEncoded = util.writeEncoded + +function select (multicodec, callback, log) { + const stream = handshake({ + timeout: 60 * 1000 + }, callback) + + const shake = stream.handshake + + log('writing multicodec: ' + multicodec) + writeEncoded(shake, new Buffer(multicodec + '\n'), callback) + + pullLP.decodeFromReader(shake, (err, data) => { + if (err) { + return callback(err) + } + const protocol = data.toString().slice(0, -1) + + if (protocol !== multicodec) { + return callback(new Error(`"${multicodec}" not supported`), shake.rest()) + } + + log('received ack: ' + protocol) + callback(null, shake.rest()) + }) + + return stream +} + +module.exports = select diff --git a/src/util.js b/src/util.js new file mode 100644 index 0000000..c814fcb --- /dev/null +++ b/src/util.js @@ -0,0 +1,62 @@ +const pull = require('pull-stream') +const pullLP = require('pull-length-prefixed') +const debug = require('debug') + +exports = module.exports + +function randomId () { + return ((~~(Math.random() * 1e9)).toString(36)) +} + +// prefixes a message with a varint +// TODO this is a pull-stream 'creep' (pull stream to add a byte?') +function encode (msg, callback) { + const values = Buffer.isBuffer(msg) ? [msg] : [new Buffer(msg)] + + pull( + pull.values(values), + pullLP.encode(), + pull.collect((err, encoded) => { + if (err) { + return callback(err) + } + callback(null, encoded[0]) + }) + ) +} + +exports.writeEncoded = (writer, msg, callback) => { + encode(msg, (err, msg) => { + if (err) { + return callback(err) + } + writer.write(msg) + }) +} + +function createLogger (type) { + const rId = randomId() + + function printer (logger) { + return (msg) => { + if (Array.isArray(msg)) { + msg = msg.join(' ') + } + logger('(%s) %s', rId, msg) + } + } + + const log = printer(debug('mss:' + type)) + log.error = printer(debug('mss:' + type + ':error')) + + return log +} + +exports.log = {} + +exports.log.dialer = () => { + return createLogger('dialer\t') +} +exports.log.listener = () => { + return createLogger('listener\t') +} diff --git a/test/index.spec.js b/test/index.spec.js index 7fd456a..7ddc09f 100644 --- a/test/index.spec.js +++ b/test/index.spec.js @@ -4,21 +4,21 @@ const expect = require('chai').expect const pull = require('pull-stream') -const lp = require('pull-length-prefixed') -const pair = require('pull-pair/duplex') +const pullLP = require('pull-length-prefixed') +const pullPair = require('pull-pair/duplex') const multistream = require('../src') const parallel = require('run-parallel') const series = require('run-series') describe('multistream dialer', () => { it('sends the multistream multicodec', (done) => { - const p = pair() + const p = pullPair() const dialerConn = p[0] const listenerConn = p[1] pull( listenerConn, - lp.decode(), + pullLP.decode(), pull.drain((data) => { expect(data.toString()).to.equal('/multistream/1.0.0\n') done() @@ -32,13 +32,13 @@ describe('multistream dialer', () => { }) describe('multistream listener', () => { it('sends the multistream multicodec', (done) => { - const p = pair() + const p = pullPair() const dialerConn = p[0] const listenerConn = p[1] pull( dialerConn, - lp.decode(), + pullLP.decode(), pull.drain((data) => { expect(data.toString()).to.equal('/multistream/1.0.0\n') done() @@ -53,7 +53,7 @@ describe('multistream listener', () => { describe('multistream handshake', () => { it('performs the handshake handshake', (done) => { - const p = pair() + const p = pullPair() const dialerConn = p[0] const listenerConn = p[1] @@ -72,7 +72,7 @@ describe('multistream handshake', () => { }) it('handle and select a protocol', (done) => { - const p = pair() + const p = pullPair() const dialerConn = p[0] const listenerConn = p[1] @@ -122,7 +122,7 @@ describe('multistream handshake', () => { }) it('select non existing proto', (done) => { - const p = pair() + const p = pullPair() const dialerConn = p[0] const listenerConn = p[1] @@ -153,7 +153,7 @@ describe('multistream handshake', () => { }) it('select a non existing proto and then select an existing proto', (done) => { - const p = pair() + const p = pullPair() const dialerConn = p[0] const listenerConn = p[1] @@ -208,7 +208,7 @@ describe('multistream handshake', () => { }) it('ls', (done) => { - const p = pair() + const p = pullPair() const dialerConn = p[0] const listenerConn = p[1] @@ -262,7 +262,7 @@ describe('multistream handshake', () => { }) it('handler must be a function', (done) => { - const p = pair() + const p = pullPair() const dialerConn = p[0] const listenerConn = p[1] @@ -295,7 +295,7 @@ describe('multistream handshake', () => { }) it('racing condition resistent', (done) => { - const p = pair() + const p = pullPair() const dialerConn = p[0] const listenerConn = p[1]