diff --git a/package.json b/package.json index 24cca718ad..0362dfbe6d 100644 --- a/package.json +++ b/package.json @@ -33,7 +33,7 @@ "test:unit:node:gateway": "aegir test -t node -f test/gateway/index.js", "test:unit:node:cli": "aegir test -t node -f test/cli/index.js", "test:unit:browser": "aegir test -t browser --no-cors", - "test:interop": "IPFS_TEST=interop aegir test -t node -t browser -f test/interop", + "test:interop": "IPFS_TEST=interop aegir test -t node -f test/interop", "test:interop:node": "IPFS_TEST=interop aegir test -t node -f test/interop/node.js", "test:interop:browser": "IPFS_TEST=interop aegir test -t browser -f test/interop/browser.js", "test:bootstrapers": "IPFS_TEST=bootstrapers aegir test -t browser -f test/bootstrapers.js", @@ -63,7 +63,7 @@ }, "homepage": "https://github.com/ipfs/js-ipfs#readme", "devDependencies": { - "aegir": "^12.1.3", + "aegir": "^12.2.0", "buffer-loader": "0.0.1", "chai": "^4.1.2", "delay": "^2.0.0", @@ -76,7 +76,7 @@ "form-data": "^2.3.1", "hat": "0.0.3", "interface-ipfs-core": "~0.36.7", - "ipfsd-ctl": "~0.24.1", + "ipfsd-ctl": "~0.25.1", "left-pad": "^1.2.0", "lodash": "^4.17.4", "mocha": "^4.0.1", @@ -92,6 +92,7 @@ }, "dependencies": { "async": "^2.6.0", + "binary-querystring": "~0.1.2", "bl": "^1.2.1", "boom": "^7.1.1", "bs58": "^4.0.1", @@ -106,7 +107,7 @@ "hapi": "^16.6.2", "hapi-set-header": "^1.0.2", "hoek": "^5.0.2", - "ipfs-api": "^17.1.0", + "ipfs-api": "^17.1.1", "ipfs-bitswap": "~0.17.4", "ipfs-block": "~0.6.1", "ipfs-block-service": "~0.13.0", diff --git a/src/core/components/no-floodsub.js b/src/core/components/no-floodsub.js new file mode 100644 index 0000000000..95db571f5f --- /dev/null +++ b/src/core/components/no-floodsub.js @@ -0,0 +1,24 @@ +'use strict' + +const EventEmitter = require('events') + +function fail () { + throw new Error('The daemon must be run with \'--enable-pubsub-experiment\'') +} + +class NoFloodSub extends EventEmitter { + constructor () { + super() + + this.peers = new Map() + this.subscriptions = new Set() + } + + start (callback) { callback() } + stop (callback) { callback() } + publish () { fail() } + subscribe () { fail() } + unsubscribe () { fail() } +} + +module.exports = NoFloodSub diff --git a/src/core/components/start.js b/src/core/components/start.js index 54ff3c9833..9575c3e3c6 100644 --- a/src/core/components/start.js +++ b/src/core/components/start.js @@ -3,6 +3,7 @@ const series = require('async/series') const Bitswap = require('ipfs-bitswap') const FloodSub = require('libp2p-floodsub') +const NoFloodSub = require('./no-floodsub') const setImmediate = require('async/setImmediate') const promisify = require('promisify-es6') @@ -50,12 +51,10 @@ module.exports = (self) => { self._bitswap.start() self._blockService.setExchange(self._bitswap) - if (self._options.EXPERIMENTAL.pubsub) { - self._pubsub = new FloodSub(self._libp2pNode) - self._pubsub.start(done) - } else { - done() - } + self._pubsub = self._options.EXPERIMENTAL.pubsub + ? new FloodSub(self._libp2pNode) + : new NoFloodSub() + self._pubsub.start(done) }) }) } diff --git a/src/core/components/stop.js b/src/core/components/stop.js index 3a2dd8c04f..6ac0dc91a3 100644 --- a/src/core/components/stop.js +++ b/src/core/components/stop.js @@ -31,13 +31,7 @@ module.exports = (self) => { self._bitswap.stop() series([ - (cb) => { - if (self._options.EXPERIMENTAL.pubsub) { - self._pubsub.stop(cb) - } else { - cb() - } - }, + (cb) => self._pubsub.stop(cb), (cb) => self.libp2p.stop(cb), (cb) => self._repo.close(cb) ], done) diff --git a/src/http/api/resources/pubsub.js b/src/http/api/resources/pubsub.js index 6aa1ad6f82..ce37d8f2d1 100644 --- a/src/http/api/resources/pubsub.js +++ b/src/http/api/resources/pubsub.js @@ -2,6 +2,7 @@ const PassThrough = require('stream').PassThrough const bs58 = require('bs58') +const binaryQueryString = require('binary-querystring') exports = module.exports @@ -48,6 +49,7 @@ exports.subscribe = { reply(res) .header('X-Chunked-Output', '1') + .header('content-encoding', 'identity') // stop gzip from buffering, see https://github.com/hapijs/hapi/issues/2975 .header('content-type', 'application/json') }) } @@ -57,7 +59,9 @@ exports.publish = { handler: (request, reply) => { const arg = request.query.arg const topic = arg[0] - const buf = arg[1] + + const rawArgs = binaryQueryString(request.url.search) + const buf = rawArgs.arg && rawArgs.arg[1] const ipfs = request.server.app.ipfs @@ -69,7 +73,7 @@ exports.publish = { return reply(new Error('Missing buf')) } - ipfs.pubsub.publish(topic, Buffer.from(String(buf)), (err) => { + ipfs.pubsub.publish(topic, buf, (err) => { if (err) { return reply(new Error(`Failed to publish to topic ${topic}: ${err}`)) } diff --git a/test/cli/pubsub.js b/test/cli/pubsub.js index 703b83a5b0..49c6786093 100644 --- a/test/cli/pubsub.js +++ b/test/cli/pubsub.js @@ -14,7 +14,7 @@ const createTempNode = '' const repoPath = require('./index').repoPath const ipfs = require('../utils/ipfs-exec')(repoPath) -describe.skip('pubsub', () => { +describe('pubsub', () => { const topicA = 'nonscentsA' const topicB = 'nonscentsB' const topicC = 'nonscentsC' diff --git a/test/fixtures/go-ipfs-repo/version b/test/fixtures/go-ipfs-repo/version index 1e8b314962..62f9457511 100644 --- a/test/fixtures/go-ipfs-repo/version +++ b/test/fixtures/go-ipfs-repo/version @@ -1 +1 @@ -6 +6 \ No newline at end of file diff --git a/test/http-api/index.js b/test/http-api/index.js index e2c7f34ce3..3508b1bb80 100644 --- a/test/http-api/index.js +++ b/test/http-api/index.js @@ -19,7 +19,10 @@ describe('HTTP API', () => { let http = {} before((done) => { - http.api = new API(repoTests) + const options = { + enablePubsubExperiment: true + } + http.api = new API(repoTests, null, options) ncp(repoExample, repoTests, (err) => { expect(err).to.not.exist() diff --git a/test/http-api/interface/pubsub.js b/test/http-api/interface/pubsub.js index 968227c83b..ab474fac9f 100644 --- a/test/http-api/interface/pubsub.js +++ b/test/http-api/interface/pubsub.js @@ -2,8 +2,6 @@ 'use strict' -// TODO needs: https://github.com/ipfs/js-ipfs-api/pull/493 -/* const test = require('interface-ipfs-core') const FactoryClient = require('./../../utils/ipfs-factory-daemon') @@ -20,4 +18,3 @@ const common = { } test.pubsub(common) -*/ diff --git a/test/http-api/spec/pubsub.js b/test/http-api/spec/pubsub.js index 245b6d153c..9c69427246 100644 --- a/test/http-api/spec/pubsub.js +++ b/test/http-api/spec/pubsub.js @@ -6,35 +6,17 @@ const chai = require('chai') const dirtyChai = require('dirty-chai') const expect = chai.expect chai.use(dirtyChai) -const createTempNode = '' -// TODO migrate to use ipfs-factory-daemon module.exports = (http) => { - describe.skip('/pubsub', () => { + describe('/pubsub', () => { let api - let tmpNode const buf = Buffer.from('some message') const topic = 'nonScents' const topicNotSubscribed = 'somethingRandom' - before((done) => { + before(() => { api = http.api.server.select('API') - - createTempNode(47, (err, _ipfs) => { - expect(err).to.not.exist() - tmpNode = _ipfs - tmpNode.goOnline((err) => { - expect(err).to.not.exist() - done() - }) - }) - }) - - after((done) => { - setTimeout(() => { - tmpNode.goOffline(done) - }, 1000) }) describe('/sub', () => { diff --git a/test/interop/node.js b/test/interop/node.js index 7e4a513445..a2e415b87a 100644 --- a/test/interop/node.js +++ b/test/interop/node.js @@ -6,3 +6,4 @@ require('./exchange-files') require('./circuit-relay') require('./kad-dht') require('./pubsub') +require('./pubsub-go') diff --git a/test/interop/pubsub.js b/test/interop/pubsub.js index b76f21967f..a69aacb37e 100644 --- a/test/interop/pubsub.js +++ b/test/interop/pubsub.js @@ -8,10 +8,29 @@ chai.use(dirtyChai) const series = require('async/series') const parallel = require('async/parallel') -const GODaemon = require('../utils/interop-daemon-spawner/go') +const GoDaemon = require('../utils/interop-daemon-spawner/go') const JSDaemon = require('../utils/interop-daemon-spawner/js') -describe('pubsub', () => { +/* + * Wait for a condition to become true. When its true, callback is called. + */ +function waitFor (predicate, callback) { + const ttl = Date.now() + (2 * 1000) + const self = setInterval(() => { + if (predicate()) { + clearInterval(self) + return callback() + } + if (Date.now() > ttl) { + clearInterval(self) + return callback(new Error('waitFor time expired')) + } + }, 500) +} + +describe('pubsub', function () { + this.timeout(5 * 1000) + let jsD let goD let jsId @@ -20,7 +39,7 @@ describe('pubsub', () => { before(function (done) { this.timeout(50 * 1000) - goD = new GODaemon({ + goD = new GoDaemon({ disposable: true, init: true, flags: ['--enable-pubsub-experiment'] @@ -33,48 +52,296 @@ describe('pubsub', () => { ], (done)) }) - after((done) => { - series([ + after(function (done) { + this.timeout(50 * 1000) + + parallel([ (cb) => goD.stop(cb), (cb) => jsD.stop(cb) ], done) }) it('make connections', (done) => { - parallel([ + series([ (cb) => jsD.api.id(cb), (cb) => goD.api.id(cb) ], (err, ids) => { expect(err).to.not.exist() - jsId = ids[0].ID - goId = ids[0].ID + jsId = ids[0].id + goId = ids[1].id - console.log('jsId:', jsId) - console.log('goId:', goId) + const jsLocalAddr = ids[0].addresses.find(a => a.includes('127.0.0.1')) + const goLocalAddr = ids[1].addresses.find(a => a.includes('127.0.0.1')) parallel([ - (cb) => jsD.api.swarm.connect(ids[1].addresses[0], cb), - (cb) => goD.api.swarm.connect(ids[0].addresses[0], cb) + (cb) => jsD.api.swarm.connect(goLocalAddr, cb), + (cb) => goD.api.swarm.connect(jsLocalAddr, cb), + (cb) => setTimeout(() => { + cb() + }, 1000) ], done) }) }) - it.skip('publish from JS, subscribe on Go', (done) => { - // TODO write this test + describe('ascii data', () => { + const data = Buffer.from('hello world') + + it('publish from Go, subscribe on Go', (done) => { + const topic = 'pubsub-go-go' + let n = 0 + + function checkMessage (msg) { + ++n + expect(msg.data.toString()).to.equal(data.toString()) + expect(msg).to.have.property('seqno') + expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) + expect(msg).to.have.property('topicIDs').eql([topic]) + expect(msg).to.have.property('from', goId) + } + + series([ + (cb) => goD.api.pubsub.subscribe(topic, checkMessage, cb), + (cb) => goD.api.pubsub.publish(topic, data, cb), + (cb) => waitFor(() => n === 1, cb) + ], done) + }) + + it('publish from JS, subscribe on JS', (done) => { + const topic = 'pubsub-js-js' + let n = 0 + + function checkMessage (msg) { + ++n + expect(msg.data.toString()).to.equal(data.toString()) + expect(msg).to.have.property('seqno') + expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) + expect(msg).to.have.property('topicIDs').eql([topic]) + expect(msg).to.have.property('from', jsId) + } + + series([ + (cb) => jsD.api.pubsub.subscribe(topic, checkMessage, cb), + (cb) => jsD.api.pubsub.publish(topic, data, cb), + (cb) => waitFor(() => n === 1, cb) + ], done) + }) + + it('publish from JS, subscribe on Go', (done) => { + const topic = 'pubsub-js-go' + let n = 0 + + function checkMessage (msg) { + ++n + expect(msg.data.toString()).to.equal(data.toString()) + expect(msg).to.have.property('seqno') + expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) + expect(msg).to.have.property('topicIDs').eql([topic]) + expect(msg).to.have.property('from', jsId) + } + + series([ + (cb) => goD.api.pubsub.subscribe(topic, checkMessage, cb), + (cb) => setTimeout(() => { cb() }, 500), + (cb) => jsD.api.pubsub.publish(topic, data, cb), + (cb) => waitFor(() => n === 1, cb) + ], done) + }) + + it('publish from Go, subscribe on JS', (done) => { + const topic = 'pubsub-go-js' + let n = 0 + + function checkMessage (msg) { + ++n + expect(msg.data.toString()).to.equal(data.toString()) + expect(msg).to.have.property('seqno') + expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) + expect(msg).to.have.property('topicIDs').eql([topic]) + expect(msg).to.have.property('from', goId) + } + + series([ + (cb) => jsD.api.pubsub.subscribe(topic, checkMessage, cb), + (cb) => setTimeout(() => { cb() }, 500), + (cb) => goD.api.pubsub.publish(topic, data, cb), + (cb) => waitFor(() => n === 1, cb), + ], done) + }) }) - it.skip('publish from Go, subscribe on JS', (done) => { - const topic = 'pubsub-go-js' - const data = Buffer.from('hello world') + describe('non-ascii data', () => { + const data = Buffer.from('你好世界') - function checkMessage () { - console.log('check message', arguments) - } + it('publish from Go, subscribe on Go', (done) => { + const topic = 'pubsub-non-ascii-go-go' + let n = 0 + + function checkMessage (msg) { + ++n + expect(msg.data.toString()).to.equal(data.toString()) + expect(msg).to.have.property('seqno') + expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) + expect(msg).to.have.property('topicIDs').eql([topic]) + expect(msg).to.have.property('from', goId) + } + + series([ + (cb) => goD.api.pubsub.subscribe(topic, checkMessage, cb), + (cb) => goD.api.pubsub.publish(topic, data, cb), + (cb) => waitFor(() => n === 1, cb) + ], done) + }) + + it('publish from JS, subscribe on JS', (done) => { + const topic = 'pubsub-non-ascii-js-js' + let n = 0 + + function checkMessage (msg) { + ++n + expect(msg.data.toString()).to.equal(data.toString()) + expect(msg).to.have.property('seqno') + expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) + expect(msg).to.have.property('topicIDs').eql([topic]) + expect(msg).to.have.property('from', jsId) + } + + series([ + (cb) => jsD.api.pubsub.subscribe(topic, checkMessage, cb), + (cb) => jsD.api.pubsub.publish(topic, data, cb), + (cb) => waitFor(() => n === 1, cb) + ], done) + }) + + it('publish from JS, subscribe on Go', (done) => { + const topic = 'pubsub-non-ascii-js-go' + let n = 0 + + function checkMessage (msg) { + ++n + expect(msg.data.toString()).to.equal(data.toString()) + expect(msg).to.have.property('seqno') + expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) + expect(msg).to.have.property('topicIDs').eql([topic]) + expect(msg).to.have.property('from', jsId) + } + + series([ + (cb) => goD.api.pubsub.subscribe(topic, checkMessage, cb), + (cb) => setTimeout(() => { cb() }, 500), + (cb) => jsD.api.pubsub.publish(topic, data, cb), + (cb) => waitFor(() => n === 1, cb) + ], done) + }) + + it('publish from Go, subscribe on JS', (done) => { + const topic = 'pubsub-non-ascii-go-js' + let n = 0 + + function checkMessage (msg) { + ++n + expect(msg.data.toString()).to.equal(data.toString()) + expect(msg).to.have.property('seqno') + expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) + expect(msg).to.have.property('topicIDs').eql([topic]) + expect(msg).to.have.property('from', goId) + } + + series([ + (cb) => jsD.api.pubsub.subscribe(topic, checkMessage, cb), + (cb) => setTimeout(() => { cb() }, 500), + (cb) => goD.api.pubsub.publish(topic, data, cb), + (cb) => waitFor(() => n === 1, cb), + ], done) + }) + }) + + describe('binary data', () => { + const data = Buffer.from('a36161636179656162830103056164a16466666666f400010203040506070809', 'hex') + + it('publish from Go, subscribe on Go', (done) => { + const topic = 'pubsub-binary-go-go' + let n = 0 + + function checkMessage (msg) { + ++n + expect(msg.data.toString('hex')).to.equal(data.toString('hex')) + expect(msg).to.have.property('seqno') + expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) + expect(msg).to.have.property('topicIDs').eql([topic]) + expect(msg).to.have.property('from', goId) + } + + series([ + (cb) => goD.api.pubsub.subscribe(topic, checkMessage, cb), + (cb) => setTimeout(() => { cb() }, 500), + (cb) => goD.api.pubsub.publish(topic, data, cb), + (cb) => waitFor(() => n === 1, cb) + ], done) + }) + + it('publish from Go, subscribe on JS', (done) => { + const topic = 'pubsub-binary-go-js' + let n = 0 + + function checkMessage (msg) { + ++n + expect(msg.data.toString('hex')).to.equal(data.toString('hex')) + expect(msg).to.have.property('seqno') + expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) + expect(msg).to.have.property('topicIDs').eql([topic]) + expect(msg).to.have.property('from', goId) + } + + series([ + (cb) => jsD.api.pubsub.subscribe(topic, checkMessage, cb), + (cb) => setTimeout(() => { cb() }, 500), + (cb) => goD.api.pubsub.publish(topic, data, cb), + (cb) => waitFor(() => n === 1, cb) + ], done) + }) + + it('publish from JS, subscribe on Go', (done) => { + const topic = 'pubsub-binary-js-go' + let n = 0 + + function checkMessage (msg) { + ++n + expect(msg.data.toString('hex')).to.equal(data.toString('hex')) + expect(msg).to.have.property('seqno') + expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) + expect(msg).to.have.property('topicIDs').eql([topic]) + expect(msg).to.have.property('from', jsId) + } + + series([ + (cb) => goD.api.pubsub.subscribe(topic, checkMessage, cb), + (cb) => setTimeout(() => { cb() }, 500), + (cb) => jsD.api.pubsub.publish(topic, data, cb), + (cb) => waitFor(() => n === 1, cb) + ], done) + }) + + it('publish from JS, subscribe on JS', (done) => { + const topic = 'pubsub-binary-js-js' + let n = 0 + + function checkMessage (msg) { + ++n + expect(msg.data.toString('hex')).to.equal(data.toString('hex')) + expect(msg).to.have.property('seqno') + expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) + expect(msg).to.have.property('topicIDs').eql([topic]) + expect(msg).to.have.property('from', jsId) + } + + series([ + (cb) => jsD.api.pubsub.subscribe(topic, checkMessage, cb), + (cb) => setTimeout(() => { cb() }, 500), + (cb) => jsD.api.pubsub.publish(topic, data, cb), + (cb) => waitFor(() => n === 1, cb) + ], done) + }) - series([ - cb => jsD.api.pubsub.subscribe(topic, checkMessage, cb), - cb => goD.api.pubsub.publish(topic, data, cb) - ], done) }) }) diff --git a/test/interop/repo.js b/test/interop/repo.js index bd981d0ff9..4956b0d543 100644 --- a/test/interop/repo.js +++ b/test/interop/repo.js @@ -24,7 +24,7 @@ function catAndCheck (daemon, hash, data, callback) { }) } -describe('repo', () => { +describe.only('repo', () => { it('read repo: go -> js', (done) => { const dir = os.tmpdir() + '/' + Math.ceil(Math.random() * 10000) const data = crypto.randomBytes(1024 * 5) diff --git a/test/utils/interop-daemon-spawner/go.js b/test/utils/interop-daemon-spawner/go.js index 92a04ca99e..8010724d81 100644 --- a/test/utils/interop-daemon-spawner/go.js +++ b/test/utils/interop-daemon-spawner/go.js @@ -40,8 +40,7 @@ class GoDaemon { this.node = node this.node.setConfig('Bootstrap', '[]', cb) }, - (res, cb) => this.node.startDaemon(cb), - // (res, cb) => this.node.startDaemon(this.flags, cb), + (res, cb) => this.node.startDaemon(this.flags, cb), (api, cb) => { this.api = api diff --git a/test/utils/interop-daemon-spawner/js.js b/test/utils/interop-daemon-spawner/js.js index e96850c82b..bd376ff892 100644 --- a/test/utils/interop-daemon-spawner/js.js +++ b/test/utils/interop-daemon-spawner/js.js @@ -37,14 +37,17 @@ class JsDaemon extends EventEmitter { this.path = opts.path || tmpDir() this._started = false + const extras = { + enablePubsubExperiment: true + } if (this.init) { const p = portConfig(this.port) this.node = new HttpApi(this.path, { Bootstrap: [], Addresses: p - }) + }, extras) } else { - this.node = new HttpApi(this.path) + this.node = new HttpApi(this.path, null, extras) } this.node.start(this.init, (err) => { diff --git a/test/utils/interop-daemon-spawner/util.js b/test/utils/interop-daemon-spawner/util.js index 256f857f0c..c0ca77a338 100644 --- a/test/utils/interop-daemon-spawner/util.js +++ b/test/utils/interop-daemon-spawner/util.js @@ -7,7 +7,7 @@ const path = require('path') exports.tmpDir = (prefix) => { return path.join( os.tmpdir(), - prefix || 'tmp', + prefix || 'js-ipfs-interop', crypto.randomBytes(32).toString('hex') ) } diff --git a/test/utils/ipfs-factory-daemon/index.js b/test/utils/ipfs-factory-daemon/index.js index 9c7a61ce73..6b6ab1e1ba 100644 --- a/test/utils/ipfs-factory-daemon/index.js +++ b/test/utils/ipfs-factory-daemon/index.js @@ -52,7 +52,9 @@ class Factory { }) }, (cb) => { - daemon = new HttpApi(repoPath, config) + daemon = new HttpApi(repoPath, config, { + enablePubsubExperiment: true + }) daemon.repoPath = repoPath this.daemonsSpawned.push(daemon)