diff --git a/package.json b/package.json index 801899e400..2e334f4782 100644 --- a/package.json +++ b/package.json @@ -60,7 +60,7 @@ "form-data": "^2.1.2", "fs-pull-blob-store": "^0.4.1", "gulp": "^3.9.1", - "interface-ipfs-core": "^0.22.0", + "interface-ipfs-core": "git+https://github.com/ipfs/interface-ipfs-core.git#5c7df414a8f627f8adb50a52ef8d2b629381285f", "left-pad": "^1.1.3", "lodash": "^4.17.2", "ncp": "^2.0.0", @@ -80,7 +80,7 @@ "hapi": "^16.0.0", "hapi-set-header": "^1.0.2", "idb-pull-blob-store": "^0.5.1", - "ipfs-api": "^12.0.0", + "ipfs-api": "git+https://github.com/ipfs/js-ipfs-api.git#01044a1f59fb866e4e08b06aae4e74d968615931", "ipfs-bitswap": "^0.8.1", "ipfs-block": "^0.5.0", "ipfs-block-service": "^0.7.0", @@ -91,8 +91,9 @@ "ipld-resolver": "^0.3.0", "isstream": "^0.1.2", "joi": "^10.0.1", - "libp2p-ipfs-nodejs": "^0.16.1", + "libp2p-floodsub": "0.3.1", "libp2p-ipfs-browser": "^0.17.0", + "libp2p-ipfs-nodejs": "^0.16.1", "lodash.flatmap": "^4.5.0", "lodash.get": "^4.4.2", "lodash.has": "^4.5.2", @@ -102,6 +103,7 @@ "mafmt": "^2.1.2", "multiaddr": "^2.1.1", "multihashes": "^0.3.0", + "ndjson": "1.5.0", "path-exists": "^3.0.0", "peer-book": "^0.3.0", "peer-id": "^0.8.0", @@ -149,4 +151,4 @@ "nginnever ", "npmcdn-to-unpkg-bot " ] -} \ No newline at end of file +} diff --git a/src/cli/commands/pubsub.js b/src/cli/commands/pubsub.js new file mode 100644 index 0000000000..7102b9af67 --- /dev/null +++ b/src/cli/commands/pubsub.js @@ -0,0 +1,17 @@ +'use strict' + +// The command count bump from 56 to 60 depends on: +// ipfs/interface-ipfs-core.git#5c7df414a8f627f8adb50a52ef8d2b629381285f +// ipfs/js-ipfs-api.git#01044a1f59fb866e4e08b06aae4e74d968615931 +module.exports = { + command: 'pubsub', + + description: 'pubsub commands', + + builder (yargs) { + return yargs + .commandDir('pubsub') + }, + + handler (argv) {} +} diff --git a/src/cli/commands/pubsub/ls.js b/src/cli/commands/pubsub/ls.js new file mode 100644 index 0000000000..0a53b01ce3 --- /dev/null +++ b/src/cli/commands/pubsub/ls.js @@ -0,0 +1,30 @@ +'use strict' + +const utils = require('../../utils') +const debug = require('debug') +const log = debug('cli:pubsub') +log.error = debug('cli:pubsub:error') + +module.exports = { + command: 'ls', + + describe: 'Get your list of subscriptions', + + builder: {}, + + handler (argv) { + utils.getIPFS((err, ipfs) => { + if (err) { + throw err + } + + ipfs.pubsub.ls((err, subscriptions) => { + if (err) { + throw err + } + + console.log(JSON.stringify(subscriptions, null, 2)) + }) + }) + } +} diff --git a/src/cli/commands/pubsub/peers.js b/src/cli/commands/pubsub/peers.js new file mode 100644 index 0000000000..0f4052b27e --- /dev/null +++ b/src/cli/commands/pubsub/peers.js @@ -0,0 +1,30 @@ +'use strict' + +const utils = require('../../utils') +const debug = require('debug') +const log = debug('cli:pubsub') +log.error = debug('cli:pubsub:error') + +module.exports = { + command: 'peers ', + + describe: 'Get all peers subscribed to a topic', + + builder: {}, + + handler (argv) { + utils.getIPFS((err, ipfs) => { + if (err) { + throw err + } + + ipfs.pubsub.peers(argv.topic, (err, peers) => { + if (err) { + throw err + } + + console.log(JSON.stringify(peers, null, 2)) + }) + }) + } +} diff --git a/src/cli/commands/pubsub/publish.js b/src/cli/commands/pubsub/publish.js new file mode 100644 index 0000000000..4f0e141270 --- /dev/null +++ b/src/cli/commands/pubsub/publish.js @@ -0,0 +1,28 @@ +'use strict' + +const utils = require('../../utils') +const debug = require('debug') +const log = debug('cli:pubsub') +log.error = debug('cli:pubsub:error') + +module.exports = { + command: 'publish ', + + describe: 'Publish data to a topic', + + builder: {}, + + handler (argv) { + utils.getIPFS((err, ipfs) => { + if (err) { + throw err + } + + ipfs.pubsub.publish(argv.topic, argv.data, (err) => { + if (err) { + throw err + } + }) + }) + } +} diff --git a/src/cli/commands/pubsub/subscribe.js b/src/cli/commands/pubsub/subscribe.js new file mode 100644 index 0000000000..1f3d108a85 --- /dev/null +++ b/src/cli/commands/pubsub/subscribe.js @@ -0,0 +1,32 @@ +'use strict' + +const utils = require('../../utils') +const debug = require('debug') +const log = debug('cli:pubsub') +log.error = debug('cli:pubsub:error') + +module.exports = { + command: 'subscribe ', + + alias: 'sub', + + describe: 'Subscribe to a topic', + + builder: {}, + + handler (argv) { + utils.getIPFS((err, ipfs) => { + if (err) { + throw err + } + + ipfs.pubsub.subscribe(argv.topic, (err, stream) => { + if (err) { + throw err + } + + console.log(stream.toString()) + }) + }) + } +} diff --git a/src/core/components/go-online.js b/src/core/components/go-online.js index 793a0269a5..3e0aa18d4c 100644 --- a/src/core/components/go-online.js +++ b/src/core/components/go-online.js @@ -2,6 +2,7 @@ const series = require('async/series') const Bitswap = require('ipfs-bitswap') +const FloodSub = require('libp2p-floodsub') module.exports = function goOnline (self) { return (cb) => { @@ -21,6 +22,11 @@ module.exports = function goOnline (self) { ) self._bitswap.start() self._blockService.goOnline(self._bitswap) + + // + self._pubsub = new FloodSub(self._libp2pNode) + // + cb() }) } diff --git a/src/core/components/pubsub.js b/src/core/components/pubsub.js new file mode 100644 index 0000000000..3d33a788e5 --- /dev/null +++ b/src/core/components/pubsub.js @@ -0,0 +1,131 @@ +'use strict' + +const promisify = require('promisify-es6') +const Readable = require('stream').Readable +const _values = require('lodash.values') + +const OFFLINE_ERROR = require('../utils').OFFLINE_ERROR + +let subscriptions = {} + +const addSubscription = (topic, request, stream) => { + subscriptions[topic] = { request: request, stream: stream } +} + +const removeSubscription = promisify((topic, callback) => { + if (!subscriptions[topic]) { + return callback(new Error(`Not subscribed to ${topic}`)) + } + + subscriptions[topic].stream.emit('end') + delete subscriptions[topic] + + if (callback) { + callback(null) + } +}) + +module.exports = function pubsub (self) { + return { + subscribe: promisify((topic, options, callback) => { + if (!self.isOnline()) { + throw OFFLINE_ERROR + } + + if (typeof options === 'function') { + callback = options + options = {} + } + + if (subscriptions[topic]) { + return callback(`Error: Already subscribed to '${topic}'`) + } + + const stream = new Readable({ objectMode: true }) + + stream._read = () => {} + + // There is no explicit unsubscribe; subscriptions have a cancel event + stream.cancel = promisify((cb) => { + self._pubsub.unsubscribe(topic) + removeSubscription(topic, cb) + }) + + self._pubsub.on(topic, (data) => { + stream.emit('data', { + data: data.toString(), + topicIDs: [topic] + }) + }) + + try { + self._pubsub.subscribe(topic) + } catch (err) { + return callback(err) + } + + // Add the request to the active subscriptions and return the stream + addSubscription(topic, null, stream) + callback(null, stream) + }), + + publish: promisify((topic, data, callback) => { + if (!self.isOnline()) { + throw OFFLINE_ERROR + } + + const buf = Buffer.isBuffer(data) ? data : new Buffer(data) + + try { + self._pubsub.publish(topic, buf) + } catch (err) { + return callback(err) + } + + callback(null) + }), + + ls: promisify((callback) => { + if (!self.isOnline()) { + throw OFFLINE_ERROR + } + + let subscriptions = [] + + try { + subscriptions = self._pubsub.getSubscriptions() + } catch (err) { + return callback(err) + } + + callback(null, subscriptions) + }), + + peers: promisify((topic, callback) => { + if (!self.isOnline()) { + throw OFFLINE_ERROR + } + + if (!subscriptions[topic]) { + return callback(`Error: Not subscribed to '${topic}'`) + } + + let peers = [] + + try { + const peerSet = self._pubsub.getPeerSet() + _values(peerSet).forEach((peer) => { + const idB58Str = peer.peerInfo.id.toB58String() + const index = peer.topics.indexOf(topic) + if (index > -1) { + peers.push(idB58Str) + } + }) + } catch (err) { + return callback(err) + } + + callback(null, peers) + }) + } +} diff --git a/src/core/index.js b/src/core/index.js index a58836c10c..fa4c2e8ae2 100644 --- a/src/core/index.js +++ b/src/core/index.js @@ -23,6 +23,7 @@ const swarm = require('./components/swarm') const ping = require('./components/ping') const files = require('./components/files') const bitswap = require('./components/bitswap') +const pubsub = require('./components/pubsub') exports = module.exports = IPFS @@ -44,6 +45,7 @@ function IPFS (repoInstance) { this._bitswap = null this._blockService = new BlockService(this._repo) this._ipldResolver = new IPLDResolver(this._blockService) + this._pubsub = null // IPFS Core exposed components @@ -67,4 +69,5 @@ function IPFS (repoInstance) { this.files = files(this) this.bitswap = bitswap(this) this.ping = ping(this) + this.pubsub = pubsub(this) } diff --git a/src/http-api/resources/index.js b/src/http-api/resources/index.js index b90a9d912c..671a349952 100644 --- a/src/http-api/resources/index.js +++ b/src/http-api/resources/index.js @@ -10,3 +10,4 @@ exports.block = require('./block') exports.swarm = require('./swarm') exports.bitswap = require('./bitswap') exports.files = require('./files') +exports.pubsub = require('./pubsub') diff --git a/src/http-api/resources/pubsub.js b/src/http-api/resources/pubsub.js new file mode 100644 index 0000000000..38c3084b3a --- /dev/null +++ b/src/http-api/resources/pubsub.js @@ -0,0 +1,90 @@ +'use strict' + +const debug = require('debug') +// const ndjson = require('ndjson') +const log = debug('http-api:pubsub') +log.error = debug('http-api:pubsub:error') + +exports = module.exports + +exports.subscribe = { + handler: (request, reply) => { + const discover = request.query.discover || null + const topic = request.params.topic + + request.server.app.ipfs.pubsub.subscribe(topic, { discover }, (err, stream) => { + if (err) { + log.error(err) + return reply({ + Message: `Failed to subscribe to topic ${topic}: ${err}`, + Code: 0 + }).code(500) + } + + // hapi is not very clever and throws if no: + // - _read method + // - _readableState object + // are there :( + if (!stream._read) { + stream._read = () => {} + stream._readableState = {} + } + + // ndjson.serialize + return reply(stream) + }) + } +} + +exports.publish = { + handler: (request, reply) => { + const buf = request.query.buf + const topic = request.query.topic + + request.server.app.ipfs.pubsub.publish(topic, buf, (err) => { + if (err) { + log.error(err) + return reply({ + Message: `Failed to publish to topic ${topic}: ${err}`, + Code: 0 + }).code(500) + } + + return reply() + }) + } +} + +exports.ls = { + handler: (request, reply) => { + request.server.app.ipfs.pubsub.ls((err, subscriptions) => { + if (err) { + log.error(err) + return reply({ + Message: `Failed to list subscriptions: ${err}`, + Code: 0 + }).code(500) + } + + return reply(subscriptions) + }) + } +} + +exports.peers = { + handler: (request, reply) => { + const topic = request.params.topic + + request.server.app.ipfs.pubsub.peers(topic, (err, peers) => { + if (err) { + log.error(err) + return reply({ + Message: `Failed to find peers subscribed to ${topic}: ${err}`, + Code: 0 + }).code(500) + } + + return reply(peers) + }) + } +} diff --git a/src/http-api/routes/index.js b/src/http-api/routes/index.js index 587f25de77..7b0afa885d 100644 --- a/src/http-api/routes/index.js +++ b/src/http-api/routes/index.js @@ -11,4 +11,5 @@ module.exports = (server) => { require('./swarm')(server) require('./bitswap')(server) require('./files')(server) + require('./pubsub')(server) } diff --git a/src/http-api/routes/pubsub.js b/src/http-api/routes/pubsub.js new file mode 100644 index 0000000000..321cb54476 --- /dev/null +++ b/src/http-api/routes/pubsub.js @@ -0,0 +1,59 @@ +'use strict' + +const Joi = require('joi') +const resources = require('./../resources') + +module.exports = (server) => { + const api = server.select('API') + + api.route({ + method: '*', + path: '/api/v0/pubsub/sub/{topic}', + config: { + handler: resources.pubsub.subscribe.handler, + validate: { + params: { + topic: Joi.string().required() + }, + query: { + discover: Joi.boolean() + } + } + } + }) + + api.route({ + method: '*', + path: '/api/v0/pubsub/pub', + config: { + handler: resources.pubsub.publish.handler, + validate: { + query: { + topic: Joi.string().required(), + buf: Joi.binary().required() + } + } + } + }) + + api.route({ + method: '*', + path: '/api/v0/pubsub/ls', + config: { + handler: resources.pubsub.ls.handler + } + }) + + api.route({ + method: '*', + path: '/api/v0/pubsub/peers', + config: { + handler: resources.pubsub.peers.handler, + validate: { + params: { + topic: Joi.string().required() + } + } + } + }) +} diff --git a/test/cli/test-commands.js b/test/cli/test-commands.js index da484580d1..765a68e31a 100644 --- a/test/cli/test-commands.js +++ b/test/cli/test-commands.js @@ -7,11 +7,16 @@ const ipfsBase = require('../utils/ipfs-exec') const ipfs = ipfsBase(repoPath) const describeOnlineAndOffline = require('../utils/on-and-off') +// The command count bump from 56 to 60 depends on: +// ipfs/interface-ipfs-core.git#5c7df414a8f627f8adb50a52ef8d2b629381285f +// ipfs/js-ipfs-api.git#01044a1f59fb866e4e08b06aae4e74d968615931 +const commandCount = 60 + describe('commands', () => { describeOnlineAndOffline(repoPath, () => { it('list the commands', () => { return ipfs('commands').then((out) => { - expect(out.split('\n')).to.have.length(56) + expect(out.split('\n')).to.have.length(commandCount) }) }) }) @@ -20,7 +25,7 @@ describe('commands', () => { return ipfsBase(repoPath, { cwd: '/tmp' })('commands').then((out) => { - expect(out.split('\n').length).to.equal(56) + expect(out.split('\n').length).to.equal(commandCount) }) }) }) diff --git a/test/cli/test-pubsub.js b/test/cli/test-pubsub.js new file mode 100644 index 0000000000..20b5ffef3e --- /dev/null +++ b/test/cli/test-pubsub.js @@ -0,0 +1,86 @@ +/* eslint max-nested-callbacks: ["error", 8] */ +/* eslint-env mocha */ +'use strict' + +const expect = require('chai').expect +const HttpAPI = require('../../src/http-api') +const createTempNode = require('../utils/temp-node') +const repoPath = require('./index').repoPath +const ipfs = require('../utils/ipfs-exec')(repoPath) + +// This depends on: +// ipfs/interface-ipfs-core.git#5c7df414a8f627f8adb50a52ef8d2b629381285f +// ipfs/js-ipfs-api.git#01044a1f59fb866e4e08b06aae4e74d968615931 +describe.only('pubsub', function () { + this.timeout(30 * 1000) + let node + + const topicA = 'nonscentsA' + const topicB = 'nonscentsB' + const message = new Buffer('Some non cents.') + + before((done) => { + createTempNode(1, (err, _node) => { + expect(err).to.not.exist + node = _node + node.goOnline((err) => { + expect(err).to.not.exist + done() + }) + }) + }) + + after((done) => { + node.goOffline(done) + }) + + describe('api running', () => { + let httpAPI + const called = true + + before((done) => { + httpAPI = new HttpAPI(repoPath) + httpAPI.start((err) => { + expect(err).to.not.exist + done() + }) + }) + + after((done) => { + httpAPI.stop((err) => { + expect(err).to.not.exist + done() + }) + }) + + it('subscribe', () => { + return ipfs('pubsub', 'subscribe', topicA).then((out) => { + expect(out).to.have.length.above(0) + }) + }) + + it('subscribe alias', () => { + return ipfs('pubsub', 'sub', topicB).then((out) => { + expect(out).to.have.length.above(0) + }) + }) + + it('publish', () => { + return ipfs('pubsub', 'publish', topicA, message).then((out) => { + expect(called).to.eql(true) + }) + }) + + it('ls', () => { + return ipfs('pubsub', 'ls').then((out) => { + expect(out).to.have.length.above(0) + }) + }) + + it('peers', () => { + return ipfs('pubsub', 'peers', topicA).then((out) => { + expect(out).to.be.eql('[]') + }) + }) + }) +}) diff --git a/test/core/both/index.js b/test/core/both/index.js index a6dc8e5621..5b27dd5e69 100644 --- a/test/core/both/index.js +++ b/test/core/both/index.js @@ -10,4 +10,5 @@ describe('--both', () => { require('./test-generic') require('./test-init') require('./test-object') + require('./test-pubsub') }) diff --git a/test/core/both/test-pubsub.js b/test/core/both/test-pubsub.js new file mode 100644 index 0000000000..bdc83ad572 --- /dev/null +++ b/test/core/both/test-pubsub.js @@ -0,0 +1,19 @@ +/* eslint-env mocha */ +'use strict' + +const test = require('interface-ipfs-core') +const IPFSFactory = require('../../utils/factory-core') + +let factory + +const common = { + setup: function (cb) { + factory = new IPFSFactory() + cb(null, factory) + }, + teardown: function (cb) { + factory.dismantle(cb) + } +} + +test.pubsub(common) diff --git a/test/http-api/inject/test-pubsub.js b/test/http-api/inject/test-pubsub.js new file mode 100644 index 0000000000..38299c72f9 --- /dev/null +++ b/test/http-api/inject/test-pubsub.js @@ -0,0 +1,126 @@ +/* eslint max-nested-callbacks: ["error", 8] */ +/* eslint-env mocha */ +'use strict' + +const expect = require('chai').expect +const createTempNode = require('./../../utils/temp-node') + +module.exports = (http) => { + describe.only('/pubsub', () => { + let api + let tmpNode + + const buf = new Buffer('some message') + const topic = 'nonScents' + + before((done) => { + api = http.api.server.select('API') + + createTempNode(47, (err, _ipfs) => { + expect(err).to.not.exist + tmpNode = _ipfs + tmpNode.goOnline((err) => { + expect(err).to.not.exist + done() + }) + }) + }) + + after((done) => { + setTimeout(() => { + tmpNode.goOffline(done) + }, 1000) + }) + + describe('/sub/{topic}', () => { + it('returns 404 if no topic is provided', (done) => { + api.inject({ + method: 'GET', + url: `/api/v0/pubsub/sub` + }, (res) => { + expect(res.statusCode).to.equal(404) + done() + }) + }) + + it('returns 200 with topic', (done) => { + api.inject({ + method: 'GET', + url: `/api/v0/pubsub/sub/${topic}` + }, (res) => { + expect(res.statusCode).to.equal(200) + done() + }) + }) + }) + + xdescribe('/pub', () => { + it('returns 400 if no buffer is provided', (done) => { + api.inject({ + method: 'POST', + url: `/api/v0/pubsub/pub?topic=${topic}` + }, (res) => { + expect(res.statusCode).to.equal(400) + expect(res.statusMessage).to.equal('Bad Request') + done() + }) + }) + + it('returns 400 if no topic is provided', (done) => { + api.inject({ + method: 'POST', + url: `/api/v0/pubsub/pub?buf=${buf}` + }, (res) => { + expect(res.statusCode).to.equal(400) + expect(res.statusMessage).to.equal('Bad Request') + done() + }) + }) + + it('returns 200 with topic and buffer', (done) => { + api.inject({ + method: 'POST', + url: `/api/v0/pubsub/pub?buf=${buf}&topic=${topic}` + }, (res) => { + expect(res.statusCode).to.equal(200) + done() + }) + }) + }) + + xdescribe('/ls', () => { + it('returns 200', (done) => { + api.inject({ + method: 'GET', + url: `/api/v0/pubsub/ls` + }, (res) => { + expect(res.statusCode).to.equal(200) + expect(res.result).to.be.an('object') + done() + }) + }) + }) + + xdescribe('/peers/{topic}', () => { + it('returns 404 if no topic is provided', (done) => { + api.inject({ + method: 'GET', + url: `/api/v0/pubsub/peers` + }, (res) => { + expect(res.statusCode).to.equal(404) + done() + }) + }) + + it('returns 200 with topic', (done) => { + api.inject({ + method: 'GET', + url: `/api/v0/pubsub/peers/${topic}` + }, (res) => { + expect(res.statusCode).to.equal(200) + done() + }) + }) + }) + }) +} diff --git a/test/utils/ipfs-exec.js b/test/utils/ipfs-exec.js index 0525d0c7fa..371435850a 100644 --- a/test/utils/ipfs-exec.js +++ b/test/utils/ipfs-exec.js @@ -20,7 +20,7 @@ module.exports = (repoPath, opts) => { env.IPFS_PATH = repoPath const config = Object.assign({}, { - stipEof: true, + stripEof: true, env: env, timeout: 60 * 1000 }, opts)