Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

WIP: feat/pubsub \o/ #610

Merged
merged 10 commits into from
Dec 8, 2016
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
"ipld-resolver": "^0.2.0",
"isstream": "^0.1.2",
"joi": "^9.2.0",
"libp2p-floodsub": "0.3.1",
"libp2p-ipfs": "^0.15.0",
"libp2p-ipfs-browser": "^0.16.0",
"lodash.flatmap": "^4.5.0",
Expand Down Expand Up @@ -147,4 +148,4 @@
"nginnever <ginneversource@gmail.com>",
"npmcdn-to-unpkg-bot <npmcdn-to-unpkg-bot@users.noreply.github.com>"
]
}
}
17 changes: 17 additions & 0 deletions src/cli/commands/floodsub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
'use strict'

// NOTE: Floodsub CLI is not tested. Tests will not be run until
// https://github.com/ipfs/js-ipfs-api/pull/377
// is merged
module.exports = {
command: 'floodsub',

description: 'floodsub commands',

builder (yargs) {
return yargs
.commandDir('floodsub')
},

handler (argv) {}
}
28 changes: 28 additions & 0 deletions src/cli/commands/floodsub/publish.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
'use strict'

const utils = require('../../utils')
const debug = require('debug')
const log = debug('cli:floodsub')
log.error = debug('cli:floodsub:error')

module.exports = {
command: 'publish <topic> <data>',

describe: 'Publish data to a topic',

builder: {},

handler (argv) {
utils.getIPFS((err, ipfs) => {
if (err) {
throw err
}

ipfs.floodsub.publish(argv.topic, argv.data, (err) => {
if (err) {
throw err
}
})
})
}
}
27 changes: 27 additions & 0 deletions src/cli/commands/floodsub/start.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
'use strict'

const utils = require('../../utils')
const debug = require('debug')
const log = debug('cli:floodsub')
log.error = debug('cli:floodsub:error')

module.exports = {
command: 'start',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should prolly not have a separate command to start the pubsub but rather it should start "automatically" either on ipfs instance start (in goOnline() perhaps) or when subscribed to the first topic.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with not having a start command. In an initial version I had it initialize when a call to pub or sub was made. My further comments on this are below :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, the floodsub routine should start when a daemon goes online.


describe: 'Start FloodSub',

builder: {},

handler (argv) {
utils.getIPFS((err, ipfs) => {
if (err) {
throw err
}

const fsub = ipfs.floodsub.start()
if (fsub) {
console.log(fsub.toString())
}
})
}
}
30 changes: 30 additions & 0 deletions src/cli/commands/floodsub/subscribe.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
'use strict'

const utils = require('../../utils')
const debug = require('debug')
const log = debug('cli:floodsub')
log.error = debug('cli:floodsub:error')

module.exports = {
command: 'subscribe <topic>',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we have an alias as sub?

Copy link
Author

@gavinmcdermott gavinmcdermott Dec 7, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@diasdavid: Is the intention for this to be aliased from the top level as: jsipfs sub <some topic> or from the module as: jsipfs floodsub sub <topic>?


describe: 'Subscribe to a topic',

builder: {},

handler (argv) {
utils.getIPFS((err, ipfs) => {
if (err) {
throw err
}

ipfs.floodsub.subscribe(argv.topic, (err, stream) => {
if (err) {
throw err
}

console.log(stream.toString())
})
})
}
}
28 changes: 28 additions & 0 deletions src/cli/commands/floodsub/unsubscribe.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
'use strict'

const utils = require('../../utils')
const debug = require('debug')
const log = debug('cli:floodsub')
log.error = debug('cli:floodsub:error')

module.exports = {
command: 'unsubscribe <topic>',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we have an alias as unsub?


describe: 'Unsubscribe from a topic',

builder: {},

handler (argv) {
utils.getIPFS((err, ipfs) => {
if (err) {
throw err
}

ipfs.floodsub.unsubscribe(argv.topic, (err) => {
if (err) {
throw err
}
})
})
}
}
94 changes: 94 additions & 0 deletions src/core/components/floodsub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
'use strict'

const FloodSub = require('libp2p-floodsub')
const promisify = require('promisify-es6')
const Readable = require('stream').Readable

const OFFLINE_ERROR = require('../utils').OFFLINE_ERROR
const FSUB_ERROR = new Error(`FloodSub is not started.`)

