-
Notifications
You must be signed in to change notification settings - Fork 1.2k
WIP: feat/pubsub \o/ #610
WIP: feat/pubsub \o/ #610
Changes from 7 commits
d6c6d12
641fe1c
d3c4f02
9026a88
9b7da97
adeccec
94bbc43
4ee7b5b
5031470
8084cb5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
'use strict' | ||
|
||
// NOTE: Floodsub CLI is not tested. Tests will not be run until | ||
// https://github.com/ipfs/js-ipfs-api/pull/377 | ||
// is merged | ||
module.exports = { | ||
command: 'floodsub', | ||
|
||
description: 'floodsub commands', | ||
|
||
builder (yargs) { | ||
return yargs | ||
.commandDir('floodsub') | ||
}, | ||
|
||
handler (argv) {} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
'use strict' | ||
|
||
const utils = require('../../utils') | ||
const debug = require('debug') | ||
const log = debug('cli:floodsub') | ||
log.error = debug('cli:floodsub:error') | ||
|
||
module.exports = { | ||
command: 'publish <topic> <data>', | ||
|
||
describe: 'Publish data to a topic', | ||
|
||
builder: {}, | ||
|
||
handler (argv) { | ||
utils.getIPFS((err, ipfs) => { | ||
if (err) { | ||
throw err | ||
} | ||
|
||
ipfs.floodsub.publish(argv.topic, argv.data, (err) => { | ||
if (err) { | ||
throw err | ||
} | ||
}) | ||
}) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
'use strict' | ||
|
||
const utils = require('../../utils') | ||
const debug = require('debug') | ||
const log = debug('cli:floodsub') | ||
log.error = debug('cli:floodsub:error') | ||
|
||
module.exports = { | ||
command: 'start', | ||
|
||
describe: 'Start FloodSub', | ||
|
||
builder: {}, | ||
|
||
handler (argv) { | ||
utils.getIPFS((err, ipfs) => { | ||
if (err) { | ||
throw err | ||
} | ||
|
||
const fsub = ipfs.floodsub.start() | ||
if (fsub) { | ||
console.log(fsub.toString()) | ||
} | ||
}) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
'use strict' | ||
|
||
const utils = require('../../utils') | ||
const debug = require('debug') | ||
const log = debug('cli:floodsub') | ||
log.error = debug('cli:floodsub:error') | ||
|
||
module.exports = { | ||
command: 'subscribe <topic>', | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we have an alias as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @diasdavid: Is the intention for this to be aliased from the top level as: |
||
|
||
describe: 'Subscribe to a topic', | ||
|
||
builder: {}, | ||
|
||
handler (argv) { | ||
utils.getIPFS((err, ipfs) => { | ||
if (err) { | ||
throw err | ||
} | ||
|
||
ipfs.floodsub.subscribe(argv.topic, (err, stream) => { | ||
if (err) { | ||
throw err | ||
} | ||
|
||
console.log(stream.toString()) | ||
}) | ||
}) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
'use strict' | ||
|
||
const utils = require('../../utils') | ||
const debug = require('debug') | ||
const log = debug('cli:floodsub') | ||
log.error = debug('cli:floodsub:error') | ||
|
||
module.exports = { | ||
command: 'unsubscribe <topic>', | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we have an alias as |
||
|
||
describe: 'Unsubscribe from a topic', | ||
|
||
builder: {}, | ||
|
||
handler (argv) { | ||
utils.getIPFS((err, ipfs) => { | ||
if (err) { | ||
throw err | ||
} | ||
|
||
ipfs.floodsub.unsubscribe(argv.topic, (err) => { | ||
if (err) { | ||
throw err | ||
} | ||
}) | ||
}) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
'use strict' | ||
|
||
const FloodSub = require('libp2p-floodsub') | ||
const promisify = require('promisify-es6') | ||
const Readable = require('stream').Readable | ||
|
||
const OFFLINE_ERROR = require('../utils').OFFLINE_ERROR | ||
const FSUB_ERROR = new Error(`FloodSub is not started.`) | ||
|
||
module.exports = function floodsub (self) { | ||
return { | ||
start: promisify((callback) => { | ||
if (!self.isOnline()) { | ||
throw OFFLINE_ERROR | ||
} | ||
|
||
self._floodsub = new FloodSub(self._libp2pNode) | ||
return callback(null, self._floodsub) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe it doesn't have to return the floodsub instance, as it gets available in the self object There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good call |
||
}), | ||
|
||
subscribe: promisify((topic, options, callback) => { | ||
// TODO: Clarify with @diasdavid what to do with the `options.discover` param | ||
// Ref: https://github.com/ipfs/js-ipfs-api/pull/377/files#diff-f0c61c06fd5dc36b6f760b7ea97b1862R50 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've answered somewhere, but just to keep track: Currently we do nothing as js-ipfs doesn't have the go-ipfs DHT |
||
if (typeof options === 'function') { | ||
callback = options | ||
options = {} | ||
} | ||
|
||
if (!self.isOnline()) { | ||
throw OFFLINE_ERROR | ||
} | ||
|
||
if (!self._floodsub) { | ||
throw FSUB_ERROR | ||
} | ||
|
||
let rs = new Readable() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you probably want to make sure this is a Object Stream Also, |
||
rs.cancel = () => self._floodsub.unsubscribe(topic) | ||
|
||
self._floodsub.on(topic, (data) => { | ||
rs.emit('data', { | ||
data: data.toString(), | ||
topicIDs: [topic] | ||
}) | ||
}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This might cause an error that if I call subscribe on the same topic twice, I'll start seeing the same message being emitted twice as well There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now handled |
||
|
||
try { | ||
self._floodsub.subscribe(topic) | ||
} catch (err) { | ||
return callback(err) | ||
} | ||
|
||
callback(null, rs) | ||
}), | ||
|
||
publish: promisify((topic, data, callback) => { | ||
if (!self.isOnline()) { | ||
throw OFFLINE_ERROR | ||
} | ||
|
||
if (!self._floodsub) { | ||
throw FSUB_ERROR | ||
} | ||
|
||
const buf = Buffer.isBuffer(data) ? data : new Buffer(data) | ||
|
||
try { | ||
self._floodsub.publish(topic, buf) | ||
} catch (err) { | ||
return callback(err) | ||
} | ||
|
||
callback(null) | ||
}), | ||
|
||
unsubscribe: promisify((topic, callback) => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If a topic is unsubscribed, all of the event listeners for those topics should be unregistered as well, otherwise this will cause an eventual memory leak |
||
if (!self.isOnline()) { | ||
throw OFFLINE_ERROR | ||
} | ||
|
||
if (!self._floodsub) { | ||
throw FSUB_ERROR | ||
} | ||
|
||
try { | ||
self._floodsub.unsubscribe(topic) | ||
} catch (err) { | ||
return callback(err) | ||
} | ||
|
||
callback(null) | ||
}) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
'use strict' | ||
|
||
const debug = require('debug') | ||
const log = debug('http-api:floodsub') | ||
log.error = debug('http-api:floodsub:error') | ||
|
||
exports = module.exports | ||
|
||
exports.start = { | ||
handler: (request, reply) => { | ||
request.server.app.ipfs.floodsub.start((err, floodsub) => { | ||
if (err) { | ||
log.error(err) | ||
return reply({ | ||
Message: `Failed to start: ${err}`, | ||
Code: 0 | ||
}).code(500) | ||
} | ||
|
||
return reply(floodsub) | ||
}) | ||
} | ||
} | ||
|
||
exports.subscribe = { | ||
handler: (request, reply) => { | ||
const discover = request.query.discover || null | ||
const topic = request.params.topic | ||
|
||
request.server.app.ipfs.floodsub.subscribe(topic, { discover }, (err, stream) => { | ||
if (err) { | ||
log.error(err) | ||
return reply({ | ||
Message: `Failed to subscribe to topic ${topic}: ${err}`, | ||
Code: 0 | ||
}).code(500) | ||
} | ||
|
||
// hapi is not very clever and throws if no | ||
// - _read method | ||
// - _readableState object | ||
// are there :( | ||
if (!stream._read) { | ||
stream._read = () => {} | ||
stream._readableState = {} | ||
} | ||
return reply(stream) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have a hunch that this won't work. This stream of objects needs to be converted to ndjson stream There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @diasdavid - I discovered the |
||
}) | ||
} | ||
} | ||
|
||
exports.publish = { | ||
handler: (request, reply) => { | ||
const buf = request.query.buf | ||
const topic = request.query.topic | ||
|
||
request.server.app.ipfs.floodsub.publish(topic, buf, (err) => { | ||
if (err) { | ||
log.error(err) | ||
return reply({ | ||
Message: `Failed to publish to topic ${topic}: ${err}`, | ||
Code: 0 | ||
}).code(500) | ||
} | ||
|
||
return reply() | ||
}) | ||
} | ||
} | ||
|
||
exports.unsubscribe = { | ||
handler: (request, reply) => { | ||
const topic = request.params.topic | ||
|
||
request.server.app.ipfs.floodsub.unsubscribe(topic, (err) => { | ||
if (err) { | ||
log.error(err) | ||
return reply({ | ||
Message: `Failed to unsubscribe from topic ${topic}: ${err}`, | ||
Code: 0 | ||
}).code(500) | ||
} | ||
|
||
return reply() | ||
}) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should prolly not have a separate command to start the pubsub but rather it should start "automatically" either on ipfs instance start (in goOnline() perhaps) or when subscribed to the first topic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with not having a start command. In an initial version I had it initialize when a call to
pub
orsub
was made. My further comments on this are below :)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, the floodsub routine should start when a daemon goes online.