diff --git a/API/pubsub/README.md b/API/pubsub/README.md index 0b6cf78f..8bb5a6b3 100644 --- a/API/pubsub/README.md +++ b/API/pubsub/README.md @@ -3,38 +3,48 @@ pubsub API #### `pubsub.subscribe` -> Subscribe to an IPFS Topic +> Subscribe to a pubsub topic. ##### `Go` **WIP** -##### `JavaScript` - ipfs.pubsub.subscribe(topic, options, callback) +##### `JavaScript` - ipfs.pubsub.subscribe(topic, options, handler, callback) -- `topic` - type: String -- `options` - type: Object, optional, might contain the following properties: - - `discover`: type: Boolean - Will use the DHT to find - -`callback` must follow `function (err, subscription) {}` where Subscription is a Node.js Stream in Object mode, emiting a `data` event for each new message on the subscribed topic.`err` is an error if the operation was not successful. - -`subscription` has a `.cancel` event in order to cancel the subscription. +- `topic: string` +- `options: Object` - (Optional), might contain the following properties: + - `discover`: type: Boolean - Will use the DHT to find other peers. +- `handler: (msg) => ()` - Event handler which will be called with a message object everytime one is received. The `msg` has the format `{from: string, seqno: Buffer, data: Buffer, topicCIDs: Array}`. +- `callback: (Error) => ()` (Optional) Called once the subscription is established. If no `callback` is passed, a [promise][] is returned. > _In the future, topic can also be type of TopicDescriptor (https://github.com/libp2p/pubsub-notes/blob/master/flooding/flooding.proto#L23). However, for now, only strings are supported._ +#### `pubsub.unsubscribe` + +> Unsubscribes from a pubsub topic. + +##### `Go` **WIP** + +##### `JavaScript` - `ipfs.pubsub.unsubscribe(topic, handler)` + +- `topic: string` - The topic to unsubscribe from +- `handler: (msg) => ()` - The handler to remove. + +This works like `EventEmitter.removeListener`, as that only the `handler` passed to a `subscribe` call before is removed from listening. The underlying subscription will only be canceled once all listeners for a topic have been removed. + #### `pubsub.publish` -> Publish a data message to a pubsub topic +> Publish a data message to a pubsub topic. ##### `Go` **WIP** ##### `JavaScript` - ipfs.pubsub.publish(topic, data, callback) -- `topic` - type: String -- `data` - type: Buffer +- `topic: string` +- `data: buffer` - The actual message to send +- `callback: (Error) => ()` - Calls back with an error or nothing if the publish was successfull. -`callback` must follow `function (err) {}` signature, where `err` is an error if the operation was not successful. - -If no `callback` is passed, a [promise][] is returned. +If no `callback` is passed, a promise is returned. #### `pubsub.ls` @@ -44,9 +54,10 @@ If no `callback` is passed, a [promise][] is returned. ##### `JavaScript` - ipfs.pubsub.ls(topic, callback) -`callback` must follow `function (err) {}` signature, where `err` is an error if the operation was not successful. +- `topic: string` +- `callback: (Error, Array>) => ()` - Calls back with an error or a list of topicCIDs that this peer is subscribed to. -If no `callback` is passed, a [promise][] is returned. +If no `callback` is passed, a promise is returned. #### `pubsub.peers` @@ -56,8 +67,7 @@ If no `callback` is passed, a [promise][] is returned. ##### `JavaScript` - ipfs.pubsub.peers(topic, callback) -- `topic` - type: String - -`callback` must follow `function (err) {}` signature, where `err` is an error if the operation was not successful. +- `topic: string` +- `callback: (Error, Array>) => ()` - Calls back with an error or a list of peer ids subscribed to the `topic`. -If no `callback` is passed, a [promise][] is returned. +If no `callback` is passed, a promise is returned. diff --git a/src/object.js b/src/object.js index 079c1fca..cd130456 100644 --- a/src/object.js +++ b/src/object.js @@ -42,7 +42,7 @@ module.exports = (common) => { }) }) - it.skip('template unixfs-dir', (done) => { + it('template unixfs-dir', (done) => { ipfs.object.new('unixfs-dir', (err, node) => { expect(err).to.not.exist const nodeJSON = node.toJSON() diff --git a/src/pubsub.js b/src/pubsub.js index aa407e41..3829dc23 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -4,6 +4,55 @@ const expect = require('chai').expect const series = require('async/series') +const waterfall = require('async/waterfall') +const parallel = require('async/parallel') +const whilst = require('async/whilst') +const each = require('async/each') + +function waitForPeers (ipfs, topic, peersToWait, callback) { + const i = setInterval(() => { + ipfs.pubsub.peers(topic, (err, peers) => { + if (err) { + return callback(err) + } + + const missingPeers = peersToWait + .map((e) => peers.includes(e)) + .filter((e) => !e) + + if (missingPeers.length === 0) { + clearInterval(i) + callback() + } + }) + }, 500) +} + +function spawnWithId (factory, callback) { + waterfall([ + (cb) => factory.spawnNode(cb), + (node, cb) => node.id((err, res) => { + if (err) { + return cb(err) + } + node.peerId = res + cb(null, node) + }) + ], callback) +} + +function makeCheck (n, done) { + let i = 0 + return (err) => { + if (err) { + return done(err) + } + + if (++i === n) { + done() + } + } +} module.exports = (common) => { describe('.pubsub', () => { @@ -12,35 +61,28 @@ module.exports = (common) => { describe('callback API', () => { let ipfs1 let ipfs2 + let ipfs3 before((done) => { - // CI takes longer to instantiate the daemon, - // so we need to increase the timeout for the - // before step common.setup((err, factory) => { - expect(err).to.not.exist + if (err) { + return done(err) + } + series([ - (cb) => { - factory.spawnNode((err, node) => { - expect(err).to.not.exist - ipfs1 = node - ipfs1.id().then((res) => { - ipfs1.peerId = res.id - cb() - }) - }) - }, - (cb) => { - factory.spawnNode((err, node) => { - expect(err).to.not.exist - ipfs2 = node - ipfs2.id().then((res) => { - ipfs2.peerId = res.id - cb() - }) - }) + (cb) => spawnWithId(factory, cb), + (cb) => spawnWithId(factory, cb), + (cb) => spawnWithId(factory, cb) + ], (err, nodes) => { + if (err) { + return done(err) } - ], done) + + ipfs1 = nodes[0] + ipfs2 = nodes[1] + ipfs3 = nodes[2] + done() + }) }) }) @@ -50,8 +92,11 @@ module.exports = (common) => { describe('single node', () => { describe('.publish', () => { - it('message from string', (done) => { - ipfs1.pubsub.publish(topic, 'hello friend', done) + it('errors on string messags', (done) => { + ipfs1.pubsub.publish(topic, 'hello friend', (err) => { + expect(err).to.exist + done() + }) }) it('message from buffer', (done) => { @@ -61,55 +106,84 @@ module.exports = (common) => { describe('.subscribe', () => { it('to one topic', (done) => { - ipfs1.pubsub.subscribe(topic, (err, subscription) => { - expect(err).to.not.exist + const check = makeCheck(2, done) - subscription.on('data', (msg) => { - expect(msg.data.toString()).to.equal('hi') - subscription.cancel(done) - }) + const handler = (msg) => { + expect(msg.data.toString()).to.equal('hi') + expect(msg).to.have.property('seqno') + expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) + expect(msg).to.have.property('topicCIDs').eql([topic]) + // TODO: broken https://github.com/ipfs/go-ipfs/issues/3522 + // expect(msg).to.have.property('from', ipfs1.peerId.id) + + ipfs1.pubsub.unsubscribe(topic, handler) - ipfs1.pubsub.publish(topic, 'hi', (err) => { + ipfs1.pubsub.ls((err, topics) => { expect(err).to.not.exist + expect(topics).to.be.empty + check() }) - }) - }) - - it('errors on double subscription', (done) => { - series([ - (cb) => ipfs1.pubsub.subscribe(topic, cb), - (cb) => ipfs1.pubsub.subscribe(topic, cb) - ], (err, subs) => { - expect(err).to.exist - expect(err.toString()) - .to.eql(`Error: Already subscribed to '${topic}'`) - subs[0].cancel(done) - }) - }) + } - it('discover options', (done) => { - ipfs1.pubsub.subscribe(topic, { - discover: true - }, (err, subscription) => { + ipfs1.pubsub.subscribe(topic, handler, (err) => { expect(err).to.not.exist - subscription.cancel(done) + ipfs1.pubsub.publish(topic, new Buffer('hi'), check) }) }) - }) - describe('subscription', () => { - it('.cancel and wait for callback', (done) => { - ipfs1.pubsub.subscribe(topic, (err, subscription) => { + it('attaches multiple event listeners', (done) => { + const check = makeCheck(3, done) + const handler1 = (msg) => { + expect(msg.data.toString()).to.be.eql('hello') + + ipfs1.pubsub.unsubscribe(topic, handler1) + + series([ + (cb) => ipfs1.pubsub.ls(cb), + (cb) => { + ipfs1.pubsub.unsubscribe(topic, handler2) + cb() + }, + (cb) => ipfs1.pubsub.ls(cb) + ], (err, res) => { + expect(err).to.not.exist + + // Still subscribed as there is one listener left + expect(res[0]).to.be.eql([topic]) + // Now all listeners are gone no subscription anymore + expect(res[2]).to.be.eql([]) + check() + }) + } + + const handler2 = (msg) => { + expect(msg.data.toString()).to.be.eql('hello') + check() + } + + parallel([ + (cb) => ipfs1.pubsub.subscribe(topic, handler1, cb), + (cb) => ipfs1.pubsub.subscribe(topic, handler2, cb) + ], (err) => { expect(err).to.not.exist - subscription.cancel(done) + ipfs1.pubsub.publish(topic, new Buffer('hello'), check) }) }) - it('.cancel and wait for end event', (done) => { - ipfs1.pubsub.subscribe(topic, (err, subscription) => { + it('discover options', (done) => { + const check = makeCheck(2, done) + + const handler = (msg) => { + expect(msg.data.toString()).to.be.eql('hi') + ipfs1.pubsub.unsubscribe(topic, handler) + check() + } + + ipfs1.pubsub.subscribe(topic, { + discover: true + }, handler, (err) => { expect(err).to.not.exist - subscription.on('end', done) - subscription.cancel() + ipfs1.pubsub.publish(topic, new Buffer('hi'), check) }) }) }) @@ -117,122 +191,97 @@ module.exports = (common) => { describe('multiple nodes connected', () => { before((done) => { - ipfs2.id((err, id) => { - expect(err).to.not.exist - const ipfs2Addr = id.addresses[0] - ipfs1.swarm.connect(ipfs2Addr, (err) => { - expect(err).to.not.exist - done() - }) + parallel([ + (cb) => ipfs1.swarm.connect(ipfs2.peerId.addresses[0], cb), + (cb) => ipfs2.swarm.connect(ipfs3.peerId.addresses[0], cb), + (cb) => ipfs1.swarm.connect(ipfs3.peerId.addresses[0], cb) + ], (err) => { + if (err) { + return done(err) + } + // give some time to let everything connect + setTimeout(done, 300) }) }) - function waitForPeers (ipfs, peersToWait, callback) { - const i = setInterval(() => { - ipfs.pubsub.peers(topic, (err, peers) => { - if (err) { - return callback(err) - } - - const hasAllPeers = peersToWait - .map((e) => peers.includes(e)) - .filter((e) => e === false) - .length === 0 - - if (hasAllPeers) { - clearInterval(i) - callback() - } - }) - }, 1000) - } - describe('.peers', () => { - it('returns an error when not subscribed to a topic', (done) => { + it('does not error when not subscribed to a topic', (done) => { ipfs1.pubsub.peers(topic, (err, peers) => { - expect(err).to.exist - expect(err.toString()) - .to.eql(`Error: Not subscribed to '${topic}'`) + expect(err).to.not.exist + // Should be empty but as mentioned below go-ipfs returns more than it should + // expect(peers).to.be.empty + done() }) }) - // I don't understand the purpose of this test - it.skip('returns no peers within 10 seconds', (done) => { + it.skip("doesn't return extra peers", (done) => { // Currently go-ipfs returns peers that have not been // subscribed to the topic. Enable when go-ipfs has been fixed - ipfs1.pubsub.subscribe(topic, (err, subscription) => { - expect(err).to.not.exist + const sub1 = (msg) => {} + const sub2 = (msg) => {} + + const topicOther = topic + 'different topic' + series([ + (cb) => ipfs1.pubsub.subscribe(topic, sub1, cb), + (cb) => ipfs2.pubsub.subscribe(topicOther, sub2, cb) + ], (err) => { + expect(err).to.not.exist setTimeout(() => { ipfs1.pubsub.peers(topic, (err, peers) => { expect(err).to.not.exist - expect(peers.length).to.equal(0) - subscription.cancel(done) - }) - }, 10000) - }) - }) - it('doesn\'t return extra peers', (done) => { - // Currently go-ipfs returns peers that have not been - // subscribed to the topic. Enable when go-ipfs has been fixed - ipfs1.pubsub.subscribe(topic, (err, subscription1) => { - expect(err).to.not.exist - - ipfs2.pubsub.subscribe(topic + 'different topic', (err, subscription2) => { - expect(err).to.not.exist - - setTimeout(() => { - ipfs1.pubsub.peers(topic, (err, peers) => { - expect(err).to.not.exist - expect(peers).to.have.length(0) - subscription1.cancel(() => subscription2.cancel(done)) - }) + expect(peers).to.be.empty + ipfs1.pubsub.unsubscribe(topic, sub1) + ipfs2.pubsub.unsubscribe(topicOther, sub2) + done() }, 10000) }) }) }) - it('returns peers for a topic - one peer', (done) => { + it.skip('returns peers for a topic - one peer', (done) => { // Currently go-ipfs returns peers that have not been // subscribed to the topic. Enable when go-ipfs has been fixed - const peersToWait = [ipfs2.peerId] - let subscription2 + const sub1 = (msg) => {} + const sub2 = (msg) => {} - ipfs1.pubsub.subscribe(topic, (err, subscription) => { + series([ + (cb) => ipfs1.pubsub.subscribe(topic, sub1, cb), + (cb) => ipfs2.pubsub.subscribe(topic, sub2, cb), + (cb) => waitForPeers(ipfs1, topic, [ipfs2.peerId.id], cb) + ], (err) => { expect(err).to.not.exist - subscription2 = subscription + ipfs1.pubsub.unsubscribe(topic, sub1) + ipfs2.pubsub.unsubscribe(topic, sub2) + + done() }) + }) + + it('lists peers for a topic - multiple peers', (done) => { + const sub1 = (msg) => {} + const sub2 = (msg) => {} + const sub3 = (msg) => {} - ipfs2.pubsub.subscribe(topic, (err, subscription) => { + series([ + (cb) => ipfs1.pubsub.subscribe(topic, sub1, cb), + (cb) => ipfs2.pubsub.subscribe(topic, sub2, cb), + (cb) => ipfs3.pubsub.subscribe(topic, sub3, cb), + (cb) => waitForPeers(ipfs1, topic, [ + ipfs2.peerId.id, + ipfs3.peerId.id + ], cb) + ], (err) => { expect(err).to.not.exist + ipfs1.pubsub.unsubscribe(topic, sub1) + ipfs2.pubsub.unsubscribe(topic, sub2) + ipfs3.pubsub.unsubscribe(topic, sub3) - const i = setInterval(() => { - ipfs1.pubsub.peers(topic, (err, peers) => { - if (err) { - expect(err).to.not.exist - done(err) - } - - const hasAllPeers = peersToWait - .map((e) => peers.indexOf(e) !== -1) - .filter((e) => e === false) - .length === 0 - - if (hasAllPeers) { - clearInterval(i) - expect(peers.length).to.equal(peersToWait.length) - subscription.cancel(() => subscription2.cancel(done)) - } - }) - }, 1000) + done() }) }) - - it.skip('lists peers for a topic - multiple peers', (done) => { - // TODO - }) }) describe('.ls', () => { @@ -245,29 +294,51 @@ module.exports = (common) => { }) it('list with 1 subscribed topic', (done) => { - ipfs1.pubsub.subscribe(topic, (err, subscription) => { + const sub1 = (msg) => {} + + ipfs1.pubsub.subscribe(topic, sub1, (err) => { expect(err).to.not.exist ipfs1.pubsub.ls((err, topics) => { expect(err).to.not.exist - expect(topics.length).to.equal(1) - expect(topics[0]).to.equal(topic) - subscription.cancel(done) + expect(topics).to.be.eql([topic]) + + ipfs1.pubsub.unsubscribe(topic, sub1) + done() }) }) }) - it('list with 3 subscribed topicss', (done) => { - const topics = ['one', 'two', 'three'] - series( - topics.map((t) => (cb) => ipfs1.pubsub.subscribe(t, cb)) - , (err, subs) => { + it('list with 3 subscribed topics', (done) => { + const topics = [{ + name: 'one', + handler () {} + }, { + name: 'two', + handler () {} + }, { + name: 'three', + handler () {} + }] + + each(topics, (t, cb) => { + ipfs1.pubsub.subscribe(t.name, t.handler, cb) + }, (err) => { expect(err).to.not.exist ipfs1.pubsub.ls((err, list) => { expect(err).to.not.exist - expect(list.length).to.equal(3) - expect(list).to.eql(topics) - series(subs.map((s) => (cb) => s.cancel(cb)), done) + + expect( + list.sort() + ).to.be.eql( + topics.map((t) => t.name).sort() + ) + + topics.forEach((t) => { + ipfs1.pubsub.unsubscribe(t.name, t.handler) + }) + + done() }) }) }) @@ -275,72 +346,91 @@ module.exports = (common) => { describe('multiple nodes', () => { it('receive messages from different node', (done) => { + const check = makeCheck(3, done) const expectedString = 'hello from the other side' - let subscription2 - ipfs2.pubsub.subscribe(topic, (err, subscription) => { - expect(err).to.not.exist - subscription2 = subscription - }) + const sub1 = (msg) => { + expect(msg.data.toString()).to.be.eql(expectedString) + // TODO: Reenable when go-ipfs is unbroken + // expect(msg.from).to.be.eql(ipfs2.peerId.id) + ipfs1.pubsub.unsubscribe(topic, sub1) + check() + } - ipfs1.pubsub.subscribe(topic, (err, subscription) => { - expect(err).to.not.exist - expect(subscription).to.exist + const sub2 = (msg) => { + expect(msg.data.toString()).to.be.eql(expectedString) + // TODO: reenable when go-ipfs is unbroken + // expect(msg.from).to.be.eql(ipfs2.peerId.id) + ipfs2.pubsub.unsubscribe(topic, sub2) + check() + } - subscription.on('data', (msg) => { - expect(msg.data.toString()).to.equal(expectedString) - subscription.cancel(() => subscription2.cancel(done)) - }) + series([ + (cb) => ipfs1.pubsub.subscribe(topic, sub1, cb), + (cb) => ipfs2.pubsub.subscribe(topic, sub2, cb), + (cb) => waitForPeers(ipfs2, topic, [ipfs1.peerId.id], cb) + ], (err) => { + expect(err).to.not.exist - waitForPeers(ipfs2, [ipfs1.peerId], (err) => { - expect(err).to.not.exist - ipfs2.pubsub.publish(topic, expectedString, (err) => { - expect(err).to.not.exist - }) - }) + ipfs2.pubsub.publish(topic, new Buffer(expectedString), check) }) }) it('receive multiple messages', (done) => { - let receivedMessages = [] - const expectedMessages = 2 - let subscription2 + const inbox1 = [] + const inbox2 = [] + const outbox = ['hello', 'world', 'this', 'is', 'pubsub'] - ipfs2.pubsub.subscribe(topic, (err, subscription) => { - expect(err).to.not.exist - subscription2 = subscription + const check = makeCheck(outbox.length * 3, (err) => { + ipfs1.pubsub.unsubscribe(topic, sub1) + ipfs2.pubsub.unsubscribe(topic, sub2) + + expect(inbox1.sort()).to.be.eql(outbox.sort()) + expect(inbox2.sort()).to.be.eql(outbox.sort()) + + done(err) }) - ipfs1.pubsub.subscribe(topic, (err, subscription) => { - expect(err).to.not.exists + function sub1 (msg) { + inbox1.push(msg.data.toString()) + // TODO: enable when go-ipfs is unbroken + // expect(msg.from).to.be.eql(ipfs2.peerId.id) + check() + } - subscription.on('data', (msg) => { - receivedMessages.push(msg.data) + function sub2 (msg) { + inbox2.push(msg.data.toString()) + // TODO: enable when go-ipfs is unbroken + // expect(msg.from).to.be.eql(ipfs2.peerId.id) + check() + } - if (receivedMessages.length === expectedMessages) { - receivedMessages.forEach((msg) => { - expect(msg.toString()).to.be.equal('hi') - }) - subscription.cancel(() => subscription2.cancel(done)) - } - }) + series([ + (cb) => ipfs1.pubsub.subscribe(topic, sub1, cb), + (cb) => ipfs2.pubsub.subscribe(topic, sub2, cb), + (cb) => waitForPeers(ipfs2, topic, [ipfs1.peerId.id], cb) + ], (err) => { + expect(err).to.not.exist - waitForPeers(ipfs2, [ipfs1.peerId], (err) => { - expect(err).to.not.exist - ipfs2.pubsub.publish(topic, 'hi') - ipfs2.pubsub.publish(topic, 'hi') + outbox.forEach((msg) => { + ipfs2.pubsub.publish(topic, new Buffer(msg), check) }) }) }) }) describe('load tests', function () { - // Write the progress to stdout when in Node.js, silent when in the browser - const LOGS = false - const log = LOGS && process && process.stdout ? (s) => process.stdout.write(s) : () => {} + before(() => { + ipfs1.pubsub.setMaxListeners(10 * 1000) + ipfs2.pubsub.setMaxListeners(10 * 1000) + }) + + after(() => { + ipfs1.pubsub.setMaxListeners(11) + ipfs2.pubsub.setMaxListeners(11) + }) it('send/receive 10k messages', function (done) { - // js-ipfs is a little slow atm, so make sure we have enough time this.timeout(2 * 60 * 1000) const expectedString = 'hello' @@ -348,50 +438,39 @@ module.exports = (common) => { let sendCount = 0 let receivedCount = 0 let startTime - let subscription2 - ipfs2.pubsub.subscribe(topic, (err, subscription) => { - expect(err).to.not.exists - subscription2 = subscription - }) + const sub1 = (msg) => { + expect(msg.data.toString()).to.equal(expectedString) - ipfs1.pubsub.subscribe(topic, (err, subscription) => { - expect(err).to.not.exists + receivedCount++ - const outputProgress = () => { - log(' \r') - log('Sent: ' + sendCount + ' of ' + count + ', Received: ' + receivedCount + '\r') - } + if (receivedCount >= count) { + const duration = new Date().getTime() - startTime + console.log(`Send/Receive 10k messages took: ${duration} ms, ${Math.floor(count / (duration / 1000))} ops / s\n`) - subscription.on('data', (d) => { - expect(d.data.toString()).to.equal(expectedString) + ipfs1.pubsub.unsubscribe(topic, sub1) + ipfs2.pubsub.unsubscribe(topic, sub2) + } + } - receivedCount++ - outputProgress() - if (receivedCount >= count) { - const duration = new Date().getTime() - startTime - log(`Send/Receive 10k messages took: ${duration} ms, ${Math.floor(count / (duration / 1000))} ops / s\n`) + const sub2 = (msg) => {} - subscription.cancel(() => subscription2.cancel(done)) - } - }) + series([ + (cb) => ipfs1.pubsub.subscribe(topic, sub1, cb), + (cb) => ipfs2.pubsub.subscribe(topic, sub2, cb), + (cb) => waitForPeers(ipfs1, topic, [ipfs2.peerId.id], cb) + ], (err) => { + expect(err).to.not.exist + startTime = new Date().getTime() - function loop () { - if (sendCount < count) { + whilst( + () => sendCount < count, + (cb) => { sendCount++ - outputProgress() - ipfs2.pubsub.publish(topic, expectedString, (err) => { - expect(err).to.not.exist - process.nextTick(() => loop()) - }) - } - } - - waitForPeers(ipfs1, [ipfs2.peerId], (err) => { - expect(err).to.not.exist - startTime = new Date().getTime() - loop() - }) + ipfs2.pubsub.publish(topic, new Buffer(expectedString), cb) + }, + done + ) }) }) @@ -400,79 +479,42 @@ module.exports = (common) => { const count = 1000 let sendCount = 0 - function loop () { - if (sendCount < count) { + whilst( + () => sendCount < count, + (cb) => { sendCount++ - log(' \r') - log('Sending messages: ' + sendCount + ' of ' + count + '\r') - ipfs1.pubsub.publish(topic, expectedString, (err) => { - expect(err).to.not.exist - process.nextTick(() => loop()) - }) - } else { - done() - } - } - loop() + ipfs1.pubsub.publish(topic, new Buffer(expectedString), cb) + }, + done + ) }) - it('call subscribe 1k times', (done) => { + it('call subscribe/unsubscribe 1k times', (done) => { const count = 1000 let sendCount = 0 - let receivedCount = 0 - let subscription = null + const handlers = [] - function loop () { - if (sendCount < count) { + whilst( + () => sendCount < count, + (cb) => { sendCount++ - log(' \r') - log('Subscribing: ' + sendCount + ' of ' + count + '\r') - ipfs1.pubsub.subscribe(topic, (err, res) => { - receivedCount++ - // First call should go through normally - if (receivedCount === 1) { - expect(err).to.not.exist - expect(res).to.exist - subscription = res - } else { - // Subsequent calls should return "error, duplicate subscription" - expect(err).to.exist - } - process.nextTick(() => loop()) + const handler = (msg) => {} + handlers.push(handler) + ipfs1.pubsub.subscribe(topic, handler, cb) + }, + (err) => { + expect(err).to.not.exist + handlers.forEach((handler) => { + ipfs1.pubsub.unsubscribe(topic, handler) }) - } else { - subscription.cancel(done) - } - } - loop() - }) - - it('subscribe/unsubscribe 1k times', (done) => { - const count = 1000 - let sendCount = 0 - function outputProgress () { - log(' \r') - log('Subscribe/Unsubscribe: ' + sendCount + ' of ' + count + '\r') - } - - function loop () { - if (sendCount < count) { - sendCount++ - outputProgress() - ipfs1.pubsub.subscribe(topic, (err, subscription) => { + ipfs1.pubsub.ls((err, topics) => { expect(err).to.not.exist - subscription.cancel((err) => { - expect(err).to.not.exist - outputProgress() - process.nextTick(() => loop()) - }) + expect(topics).to.be.eql([]) + done() }) - } else { - done() } - } - loop() + ) }) }) }) @@ -480,36 +522,21 @@ module.exports = (common) => { describe('promise API', () => { let ipfs1 - let ipfs2 before((done) => { - // CI takes longer to instantiate the daemon, - // so we need to increase the timeout for the - // before step common.setup((err, factory) => { - expect(err).to.not.exist - series([ - (cb) => { - factory.spawnNode((err, node) => { - expect(err).to.not.exist - ipfs1 = node - ipfs1.id().then((res) => { - ipfs1.peerId = res.id - cb() - }) - }) - }, - (cb) => { - factory.spawnNode((err, node) => { - expect(err).to.not.exist - ipfs2 = node - ipfs2.id().then((res) => { - ipfs2.peerId = res.id - cb() - }) - }) + if (err) { + return done(err) + } + + spawnWithId(factory, (err, node) => { + if (err) { + return done(err) } - ], done) + + ipfs1 = node + done() + }) }) }) @@ -517,45 +544,31 @@ module.exports = (common) => { common.teardown(done) }) - it('.subscribe', () => { - return ipfs1.pubsub.subscribe(topic) - .then((subscription) => { - expect(subscription).to.exist - return subscription.cancel() - }) - }) - - it('.publish', () => { - return ipfs1.pubsub.subscribe(topic) - .then((subscription) => { - return ipfs1.pubsub.publish(topic, 'hi').then(() => subscription) - }) - .then((subscription) => subscription.cancel()) - }) + it('.subscribe and .publish', () => { + const sub = (msg) => { + expect(msg.data.toString()).to.be.eql('hi') + ipfs1.pubsub.unsubscribe(topic, sub) + } - it('.cancel', () => { - return ipfs1.pubsub.subscribe(topic) - .then((subscription) => subscription.cancel()) + return ipfs1.pubsub.subscribe(topic, sub) + .then(() => ipfs1.pubsub.publish(topic, new Buffer('hi'))) }) it('.peers', () => { - let s - return ipfs1.pubsub.subscribe(topic) - .then((subscription) => { - s = subscription - return ipfs1.pubsub.peers(topic) - }) + const sub = (msg) => {} + + return ipfs1.pubsub.subscribe(topic, sub) + .then(() => ipfs1.pubsub.peers(topic)) .then((peers) => { expect(peers).to.exist - return s.cancel() + ipfs1.pubsub.unsubscribe(topic, sub) }) }) it('.ls', () => { return ipfs1.pubsub.ls() .then((topics) => { - expect(topics).to.exist - expect(topics.length).to.equal(0) + expect(topics).to.be.eql([]) }) }) })