module.exports = function floodsub (self) {
return {
start: promisify((callback) => {
if (!self.isOnline()) {
throw OFFLINE_ERROR
}

self._floodsub = new FloodSub(self._libp2pNode)
return callback(null, self._floodsub)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it doesn't have to return the floodsub instance, as it gets available in the self object

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call

}),

subscribe: promisify((topic, options, callback) => {
// TODO: Clarify with @diasdavid what to do with the `options.discover` param
// Ref: https://github.com/ipfs/js-ipfs-api/pull/377/files#diff-f0c61c06fd5dc36b6f760b7ea97b1862R50
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've answered somewhere, but just to keep track: Currently we do nothing as js-ipfs doesn't have the go-ipfs DHT

if (typeof options === 'function') {
callback = options
options = {}
}

if (!self.isOnline()) {
throw OFFLINE_ERROR
}

if (!self._floodsub) {
throw FSUB_ERROR
}

let rs = new Readable()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you probably want to make sure this is a Object Stream

Also, const can be used

rs.cancel = () => self._floodsub.unsubscribe(topic)

self._floodsub.on(topic, (data) => {
rs.emit('data', {
data: data.toString(),
topicIDs: [topic]
})
})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might cause an error that if I call subscribe on the same topic twice, I'll start seeing the same message being emitted twice as well

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now handled


try {
self._floodsub.subscribe(topic)
} catch (err) {
return callback(err)
}

callback(null, rs)
}),

publish: promisify((topic, data, callback) => {
if (!self.isOnline()) {
throw OFFLINE_ERROR
}

if (!self._floodsub) {
throw FSUB_ERROR
}

const buf = Buffer.isBuffer(data) ? data : new Buffer(data)

try {
self._floodsub.publish(topic, buf)
} catch (err) {
return callback(err)
}

callback(null)
}),

unsubscribe: promisify((topic, callback) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a topic is unsubscribed, all of the event listeners for those topics should be unregistered as well, otherwise this will cause an eventual memory leak

if (!self.isOnline()) {
throw OFFLINE_ERROR
}

if (!self._floodsub) {
throw FSUB_ERROR
}

try {
self._floodsub.unsubscribe(topic)
} catch (err) {
return callback(err)
}

callback(null)
})
}
}
3 changes: 3 additions & 0 deletions src/core/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const swarm = require('./components/swarm')
const ping = require('./components/ping')
const files = require('./components/files')
const bitswap = require('./components/bitswap')
const floodsub = require('./components/floodsub')

exports = module.exports = IPFS

Expand All @@ -44,6 +45,7 @@ function IPFS (repoInstance) {
this._bitswap = null
this._blockService = new BlockService(this._repo)
this._ipldResolver = new IPLDResolver(this._blockService)
this._floodsub = null

// IPFS Core exposed components

Expand All @@ -67,4 +69,5 @@ function IPFS (repoInstance) {
this.files = files(this)
this.bitswap = bitswap(this)
this.ping = ping(this)
this.floodsub = floodsub(this)
}
87 changes: 87 additions & 0 deletions src/http-api/resources/floodsub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
'use strict'

const debug = require('debug')
const log = debug('http-api:floodsub')
log.error = debug('http-api:floodsub:error')

exports = module.exports

exports.start = {
handler: (request, reply) => {
request.server.app.ipfs.floodsub.start((err, floodsub) => {
if (err) {
log.error(err)
return reply({
Message: `Failed to start: ${err}`,
Code: 0
}).code(500)
}

return reply(floodsub)
})
}
}

exports.subscribe = {
handler: (request, reply) => {
const discover = request.query.discover || null
const topic = request.params.topic

request.server.app.ipfs.floodsub.subscribe(topic, { discover }, (err, stream) => {
if (err) {
log.error(err)
return reply({
Message: `Failed to subscribe to topic ${topic}: ${err}`,
Code: 0
}).code(500)
}

// hapi is not very clever and throws if no
// - _read method
// - _readableState object
// are there :(
if (!stream._read) {
stream._read = () => {}
stream._readableState = {}
}
return reply(stream)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a hunch that this won't work. This stream of objects needs to be converted to ndjson stream

Copy link
Author

@gavinmcdermott gavinmcdermott Dec 8, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@diasdavid - I discovered the pull-ndjson module you published; any examples of the ndjson module being used in the ecosystem?

})
}
}

exports.publish = {
handler: (request, reply) => {
const buf = request.query.buf
const topic = request.query.topic

request.server.app.ipfs.floodsub.publish(topic, buf, (err) => {
if (err) {
log.error(err)
return reply({
Message: `Failed to publish to topic ${topic}: ${err}`,
Code: 0
}).code(500)
}

return reply()
})
}
}

exports.unsubscribe = {
handler: (request, reply) => {
const topic = request.params.topic

request.server.app.ipfs.floodsub.unsubscribe(topic, (err) => {
if (err) {
log.error(err)
return reply({
Message: `Failed to unsubscribe from topic ${topic}: ${err}`,
Code: 0
}).code(500)
}

return reply()
})
}
}
1 change: 1 addition & 0 deletions src/http-api/resources/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ exports.block = require('./block')
exports.swarm = require('./swarm')
exports.bitswap = require('./bitswap')
exports.files = require('./files')
exports.floodsub = require('./floodsub')
Loading