From c06da3b925f6d4700b0b1045f4d0e4ea7d868fb9 Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Tue, 6 Sep 2016 16:05:48 +0200 Subject: [PATCH] feat(tests): add closing tests, make sure errors are propagated --- package.json | 8 ++- src/close-test.js | 161 ++++++++++++++++++++++++++++++++++++++++++++++ src/index.js | 2 + 3 files changed, 169 insertions(+), 2 deletions(-) create mode 100644 src/close-test.js diff --git a/package.json b/package.json index 0015f61..4f06026 100644 --- a/package.json +++ b/package.json @@ -2,7 +2,7 @@ "name": "interface-stream-muxer", "version": "0.3.1", "description": "A test suite and interface you can use to implement a stream muxer.", - "main": "lib/index.js", + "main": "src/index.js", "jsnext:main": "src/index.js", "scripts": { "test": "exit(0)", @@ -34,9 +34,13 @@ "async": "^2.0.1", "chai": "^3.5.0", "chai-checkmark": "^1.0.1", + "libp2p-tcp": "^0.8.1", + "multiaddr": "^2.0.2", "pull-generate": "^2.2.0", "pull-pair": "^1.1.0", - "pull-stream": "^3.4.3" + "pull-stream": "^3.4.3", + "run-parallel": "^1.1.6", + "run-series": "^1.1.4" }, "devDependencies": { "aegir": "^6.0.1" diff --git a/src/close-test.js b/src/close-test.js new file mode 100644 index 0000000..f7bdc38 --- /dev/null +++ b/src/close-test.js @@ -0,0 +1,161 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('chai-checkmark')) +const expect = chai.expect +const pair = require('pull-pair/duplex') +const pull = require('pull-stream') +const parallel = require('run-parallel') +const series = require('run-series') +const Tcp = require('libp2p-tcp') +const multiaddr = require('multiaddr') + +const mh = multiaddr('/ip4/127.0.0.1/tcp/9090') + +function closeAndWait (stream) { + pull( + pull.empty(), + stream, + pull.onEnd((err) => { + expect(err).to.not.exist.mark() + }) + ) +} + +module.exports = (common) => { + describe.only('close', () => { + let muxer + + beforeEach((done) => { + common.setup((err, _muxer) => { + if (err) return done(err) + muxer = _muxer + done() + }) + }) + + it('closing underlying closes streams (tcp)', (done) => { + expect(2).checks(done) + + const tcp = new Tcp() + const tcpListener = tcp.createListener((socket) => { + const listener = muxer.listen(socket) + listener.on('stream', (stream) => { + pull(stream, stream) + }) + }) + + tcpListener.listen(mh, () => { + const dialer = muxer.dial(tcp.dial(mh, () => { + tcpListener.close() + })) + + const s1 = dialer.newStream(() => { + pull( + s1, + pull.onEnd((err) => { + expect(err).to.exist.mark() + }) + ) + + const s2 = dialer.newStream(() => { + pull( + s2, + pull.onEnd((err) => { + expect(err).to.exist.mark() + }) + ) + }) + }) + }) + }) + + it('closing one of the muxed streams doesn\'t close others', (done) => { + const p = pair() + const dialer = muxer.dial(p[0]) + const listener = muxer.listen(p[1]) + + expect(6).checks(done) + + const conns = [] + + listener.on('stream', (stream) => { + expect(stream).to.exist.mark() + pull(stream, stream) + }) + + for (let i = 0; i < 5; i++) { + conns.push(dialer.newStream()) + } + + conns.forEach((conn, i) => { + if (i === 2) { + closeAndWait(conn) + } else { + pull( + conn, + pull.onEnd(() => { + throw new Error('should not end') + }) + ) + } + }) + }) + + it.skip('closing on spdy doesn\'t close until all the streams that are being muxed are closed', (done) => { + const p = pair() + const dialer = muxer.dial(p[0]) + const listener = muxer.listen(p[1]) + + expect(15).checks(done) + + const conns = [] + const count = [] + for (let i = 0; i < 5; i++) { + count.push(i) + } + + series(count.map((i) => (cb) => { + parallel([ + (cb) => listener.once('stream', (stream) => { + console.log('pipe') + expect(stream).to.exist.mark() + pull(stream, stream) + cb() + }), + (cb) => conns.push(dialer.newStream(cb)) + ], cb) + }), (err) => { + if (err) return done(err) + + conns.forEach((conn, i) => { + pull( + pull.values([Buffer('hello')]), + pull.asyncMap((val, cb) => { + setTimeout(() => { + cb(null, val) + }, i * 10) + }), + pull.through((val) => console.log('send', val)), + conn, + pull.through((val) => console.log('recv', val)), + pull.collect((err, data) => { + console.log('end', i) + expect(err).to.not.exist.mark() + expect(data).to.be.eql([Buffer('hello')]).mark() + }) + ) + }) + + listener.on('close', () => { + console.log('closed listener') + }) + + dialer.end(() => { + console.log('CLOSED') + }) + }) + }) + }) +} diff --git a/src/index.js b/src/index.js index d7f82bf..b182e27 100644 --- a/src/index.js +++ b/src/index.js @@ -2,12 +2,14 @@ 'use strict' const baseTest = require('./base-test') +const closeTest = require('./close-test') const stressTest = require('./stress-test') const megaStressTest = require('./mega-stress-test') module.exports = (common) => { describe('interface-stream-muxer', () => { baseTest(common) + closeTest(common) stressTest(common) megaStressTest(common) })