From fed8198175166851eb6e24e8f3190acf633fe015 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Tue, 9 Aug 2016 13:01:55 +0200 Subject: [PATCH 1/5] feat(spdy): migration to pull-streams --- examples/dialer.js | 24 +++--- examples/listener.js | 28 ++++--- gulpfile.js | 8 +- package.json | 23 +++--- src/index.js | 77 +++++-------------- src/muxer.js | 61 +++++++++++++++ src/spdy-codec.js | 3 + test/browser.js | 32 ++++---- test/compliance.js | 12 --- test/compliance.spec.js | 16 ++++ test/conn-properties.node.js | 77 +++++-------------- test/spdy-over-tcp.node.js | 116 ++++++++++++++++++++++++++++ test/spdy-over-ws.node.js | 144 ++++++++++++++++------------------- test/spdy.spec.js | 63 +++++++-------- 14 files changed, 397 insertions(+), 287 deletions(-) create mode 100644 src/muxer.js create mode 100644 src/spdy-codec.js delete mode 100644 test/compliance.js create mode 100644 test/compliance.spec.js create mode 100644 test/spdy-over-tcp.node.js diff --git a/examples/dialer.js b/examples/dialer.js index ec60801..e8cbb9d 100644 --- a/examples/dialer.js +++ b/examples/dialer.js @@ -1,26 +1,24 @@ 'use strict' const tcp = require('net') +const pull = require('pull-stream') +const toPull = require('stream-to-pull-stream') const libp2pSPDY = require('../src') const socket = tcp.connect(9999) -const muxer = libp2pSPDY(socket, false) +const muxer = libp2pSPDY.dial(toPull(socket)) muxer.on('stream', (stream) => { console.log('-> got new muxed stream') - stream.on('data', (data) => { - console.log('do I ever get data?', data) - }) - stream.pipe(stream) + pull(stream, pull.log, stream) }) console.log('-> opening a stream from my side') -muxer.newStream((err, stream) => { - if (err) { - throw err - } - - console.log('-> opened the stream') - stream.write('hey, how is it going. I am dialer') - stream.end() +const stream = muxer.newStream((err) => { + if (err) throw err }) + +pull( + pull.values(['hey, how is it going. I am dialer']), + stream +) diff --git a/examples/listener.js b/examples/listener.js index 58868df..27331d8 100644 --- a/examples/listener.js +++ b/examples/listener.js @@ -1,30 +1,36 @@ 'use strict' const tcp = require('net') +const pull = require('pull-stream') +const toPull = require('stream-to-pull-stream') const libp2pSPDY = require('../src') const listener = tcp.createServer((socket) => { console.log('-> got connection') - const muxer = libp2pSPDY(socket, true) + const muxer = libp2pSPDY.listen(toPull(socket)) muxer.on('stream', (stream) => { console.log('-> got new muxed stream') - stream.on('data', (data) => { - console.log('DO I GET DATA?', data) - }) - stream.pipe(stream) + pull( + stream, + pull.through((data) => { + console.log('DO I GET DATA?', data) + }), + stream + ) }) console.log('-> opening a stream from my side') - muxer.newStream((err, stream) => { - if (err) { - throw err - } + const stream = muxer.newStream((err) => { + if (err) throw err console.log('-> opened the stream') - stream.write('hey, how is it going') - stream.end() }) + + pull( + pull.values(['hey, how is it going']), + stream + ) }) listener.listen(9999, () => { diff --git a/gulpfile.js b/gulpfile.js index 88fe888..61cbfb8 100644 --- a/gulpfile.js +++ b/gulpfile.js @@ -3,6 +3,7 @@ const gulp = require('gulp') const WSlibp2p = require('libp2p-websockets') const multiaddr = require('multiaddr') +const pull = require('pull-stream') const spdy = require('./src') @@ -12,14 +13,13 @@ gulp.task('test:browser:before', (done) => { const ws = new WSlibp2p() const mh = multiaddr('/ip4/127.0.0.1/tcp/9095/ws') listener = ws.createListener((transportSocket) => { - const muxedConn = spdy(transportSocket, true) - + const muxedConn = spdy.listen(transportSocket) muxedConn.on('stream', (connRx) => { const connTx = muxedConn.newStream() - connRx.pipe(connTx) - connTx.pipe(connRx) + pull(connRx, connTx, connRx) }) }) + listener.listen(mh, done) }) diff --git a/package.json b/package.json index c204da1..1edac20 100644 --- a/package.json +++ b/package.json @@ -5,7 +5,6 @@ "main": "lib/index.js", "jsnext:main": "src/index.js", "scripts": { - "compliance": "node test/compliance.js | tap-spec", "lint": "gulp lint", "build": "gulp build", "test": "gulp test", @@ -35,22 +34,28 @@ }, "homepage": "https://github.com/libp2p/js-libp2p-spdy", "devDependencies": { - "aegir": "^6.0.0", - "bl": "^1.1.2", + "aegir": "^6.0.1", "chai": "^3.5.0", "interface-stream-muxer": "^0.3.1", - "libp2p-tcp": "^0.7.4", - "libp2p-websockets": "^0.7.1", + "libp2p-tcp": "^0.8.0", + "libp2p-websockets": "^0.8.0", "multiaddr": "^2.0.0", "pre-commit": "^1.1.2", + "pull-file": "^0.5.0", + "pull-pair": "^1.1.0", + "pull-stream": "^3.4.3", "run-parallel": "^1.1.6", - "stream-pair": "^1.0.3", + "stream-to-pull-stream": "^1.7.0", + "tap-spec": "^4.1.1", "tape": "^4.2.0" }, "dependencies": { "browserify-zlib": "github:ipfs/browserify-zlib", - "interface-connection": "^0.1.8", - "spdy-transport": "^2.0.11" + "interface-connection": "^0.2.1", + "lodash.noop": "^3.0.1", + "pull-stream-to-stream": "^1.3.1", + "spdy-transport": "^2.0.14", + "stream-to-pull-stream": "^1.7.0" }, "contributors": [ "David Dias ", @@ -59,4 +64,4 @@ "dignifiedquire ", "nginnever " ] -} \ No newline at end of file +} diff --git a/src/index.js b/src/index.js index cdb45fb..3495b0e 100644 --- a/src/index.js +++ b/src/index.js @@ -1,70 +1,31 @@ 'use strict' const spdy = require('spdy-transport') -const Connection = require('interface-connection').Connection -const EE = require('events').EventEmitter +const toStream = require('pull-stream-to-stream') -exports = module.exports = function (conn, isListener) { - const muxer = spdy.connection.create(conn, { - protocol: 'spdy', - isServer: isListener - }) - - const proxyMuxer = new EE() - - muxer.start(3.1) - - // method added to enable pure stream muxer feeling - proxyMuxer.newStream = (callback) => { - if (!callback) { - callback = noop - } - - const muxedConn = new Connection(muxer.request({ - method: 'POST', - path: '/', - headers: {} - }, callback)) +const Muxer = require('./muxer') +const SPDY_CODEC = require('./spdy-codec') - if (conn.getObservedAddrs) { - muxedConn.getObservedAddrs = conn.getObservedAddrs.bind(conn) - muxedConn.getPeerInfo = conn.getPeerInfo.bind(conn) - muxedConn.setPeerInfo = conn.setPeerInfo.bind(conn) - } +function create (rawConn, isListener) { + const conn = toStream(rawConn) + // Let it flow, let it flooow + conn.resume() - return muxedConn - } - - // The rest of the API comes by default with SPDY - muxer.on('close', () => { - proxyMuxer.emit('close') - }) - - muxer.on('error', (err) => { - proxyMuxer.emit('error', err) + conn.on('end', () => { + // Cleanup and destroy the connection when it ends + // as the converted stream doesn't emit 'close' + // but .destroy will trigger a 'close' event. + conn.destroy() }) - proxyMuxer.end = (cb) => { - muxer.end(cb) - } - - // needed by other spdy impl that need the response headers - // in order to confirm the stream can be open - muxer.on('stream', (stream) => { - stream.respond(200, {}) - const muxedConn = new Connection(stream) - if (conn.getObservedAddrs) { - muxedConn.getObservedAddrs = conn.getObservedAddrs.bind(conn) - muxedConn.getPeerInfo = conn.getPeerInfo.bind(conn) - muxedConn.setPeerInfo = conn.setPeerInfo.bind(conn) - } - proxyMuxer.emit('stream', muxedConn) + const spdyMuxer = spdy.connection.create(conn, { + protocol: 'spdy', + isServer: isListener }) - proxyMuxer.multicodec = exports.multicodec - return proxyMuxer + return new Muxer(rawConn, spdyMuxer) } -exports.multicodec = '/spdy/3.1.0' - -function noop () {} +exports.multicodec = SPDY_CODEC +exports.dial = (conn) => create(conn, false) +exports.listen = (conn) => create(conn, true) diff --git a/src/muxer.js b/src/muxer.js new file mode 100644 index 0000000..e7ee763 --- /dev/null +++ b/src/muxer.js @@ -0,0 +1,61 @@ +'use strict' + +const EventEmitter = require('events').EventEmitter +const noop = require('lodash.noop') +const Connection = require('interface-connection').Connection +const toPull = require('stream-to-pull-stream') + +const SPDY_CODEC = require('./spdy-codec') + +module.exports = class Muxer extends EventEmitter { + constructor (conn, spdy) { + super() + + this.spdy = spdy + this.conn = conn + this.multicodec = SPDY_CODEC + + spdy.start(3.1) + + // The rest of the API comes by default with SPDY + spdy.on('close', () => { + this.emit('close') + }) + + spdy.on('error', (err) => { + this.emit('error', err) + }) + + // needed by other spdy impl that need the response headers + // in order to confirm the stream can be open + spdy.on('stream', (stream) => { + stream.respond(200, {}) + const muxedConn = new Connection(toPull.duplex(stream), this.conn) + this.emit('stream', muxedConn) + }) + } + + // method added to enable pure stream muxer feeling + newStream (callback) { + if (!callback) { + callback = noop + } + const conn = new Connection(null, this.conn) + + this.spdy.request({ + method: 'POST', + path: '/', + headers: {} + }, (err, stream) => { + conn.setInnerConn(toPull.duplex(stream), this.conn) + + callback(err, conn) + }) + + return conn + } + + end (cb) { + this.spdy.end(cb) + } +} diff --git a/src/spdy-codec.js b/src/spdy-codec.js new file mode 100644 index 0000000..e7c4b49 --- /dev/null +++ b/src/spdy-codec.js @@ -0,0 +1,3 @@ +'use strict' + +module.exports = '/spdy/3.1.0' diff --git a/test/browser.js b/test/browser.js index c2d717f..0c371bf 100644 --- a/test/browser.js +++ b/test/browser.js @@ -3,8 +3,10 @@ const expect = require('chai').expect const WSlibp2p = require('libp2p-websockets') -const spdy = require('../src') const multiaddr = require('multiaddr') +const pull = require('pull-stream') + +const spdy = require('../src') describe('browser-server', () => { let ws @@ -16,22 +18,24 @@ describe('browser-server', () => { it('ricochet test', (done) => { const mh = multiaddr('/ip4/127.0.0.1/tcp/9095/ws') const transportSocket = ws.dial(mh) - const muxedConn = spdy(transportSocket, false) + const muxedConn = spdy.dial(transportSocket) muxedConn.on('stream', (conn) => { - conn.on('data', (data) => { - expect(data.toString()).to.equal('hey') - }) - - conn.on('end', () => { - conn.end() - }) + pull( + conn, + pull.collect((err, chunks) => { + console.log('collect', err, chunks) + expect(err).to.not.exist + expect(chunks).to.be.eql([Buffer('hey')]) + pull(pull.empty(), conn) + }) + ) }) - const conn = muxedConn.newStream() - conn.write('hey') - conn.end() - conn.on('data', () => {}) // let it floooow - conn.on('end', done) + pull( + pull.values([Buffer('hey')]), + muxedConn.newStream(), + pull.onEnd(done) + ) }) }) diff --git a/test/compliance.js b/test/compliance.js deleted file mode 100644 index 4721da0..0000000 --- a/test/compliance.js +++ /dev/null @@ -1,12 +0,0 @@ -'use strict' - -const tape = require('tape') -const tests = require('interface-stream-muxer') -const spdy = require('./../src') - -const common = { - setup: (t, cb) => cb(null, spdy), - teardown: (t, cb) => cb() -} - -tests(tape, common) diff --git a/test/compliance.spec.js b/test/compliance.spec.js new file mode 100644 index 0000000..a7683cb --- /dev/null +++ b/test/compliance.spec.js @@ -0,0 +1,16 @@ +/* eslint-env mocha */ +'use strict' + +const tests = require('interface-stream-muxer') +const spdy = require('../src') + +describe('compliance', () => { + tests({ + setup (cb) { + cb(null, spdy) + }, + teardown (cb) { + cb() + } + }) +}) diff --git a/test/conn-properties.node.js b/test/conn-properties.node.js index 04640e3..c9c63f0 100644 --- a/test/conn-properties.node.js +++ b/test/conn-properties.node.js @@ -4,55 +4,38 @@ const expect = require('chai').expect const TCP = require('libp2p-tcp') const Connection = require('interface-connection').Connection -const spdy = require('../src') const multiaddr = require('multiaddr') +const pull = require('pull-stream') const parallel = require('run-parallel') +const spdy = require('../src') + describe('conn properties are propagated to each stream', () => { let lMuxer let dMuxer let dConn let listener - before((done) => { + before(() => { const dtcp = new TCP() const ltcp = new TCP() const ma = multiaddr('/ip4/127.0.0.1/tcp/9876') listener = ltcp.createListener((conn) => { - conn.on('error', () => {}) - lMuxer = spdy(conn, true) - - lMuxer.on('error', () => {}) - + lMuxer = spdy.listen(conn) lMuxer.on('stream', (conn) => { - conn.pipe(conn) + pull(conn, conn) }) }) - listener.on('error', () => {}) - - listener.listen(ma, dial) - - function dial () { - dConn = dtcp.dial(ma) - dConn.on('error', () => {}) - - dConn.on('connect', () => { - dMuxer = spdy(dConn, false) - dMuxer.on('error', () => {}) - done() - }) - } + listener.listen(ma) + dConn = dtcp.dial(ma) + dMuxer = spdy.dial(dConn) }) after((done) => { - parallel([ - (cb) => { - dConn.destroy() - dConn.on('close', cb) - }, - listener.close - ], done) + // TODO: fix listener close hanging + // listener.close(done) + done() }) it('getObservedAddrs', (done) => { @@ -65,9 +48,7 @@ describe('conn properties are propagated to each stream', () => { conn.getObservedAddrs((err, addrs) => { expect(err).to.not.exist oa1 = addrs - conn.resume() - conn.on('end', cb) - conn.end() + pull(pull.empty(), conn, pull.onEnd(cb)) }) }, (cb) => { @@ -87,9 +68,7 @@ describe('conn properties are propagated to each stream', () => { const conn = dMuxer.newStream() conn.getPeerInfo((err, pInfo) => { expect(err).to.exist - conn.resume() - conn.on('end', done) - conn.end() + pull(pull.empty(), conn, pull.onEnd(done)) }) }) @@ -101,9 +80,7 @@ describe('conn properties are propagated to each stream', () => { conn.getPeerInfo((err, pInfo) => { expect(err).to.not.exist expect(pInfo).to.equal('banana') - conn.resume() - conn.on('end', cb) - conn.end() + pull(pull.empty(), conn, pull.onEnd(cb)) }) }, (cb) => { @@ -122,9 +99,7 @@ describe('conn properties are propagated to each stream', () => { proxyConn.getPeerInfo((err, pInfo) => { expect(err).to.not.exist expect(pInfo).to.equal('banana') - conn.resume() - conn.on('end', done) - conn.end() + pull(pull.empty(), conn, pull.onEnd(done)) }) }) @@ -139,36 +114,28 @@ describe('conn properties are propagated to each stream', () => { conn1.getPeerInfo((err, pInfo) => { expect(err).to.not.exist expect(pInfo).to.equal('banana') - conn1.resume() - conn1.on('end', cb) - conn1.end() + pull(pull.empty(), conn1, pull.onEnd(cb)) }) }, (cb) => { conn2.getPeerInfo((err, pInfo) => { expect(err).to.not.exist expect(pInfo).to.equal('banana') - conn2.resume() - conn2.on('end', cb) - conn2.end() + pull(pull.empty(), conn2, pull.onEnd(cb)) }) }, (cb) => { conn3.getPeerInfo((err, pInfo) => { expect(err).to.not.exist expect(pInfo).to.equal('banana') - conn3.resume() - conn3.on('end', cb) - conn3.end() + pull(pull.empty(), conn3, pull.onEnd(cb)) }) }, (cb) => { conn4.getPeerInfo((err, pInfo) => { expect(err).to.not.exist expect(pInfo).to.equal('banana') - conn4.resume() - conn4.on('end', cb) - conn4.end() + pull(pull.empty(), conn4, pull.onEnd(cb)) }) } ], done) @@ -182,9 +149,7 @@ describe('conn properties are propagated to each stream', () => { conn.getPeerInfo((err, pInfo) => { expect(err).to.not.exist expect(pInfo).to.equal('pineapple') - conn.resume() - conn.on('end', cb) - conn.end() + pull(pull.empty(), conn, pull.onEnd(cb)) }) }, (cb) => { diff --git a/test/spdy-over-tcp.node.js b/test/spdy-over-tcp.node.js new file mode 100644 index 0000000..ddc6d94 --- /dev/null +++ b/test/spdy-over-tcp.node.js @@ -0,0 +1,116 @@ +/* eslint-env mocha */ +'use strict' + +const expect = require('chai').expect +const Tcp = require('libp2p-tcp') +const multiaddr = require('multiaddr') +const path = require('path') +const fs = require('fs') +const pull = require('pull-stream') +const file = require('pull-file') + +const spdy = require('../src') + +describe('spdy-over-tcp', () => { + let listener + let dialer + + let tcp + let mh = multiaddr('/ip4/127.0.0.1/tcp/9090') + + before(() => { + tcp = new Tcp() + }) + + it('attach to a tcp socket, as listener', (done) => { + const tcpListener = tcp.createListener((socket) => { + expect(socket).to.exist + listener = spdy.listen(socket) + expect(listener).to.exist + }) + + tcpListener.listen(mh, done) + }) + + it('attach to a tcp socket, as dialer', (done) => { + const socket = tcp.dial(mh) + expect(socket).to.exist + dialer = spdy.dial(socket) + expect(dialer).to.exist + done() + }) + + it('open a multiplex stream from dialer', (done) => { + listener.once('stream', (conn) => { + pull(conn, conn) + }) + + pull( + pull.empty(), + dialer.newStream(), + pull.onEnd(done) + ) + }) + + it('open a multiplex stream from listener', (done) => { + dialer.once('stream', (conn) => { + pull(conn, conn) + }) + + pull( + pull.empty(), + listener.newStream(), + pull.onEnd(done) + ) + }) + + it('open a spdy stream from dialer and write to it', (done) => { + listener.once('stream', (conn) => { + pull(conn, conn) + }) + + pull( + pull.values(['hello world']), + dialer.newStream(), + pull.collect((err, data) => { + expect(err).to.not.exist + expect(data[0].toString()).to.equal('hello world') + done() + }) + ) + }) + + it('open a spdy stream from listener and write to it', (done) => { + dialer.once('stream', (conn) => { + pull(conn, conn) + }) + + pull( + pull.values(['hello world']), + listener.newStream(), + pull.collect((err, data) => { + expect(err).to.not.exist + expect(data[0].toString()).to.equal('hello world') + done() + }) + ) + }) + + it('open a spdy stream from listener and write a lot', (done) => { + dialer.once('stream', (conn) => { + pull(conn, conn) + }) + + const filePath = path.join(process.cwd(), '/test/test-data/1.2MiB.txt') + pull( + file(filePath), + listener.newStream(), + pull.collect((err, data) => { + expect(err).to.not.exist + const expected = fs.readFileSync(filePath) + expect(Buffer.concat(data)).to.deep.equal(expected) + done() + }) + ) + }) +}) diff --git a/test/spdy-over-ws.node.js b/test/spdy-over-ws.node.js index a401c09..b4fef3c 100644 --- a/test/spdy-over-ws.node.js +++ b/test/spdy-over-ws.node.js @@ -1,133 +1,119 @@ -'use strict' - /* eslint-env mocha */ +'use strict' const expect = require('chai').expect const WSlibp2p = require('libp2p-websockets') const multiaddr = require('multiaddr') -const bl = require('bl') -const spdy = require('../src') const path = require('path') const fs = require('fs') +const pull = require('pull-stream') +const file = require('pull-file') + +const spdy = require('../src') describe('spdy-over-ws', () => { + const mh = multiaddr('/ip4/127.0.0.1/tcp/9091/ws') + let listener let dialer - let ws - let mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') - before(() => { + before((done) => { ws = new WSlibp2p() - }) - it('attach to a websocket, as listener', (done) => { + let i = 0 + const finish = () => { + i++ + return i === 2 ? done() : null + } + const wsListener = ws.createListener((socket) => { expect(socket).to.exist - listener = spdy(socket, true) + listener = spdy.listen(socket) expect(listener).to.exist + finish() }) - wsListener.listen(mh, done) - }) - - it('attach to a websocket, as dialer', (done) => { const socket = ws.dial(mh) - expect(socket).to.exist - dialer = spdy(socket, false) - expect(dialer).to.exist - done() + + wsListener.listen(mh, () => { + dialer = spdy.dial(socket) + expect(dialer).to.exist + finish() + }) }) it('open a multiplex stream from dialer', (done) => { listener.once('stream', (conn) => { - conn.pipe(conn) + pull(conn, conn) }) - const conn = dialer.newStream() - - conn.on('error', (err) => { - expect(err).to.not.exist - }) - - conn.on('data', () => {}) // otherwise data doesn't flow - conn.on('end', done) - conn.end() + pull( + pull.empty(), + dialer.newStream(), + pull.onEnd(done) + ) }) it('open a multiplex stream from listener', (done) => { dialer.once('stream', (conn) => { - conn.pipe(conn) + pull(conn, conn) }) - const conn = listener.newStream() - - conn.on('error', (err) => { - expect(err).to.not.exist - }) - - conn.on('data', () => {}) // otherwise data doesn't flow - conn.on('end', done) - conn.end() + pull( + pull.empty(), + listener.newStream(), + pull.onEnd(done) + ) }) it('open a spdy stream from dialer and write to it', (done) => { listener.once('stream', (conn) => { - conn.pipe(conn) + pull(conn, conn) }) - const conn = dialer.newStream() - conn.write('hello world') - conn.end() - - conn.on('error', (err) => { - expect(err).to.not.exist - }) - conn.pipe(bl((err, data) => { - expect(err).to.not.exist - expect(data.toString()).to.equal('hello world') - done() - })) + pull( + pull.values(['hello world']), + dialer.newStream(), + pull.collect((err, data) => { + expect(err).to.not.exist + expect(data[0].toString()).to.equal('hello world') + done() + }) + ) }) it('open a spdy stream from listener and write to it', (done) => { dialer.once('stream', (conn) => { - conn.pipe(conn) + pull(conn, conn) }) - const conn = listener.newStream() - conn.write('hello world') - conn.end() - - conn.on('error', (err) => { - expect(err).to.not.exist - }) - conn.pipe(bl((err, data) => { - expect(err).to.not.exist - expect(data.toString()).to.equal('hello world') - done() - })) + pull( + pull.values(['hello world']), + listener.newStream(), + pull.collect((err, data) => { + expect(err).to.not.exist + expect(data[0].toString()).to.equal('hello world') + done() + }) + ) }) it('open a spdy stream from listener and write a lot', (done) => { dialer.once('stream', (conn) => { - conn.pipe(conn) + pull(conn, conn) }) const filePath = path.join(process.cwd(), '/test/test-data/1.2MiB.txt') - - const conn = listener.newStream() - fs.createReadStream(filePath).pipe(conn) - - conn.on('error', (err) => { - expect(err).to.not.exist - }) - - conn.pipe(bl((err, data) => { - expect(err).to.not.exist - const expected = fs.readFileSync(filePath) - expect(data).to.deep.equal(expected) - done() - })) + pull( + file(filePath), + listener.newStream(), + pull.collect((err, data) => { + expect(err).to.not.exist + const expected = fs.readFileSync(filePath) + expect(Buffer.concat(data)).to.deep.equal(expected) + done() + }) + ) }) }) diff --git a/test/spdy.spec.js b/test/spdy.spec.js index 65a4ed6..be6a9c0 100644 --- a/test/spdy.spec.js +++ b/test/spdy.spec.js @@ -1,8 +1,10 @@ -'use strict' /* eslint-env mocha */ +'use strict' const expect = require('chai').expect -const streamPair = require('stream-pair') +const pair = require('pull-pair/duplex') +const pull = require('pull-stream') + const spdy = require('../src') describe('spdy-generic', () => { @@ -12,54 +14,53 @@ describe('spdy-generic', () => { let listener let dialer - before((done) => { - const pair = streamPair.create() - dialerSocket = pair - listenerSocket = pair.other - done() + before(() => { + const p = pair() + dialerSocket = p[0] + listenerSocket = p[1] }) - it('attach to a duplex stream, as listener', (done) => { - listener = spdy(listenerSocket, true) + it('attach to a duplex stream, as listener', () => { + listener = spdy.listen(listenerSocket) expect(listener).to.exist - done() }) - it('attach to a duplex stream, as dialer', (done) => { - dialer = spdy(dialerSocket, false) + it('attach to a duplex stream, as dialer', () => { + dialer = spdy.dial(dialerSocket) expect(dialer).to.exist - done() }) it('open a multiplex stream from client', (done) => { listener.once('stream', (conn) => { - conn.pipe(conn) + pull(conn, conn) }) const conn = dialer.newStream() - - conn.on('error', (err) => { - expect(err).to.not.exist - }) - - conn.on('data', () => {}) // otherwise data doesn't flow - conn.on('end', done) - conn.end() + pull( + pull.values(['hello']), + conn, + pull.collect((err, res) => { + expect(err).to.not.exist + expect(res).to.be.eql([Buffer('hello')]) + done() + }) + ) }) it('open a multiplex stream from listener', (done) => { dialer.once('stream', (conn) => { - conn.pipe(conn) + pull(conn, conn) }) const conn = listener.newStream() - - conn.on('error', (err) => { - expect(err).to.not.exist - }) - - conn.on('data', () => {}) // otherwise data doesn't flow - conn.on('end', done) - conn.end() + pull( + pull.values(['hello']), + conn, + pull.collect((err, res) => { + expect(err).to.not.exist + expect(res).to.be.eql([Buffer('hello')]) + done() + }) + ) }) }) From bfcc8f0dc4e32c3c03d17098620c55997888bc9b Mon Sep 17 00:00:00 2001 From: David Dias Date: Tue, 6 Sep 2016 09:33:38 -0400 Subject: [PATCH 2/5] feat(api): keep the isListener api --- src/index.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/index.js b/src/index.js index 3495b0e..3df6162 100644 --- a/src/index.js +++ b/src/index.js @@ -26,6 +26,7 @@ function create (rawConn, isListener) { return new Muxer(rawConn, spdyMuxer) } +exports = module.exports = create exports.multicodec = SPDY_CODEC -exports.dial = (conn) => create(conn, false) -exports.listen = (conn) => create(conn, true) +exports.dialer = (conn) => create(conn, false) +exports.listener = (conn) => create(conn, true) From 5aafbe5041db1b8464447dc201150af29f32014c Mon Sep 17 00:00:00 2001 From: David Dias Date: Tue, 6 Sep 2016 09:41:33 -0400 Subject: [PATCH 3/5] feat(readme): pull-streams note --- README.md | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 833511c..ebe1228 100644 --- a/README.md +++ b/README.md @@ -54,13 +54,13 @@ Loading this module through a script tag will make the `Lip2pSpdy` obj available **As a listener** ```JavaScript -const listener = spdy(socket, true) +const listener = spdy(conn, true) ``` **As a dialer** ```JavaScript -const dialer = spdy(socket, false) +const dialer = spdy(conn, false) ``` #### Opening a multiplex duplex stream @@ -97,3 +97,29 @@ dialer.on('error', () => {}) ``` note: Works the same on the listener side + +### This module uses `pull-streams` + +We expose a streaming interface based on `pull-streams`, rather then on the Node.js core streams implementation (aka Node.js streams). `pull-streams` offers us a better mechanism for error handling and flow control guarantees. If you would like to know more about why we did this, see the discussion at this [issue](https://github.com/ipfs/js-ipfs/issues/362). + +You can learn more about pull-streams at: + +- [The history of Node.js streams, nodebp April 2014](https://www.youtube.com/watch?v=g5ewQEuXjsQ) +- [The history of streams, 2016](http://dominictarr.com/post/145135293917/history-of-streams) +- [pull-streams, the simple streaming primitive](http://dominictarr.com/post/149248845122/pull-streams-pull-streams-are-a-very-simple) +- [pull-streams documentation](https://pull-stream.github.io/) + +#### Converting `pull-streams` to Node.js Streams + +If you are a Node.js streams user, you can convert a pull-stream to a Node.js stream using the module [`pull-stream-to-stream`](https://github.com/dominictarr/pull-stream-to-stream), giving you an instance of a Node.js stream that is linked to the pull-stream. For example: + +```js +const pullToStream = require('pull-stream-to-stream') + +const nodeStreamInstance = pullToStream(pullStreamInstance) +// nodeStreamInstance is an instance of a Node.js Stream +``` + +To learn more about this utility, visit https://pull-stream.github.io/#pull-stream-to-stream. + + From 30461eda3d999f5b031c74a1fee7f4b3687ec227 Mon Sep 17 00:00:00 2001 From: David Dias Date: Tue, 6 Sep 2016 09:48:41 -0400 Subject: [PATCH 4/5] feat(api): dial to dialer, listen to listener --- test/conn-properties.node.js | 4 ++-- test/spdy-over-tcp.node.js | 4 ++-- test/spdy-over-ws.node.js | 4 ++-- test/spdy.spec.js | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/test/conn-properties.node.js b/test/conn-properties.node.js index c9c63f0..52eab83 100644 --- a/test/conn-properties.node.js +++ b/test/conn-properties.node.js @@ -21,7 +21,7 @@ describe('conn properties are propagated to each stream', () => { const ltcp = new TCP() const ma = multiaddr('/ip4/127.0.0.1/tcp/9876') listener = ltcp.createListener((conn) => { - lMuxer = spdy.listen(conn) + lMuxer = spdy.listener(conn) lMuxer.on('stream', (conn) => { pull(conn, conn) }) @@ -29,7 +29,7 @@ describe('conn properties are propagated to each stream', () => { listener.listen(ma) dConn = dtcp.dial(ma) - dMuxer = spdy.dial(dConn) + dMuxer = spdy.dialer(dConn) }) after((done) => { diff --git a/test/spdy-over-tcp.node.js b/test/spdy-over-tcp.node.js index ddc6d94..7340603 100644 --- a/test/spdy-over-tcp.node.js +++ b/test/spdy-over-tcp.node.js @@ -25,7 +25,7 @@ describe('spdy-over-tcp', () => { it('attach to a tcp socket, as listener', (done) => { const tcpListener = tcp.createListener((socket) => { expect(socket).to.exist - listener = spdy.listen(socket) + listener = spdy.listener(socket) expect(listener).to.exist }) @@ -35,7 +35,7 @@ describe('spdy-over-tcp', () => { it('attach to a tcp socket, as dialer', (done) => { const socket = tcp.dial(mh) expect(socket).to.exist - dialer = spdy.dial(socket) + dialer = spdy.dialer(socket) expect(dialer).to.exist done() }) diff --git a/test/spdy-over-ws.node.js b/test/spdy-over-ws.node.js index b4fef3c..cf13280 100644 --- a/test/spdy-over-ws.node.js +++ b/test/spdy-over-ws.node.js @@ -29,7 +29,7 @@ describe('spdy-over-ws', () => { const wsListener = ws.createListener((socket) => { expect(socket).to.exist - listener = spdy.listen(socket) + listener = spdy.listener(socket) expect(listener).to.exist finish() }) @@ -37,7 +37,7 @@ describe('spdy-over-ws', () => { const socket = ws.dial(mh) wsListener.listen(mh, () => { - dialer = spdy.dial(socket) + dialer = spdy.dialer(socket) expect(dialer).to.exist finish() }) diff --git a/test/spdy.spec.js b/test/spdy.spec.js index be6a9c0..9da0f6c 100644 --- a/test/spdy.spec.js +++ b/test/spdy.spec.js @@ -21,12 +21,12 @@ describe('spdy-generic', () => { }) it('attach to a duplex stream, as listener', () => { - listener = spdy.listen(listenerSocket) + listener = spdy.listener(listenerSocket) expect(listener).to.exist }) it('attach to a duplex stream, as dialer', () => { - dialer = spdy.dial(dialerSocket) + dialer = spdy.dialer(dialerSocket) expect(dialer).to.exist }) From 937cc17e84606e3fdaf7b40e8eff07c2a7ebead0 Mon Sep 17 00:00:00 2001 From: David Dias Date: Tue, 6 Sep 2016 17:36:36 -0400 Subject: [PATCH 5/5] refactor(api): follow spec interface --- examples/dialer.js | 2 +- examples/listener.js | 2 +- gulpfile.js | 2 +- package.json | 2 +- test/browser.js | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/examples/dialer.js b/examples/dialer.js index e8cbb9d..da215e1 100644 --- a/examples/dialer.js +++ b/examples/dialer.js @@ -6,7 +6,7 @@ const toPull = require('stream-to-pull-stream') const libp2pSPDY = require('../src') const socket = tcp.connect(9999) -const muxer = libp2pSPDY.dial(toPull(socket)) +const muxer = libp2pSPDY.dialer(toPull(socket)) muxer.on('stream', (stream) => { console.log('-> got new muxed stream') diff --git a/examples/listener.js b/examples/listener.js index 27331d8..360156f 100644 --- a/examples/listener.js +++ b/examples/listener.js @@ -8,7 +8,7 @@ const libp2pSPDY = require('../src') const listener = tcp.createServer((socket) => { console.log('-> got connection') - const muxer = libp2pSPDY.listen(toPull(socket)) + const muxer = libp2pSPDY.listener(toPull(socket)) muxer.on('stream', (stream) => { console.log('-> got new muxed stream') diff --git a/gulpfile.js b/gulpfile.js index 61cbfb8..8465719 100644 --- a/gulpfile.js +++ b/gulpfile.js @@ -13,7 +13,7 @@ gulp.task('test:browser:before', (done) => { const ws = new WSlibp2p() const mh = multiaddr('/ip4/127.0.0.1/tcp/9095/ws') listener = ws.createListener((transportSocket) => { - const muxedConn = spdy.listen(transportSocket) + const muxedConn = spdy.listener(transportSocket) muxedConn.on('stream', (connRx) => { const connTx = muxedConn.newStream() pull(connRx, connTx, connRx) diff --git a/package.json b/package.json index 1edac20..49c0f16 100644 --- a/package.json +++ b/package.json @@ -36,7 +36,7 @@ "devDependencies": { "aegir": "^6.0.1", "chai": "^3.5.0", - "interface-stream-muxer": "^0.3.1", + "interface-stream-muxer": "^0.4.0", "libp2p-tcp": "^0.8.0", "libp2p-websockets": "^0.8.0", "multiaddr": "^2.0.0", diff --git a/test/browser.js b/test/browser.js index 0c371bf..5b7db96 100644 --- a/test/browser.js +++ b/test/browser.js @@ -18,7 +18,7 @@ describe('browser-server', () => { it('ricochet test', (done) => { const mh = multiaddr('/ip4/127.0.0.1/tcp/9095/ws') const transportSocket = ws.dial(mh) - const muxedConn = spdy.dial(transportSocket) + const muxedConn = spdy.dialer(transportSocket) muxedConn.on('stream', (conn) => { pull(