diff --git a/gulpfile.js b/gulpfile.js index 28de380..a3457e2 100644 --- a/gulpfile.js +++ b/gulpfile.js @@ -2,6 +2,8 @@ const gulp = require('gulp') const multiaddr = require('multiaddr') +const pull = require('pull-stream') + const WS = require('./src') let listener @@ -10,7 +12,7 @@ gulp.task('test:browser:before', (done) => { const ws = new WS() const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') listener = ws.createListener((conn) => { - conn.pipe(conn) + pull(conn, conn) }) listener.listen(ma, done) }) diff --git a/package.json b/package.json index 43f409b..ecaf025 100644 --- a/package.json +++ b/package.json @@ -35,20 +35,20 @@ "homepage": "https://github.com/libp2p/js-libp2p-websockets#readme", "dependencies": { "detect-node": "^2.0.3", - "interface-connection": "^0.1.8", + "interface-connection": "^0.2.1", "lodash.contains": "^2.4.3", "mafmt": "^2.1.1", - "run-parallel": "^1.1.6", - "simple-websocket": "^4.1.0", - "simple-websocket-server": "^0.1.4" + "pull-ws": "^3.2.3" }, "devDependencies": { "aegir": "^6.0.1", - "multiaddr": "^2.0.2", "chai": "^3.5.0", "gulp": "^3.9.1", - "interface-transport": "^0.2.0", - "pre-commit": "^1.1.3" + "interface-transport": "^0.3.1", + "multiaddr": "^2.0.2", + "pre-commit": "^1.1.3", + "pull-goodbye": "0.0.1", + "pull-stream": "^3.4.3" }, "contributors": [ "David Dias ", diff --git a/src/index.js b/src/index.js index a663e6e..677dfdf 100644 --- a/src/index.js +++ b/src/index.js @@ -1,147 +1,55 @@ 'use strict' -const debug = require('debug') -const log = debug('libp2p:websockets') -const SW = require('simple-websocket') -const isNode = require('detect-node') -let SWS -if (isNode) { - SWS = require('simple-websocket-server') -} else { - SWS = {} -} +const connect = require('pull-ws/client') const mafmt = require('mafmt') const contains = require('lodash.contains') const Connection = require('interface-connection').Connection +const debug = require('debug') +const log = debug('libp2p:websockets:dialer') -const CLOSE_TIMEOUT = 2000 -// const IPFS_CODE = 421 - -exports = module.exports = WebSockets - -function WebSockets () { - if (!(this instanceof WebSockets)) { - return new WebSockets() - } +const createListener = require('./listener') - this.dial = function (ma, options, callback) { +module.exports = class WebSockets { + dial (ma, options, callback) { if (typeof options === 'function') { callback = options options = {} } if (!callback) { - callback = function noop () {} + callback = () => {} } const maOpts = ma.toOptions() - const socket = new SW('ws://' + maOpts.host + ':' + maOpts.port) - - const conn = new Connection(socket) - - socket.on('timeout', () => { - conn.emit('timeout') - }) - - socket.on('error', (err) => { - callback(err) - conn.emit('error', err) - }) - - socket.on('connect', () => { - callback(null, conn) - conn.emit('connect') + const url = `ws://${maOpts.host}:${maOpts.port}` + log('dialing %s', url) + const socket = connect(url, { + binary: true, + onConnect: callback }) - conn.getObservedAddrs = (cb) => { - return cb(null, [ma]) - } + const conn = new Connection(socket) + conn.getObservedAddrs = (cb) => cb(null, [ma]) + conn.close = (cb) => socket.close(cb) return conn } - this.createListener = (options, handler) => { + createListener (options, handler) { if (typeof options === 'function') { handler = options options = {} } - const listener = SWS.createServer((socket) => { - const conn = new Connection(socket) - - conn.getObservedAddrs = (cb) => { - // TODO research if we can reuse the address in anyway - return cb(null, []) - } - handler(conn) - }) - - let listeningMultiaddr - - listener._listen = listener.listen - listener.listen = (ma, callback) => { - if (!callback) { - callback = function noop () {} - } - - listeningMultiaddr = ma - - if (contains(ma.protoNames(), 'ipfs')) { - ma = ma.decapsulate('ipfs') - } - - listener._listen(ma.toOptions(), callback) - } - - listener._close = listener.close - listener.close = (options, callback) => { - if (typeof options === 'function') { - callback = options - options = { timeout: CLOSE_TIMEOUT } - } - if (!callback) { callback = function noop () {} } - if (!options) { options = { timeout: CLOSE_TIMEOUT } } - - let closed = false - listener.once('close', () => { - closed = true - }) - listener._close(callback) - setTimeout(() => { - if (closed) { - return - } - log('unable to close graciously, destroying conns') - Object.keys(listener.__connections).forEach((key) => { - log('destroying %s', key) - listener.__connections[key].destroy() - }) - }, options.timeout || CLOSE_TIMEOUT) - } - - // Keep track of open connections to destroy in case of timeout - listener.__connections = {} - listener.on('connection', (socket) => { - const key = (~~(Math.random() * 1e9)).toString(36) + Date.now() - listener.__connections[key] = socket - - socket.on('close', () => { - delete listener.__connections[key] - }) - }) - - listener.getAddrs = (callback) => { - callback(null, [listeningMultiaddr]) - } - - return listener + return createListener(options, handler) } - this.filter = (multiaddrs) => { + filter (multiaddrs) { if (!Array.isArray(multiaddrs)) { multiaddrs = [multiaddrs] } + return multiaddrs.filter((ma) => { if (contains(ma.protoNames(), 'ipfs')) { ma = ma.decapsulate('ipfs') diff --git a/src/listener.js b/src/listener.js new file mode 100644 index 0000000..baa4337 --- /dev/null +++ b/src/listener.js @@ -0,0 +1,46 @@ +'use strict' + +const isNode = require('detect-node') +const Connection = require('interface-connection').Connection +const contains = require('lodash.contains') + +// const IPFS_CODE = 421 + +let createServer + +if (isNode) { + createServer = require('pull-ws/server') +} else { + createServer = () => {} +} + +module.exports = (options, handler) => { + const listener = createServer((socket) => { + socket.getObservedAddrs = (cb) => { + // TODO research if we can reuse the address in anyway + return cb(null, []) + } + + handler(new Connection(socket)) + }) + + let listeningMultiaddr + + listener._listen = listener.listen + listener.listen = (ma, cb) => { + cb = cb || (() => {}) + listeningMultiaddr = ma + + if (contains(ma.protoNames(), 'ipfs')) { + ma = ma.decapsulate('ipfs') + } + + listener._listen(ma.toOptions(), cb) + } + + listener.getAddrs = (cb) => { + cb(null, [listeningMultiaddr]) + } + + return listener +} diff --git a/test/browser.js b/test/browser.js index dd23f14..a6aa03c 100644 --- a/test/browser.js +++ b/test/browser.js @@ -3,74 +3,67 @@ const expect = require('chai').expect const multiaddr = require('multiaddr') +const pull = require('pull-stream') +const goodbye = require('pull-goodbye') + const WS = require('../src') describe('libp2p-websockets', () => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') let ws + let conn - it('create', (done) => { + beforeEach((done) => { ws = new WS() expect(ws).to.exist - done() + conn = ws.dial(ma, done) }) it('echo', (done) => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') - const conn = ws.dial(ma) const message = 'Hello World!' - conn.write(message) - conn.on('data', (data) => { - expect(data.toString()).to.equal(message) - conn.end() - done() - }) - }) - describe('stress', () => { - it('one big write', (done) => { - const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') - const conn = ws.dial(mh) - const message = new Buffer(1000000).fill('a').toString('hex') - conn.write(message) - conn.on('data', (data) => { - expect(data.toString()).to.equal(message) - conn.end() + const s = goodbye({ + source: pull.values([message]), + sink: pull.collect((err, results) => { + expect(err).to.not.exist + expect(results).to.be.eql([message]) done() }) }) - it('many writes in 2 batches', (done) => { - const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') - const conn = ws.dial(mh) - let expected = '' - let counter = 0 - while (++counter < 10000) { - conn.write(`${counter} `) - expected += `${counter} ` - } - - setTimeout(() => { - while (++counter < 20000) { - conn.write(`${counter} `) - expected += `${counter} ` - } + pull(s, conn, s) + }) - conn.write('STOP') - }, 1000) + describe('stress', () => { + it('one big write', (done) => { + const rawMessage = new Buffer(1000000).fill('a') - let result = '' - conn.on('data', (data) => { - if (data.toString() === 'STOP') { - conn.end() - return - } - result += data.toString() + const s = goodbye({ + source: pull.values([rawMessage]), + sink: pull.collect((err, results) => { + expect(err).to.not.exist + expect(results).to.be.eql([rawMessage]) + done() + }) }) + pull(s, conn, s) + }) - conn.on('end', () => { - expect(result).to.equal(expected) - done() + it('many writes', (done) => { + const s = goodbye({ + source: pull( + pull.infinite(), + pull.take(1000), + pull.map((val) => Buffer(val.toString())) + ), + sink: pull.collect((err, result) => { + expect(err).to.not.exist + expect(result).to.have.length(1000) + done() + }) }) + + pull(s, conn, s) }) }) }) diff --git a/test/compliance.node.js b/test/compliance.node.js new file mode 100644 index 0000000..07c850d --- /dev/null +++ b/test/compliance.node.js @@ -0,0 +1,23 @@ +/* eslint-env mocha */ +'use strict' + +const tests = require('interface-transport') +const multiaddr = require('multiaddr') +const Ws = require('../src') + +describe('compliance', () => { + tests({ + setup (cb) { + let ws = new Ws() + const addrs = [ + multiaddr('/ip4/127.0.0.1/tcp/9091/ws'), + multiaddr('/ip4/127.0.0.1/tcp/9092/ws'), + multiaddr('/ip4/127.0.0.1/tcp/9093/ws') + ] + cb(null, ws, addrs) + }, + teardown (cb) { + cb() + } + }) +}) diff --git a/test/node.js b/test/node.js index 655721f..9bf9aa3 100644 --- a/test/node.js +++ b/test/node.js @@ -3,19 +3,17 @@ const expect = require('chai').expect const multiaddr = require('multiaddr') +const pull = require('pull-stream') +const goodbye = require('pull-goodbye') + const WS = require('../src') +require('./compliance.node') + describe('instantiate the transport', () => { - it('create', (done) => { + it('create', () => { const ws = new WS() expect(ws).to.exist - done() - }) - - it('create without new', (done) => { - const ws = WS() - expect(ws).to.exist - done() }) }) @@ -122,7 +120,7 @@ describe('dial', () => { beforeEach((done) => { ws = new WS() listener = ws.createListener((conn) => { - conn.pipe(conn) + pull(conn, conn) }) listener.listen(ma, done) }) @@ -133,12 +131,18 @@ describe('dial', () => { it('dial on IPv4', (done) => { const conn = ws.dial(ma) - conn.write('hey') - conn.end() - conn.on('data', (chunk) => { - expect(chunk.toString()).to.equal('hey') + + const s = goodbye({ + source: pull.values(['hey']), + sink: pull.collect((err, result) => { + expect(err).to.not.exist + + expect(result).to.be.eql(['hey']) + done() + }) }) - conn.on('end', done) + + pull(s, conn, s) }) it.skip('dial on IPv6', (done) => { @@ -148,12 +152,18 @@ describe('dial', () => { it('dial on IPv4 with IPFS Id', (done) => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') const conn = ws.dial(ma) - conn.write('hey') - conn.end() - conn.on('data', (chunk) => { - expect(chunk.toString()).to.equal('hey') + + const s = goodbye({ + source: pull.values(['hey']), + sink: pull.collect((err, result) => { + expect(err).to.not.exist + + expect(result).to.be.eql(['hey']) + done() + }) }) - conn.on('end', done) + + pull(s, conn, s) }) }) @@ -204,13 +214,17 @@ describe('valid Connection', () => { dialerObsAddrs = addrs }) - conn.pipe(conn) + pull(conn, conn) }) listener.listen(ma, () => { const conn = ws.dial(ma) - conn.on('end', onEnd) + pull( + pull.empty(), + conn, + pull.onEnd(onEnd) + ) function onEnd () { conn.getObservedAddrs((err, addrs) => { @@ -218,7 +232,6 @@ describe('valid Connection', () => { listenerObsAddrs = addrs listener.close(onClose) - function onClose () { expect(listenerObsAddrs[0]).to.deep.equal(ma) expect(dialerObsAddrs.length).to.equal(0) @@ -226,8 +239,6 @@ describe('valid Connection', () => { } }) } - conn.resume() - conn.end() }) }) @@ -241,13 +252,17 @@ describe('valid Connection', () => { expect(err).to.exist }) - conn.pipe(conn) + pull(conn, conn) }) listener.listen(ma, () => { const conn = ws.dial(ma) - conn.on('end', onEnd) + pull( + pull.empty(), + conn, + pull.onEnd(onEnd) + ) function onEnd () { conn.getPeerInfo((err, peerInfo) => { @@ -255,8 +270,6 @@ describe('valid Connection', () => { listener.close(done) }) } - conn.resume() - conn.end() }) }) @@ -272,7 +285,7 @@ describe('valid Connection', () => { expect(peerInfo).to.equal('a') }) - conn.pipe(conn) + pull(conn, conn) }) listener.listen(ma, onListen) @@ -280,15 +293,19 @@ describe('valid Connection', () => { const conn = ws.dial(ma) conn.setPeerInfo('b') - conn.on('end', () => { + pull( + pull.empty(), + conn, + pull.onEnd(onEnd) + ) + + function onEnd () { conn.getPeerInfo((err, peerInfo) => { expect(err).to.not.exist expect(peerInfo).to.equal('b') listener.close(done) }) - }) - conn.resume() - conn.end() + } } }) })