Skip to content
This repository has been archived by the owner on Mar 11, 2020. It is now read-only.

Commit

Permalink
feat(tests): add closing tests, make sure errors are propagated
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire authored and daviddias committed Sep 6, 2016
1 parent 5069679 commit c06da3b
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 2 deletions.
8 changes: 6 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "interface-stream-muxer",
"version": "0.3.1",
"description": "A test suite and interface you can use to implement a stream muxer.",
"main": "lib/index.js",
"main": "src/index.js",
"jsnext:main": "src/index.js",
"scripts": {
"test": "exit(0)",
Expand Down Expand Up @@ -34,9 +34,13 @@
"async": "^2.0.1",
"chai": "^3.5.0",
"chai-checkmark": "^1.0.1",
"libp2p-tcp": "^0.8.1",
"multiaddr": "^2.0.2",
"pull-generate": "^2.2.0",
"pull-pair": "^1.1.0",
"pull-stream": "^3.4.3"
"pull-stream": "^3.4.3",
"run-parallel": "^1.1.6",
"run-series": "^1.1.4"
},
"devDependencies": {
"aegir": "^6.0.1"
Expand Down
161 changes: 161 additions & 0 deletions src/close-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/* eslint-env mocha */
'use strict'

const chai = require('chai')
chai.use(require('chai-checkmark'))
const expect = chai.expect
const pair = require('pull-pair/duplex')
const pull = require('pull-stream')
const parallel = require('run-parallel')
const series = require('run-series')
const Tcp = require('libp2p-tcp')
const multiaddr = require('multiaddr')

const mh = multiaddr('/ip4/127.0.0.1/tcp/9090')

function closeAndWait (stream) {
pull(
pull.empty(),
stream,
pull.onEnd((err) => {
expect(err).to.not.exist.mark()
})
)
}

module.exports = (common) => {
describe.only('close', () => {
let muxer

beforeEach((done) => {
common.setup((err, _muxer) => {
if (err) return done(err)
muxer = _muxer
done()
})
})

it('closing underlying closes streams (tcp)', (done) => {
expect(2).checks(done)

const tcp = new Tcp()
const tcpListener = tcp.createListener((socket) => {
const listener = muxer.listen(socket)
listener.on('stream', (stream) => {
pull(stream, stream)
})
})

tcpListener.listen(mh, () => {
const dialer = muxer.dial(tcp.dial(mh, () => {
tcpListener.close()
}))

const s1 = dialer.newStream(() => {
pull(
s1,
pull.onEnd((err) => {
expect(err).to.exist.mark()
})
)

const s2 = dialer.newStream(() => {
pull(
s2,
pull.onEnd((err) => {
expect(err).to.exist.mark()
})
)
})
})
})
})

it('closing one of the muxed streams doesn\'t close others', (done) => {
const p = pair()
const dialer = muxer.dial(p[0])
const listener = muxer.listen(p[1])

expect(6).checks(done)

const conns = []

listener.on('stream', (stream) => {
expect(stream).to.exist.mark()
pull(stream, stream)
})

for (let i = 0; i < 5; i++) {
conns.push(dialer.newStream())
}

conns.forEach((conn, i) => {
if (i === 2) {
closeAndWait(conn)
} else {
pull(
conn,
pull.onEnd(() => {
throw new Error('should not end')
})
)
}
})
})

it.skip('closing on spdy doesn\'t close until all the streams that are being muxed are closed', (done) => {
const p = pair()
const dialer = muxer.dial(p[0])
const listener = muxer.listen(p[1])

expect(15).checks(done)

const conns = []
const count = []
for (let i = 0; i < 5; i++) {
count.push(i)
}

series(count.map((i) => (cb) => {
parallel([
(cb) => listener.once('stream', (stream) => {
console.log('pipe')
expect(stream).to.exist.mark()
pull(stream, stream)
cb()
}),
(cb) => conns.push(dialer.newStream(cb))
], cb)
}), (err) => {
if (err) return done(err)

conns.forEach((conn, i) => {
pull(
pull.values([Buffer('hello')]),
pull.asyncMap((val, cb) => {
setTimeout(() => {
cb(null, val)
}, i * 10)
}),
pull.through((val) => console.log('send', val)),
conn,
pull.through((val) => console.log('recv', val)),
pull.collect((err, data) => {
console.log('end', i)
expect(err).to.not.exist.mark()
expect(data).to.be.eql([Buffer('hello')]).mark()
})
)
})

listener.on('close', () => {
console.log('closed listener')
})

dialer.end(() => {
console.log('CLOSED')
})
})
})
})
}
2 changes: 2 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
'use strict'

const baseTest = require('./base-test')
const closeTest = require('./close-test')
const stressTest = require('./stress-test')
const megaStressTest = require('./mega-stress-test')

module.exports = (common) => {
describe('interface-stream-muxer', () => {
baseTest(common)
closeTest(common)
stressTest(common)
megaStressTest(common)
})
Expand Down

0 comments on commit c06da3b

Please sign in to comment.