diff --git a/src/core/components/floodsub.js b/src/core/components/floodsub.js index 8ec4132665..57fef1fb2c 100644 --- a/src/core/components/floodsub.js +++ b/src/core/components/floodsub.js @@ -7,17 +7,29 @@ const Readable = require('stream').Readable const OFFLINE_ERROR = require('../utils').OFFLINE_ERROR const FSUB_ERROR = new Error(`FloodSub is not started.`) -module.exports = function floodsub (self) { - return { - start: promisify((callback) => { - if (!self.isOnline()) { - throw OFFLINE_ERROR - } +/* Internal subscriptions state and functions */ +let subscriptions = {} - self._floodsub = new FloodSub(self._libp2pNode) - return callback(null, self._floodsub) - }), +const addSubscription = (topic, request, stream) => { + subscriptions[topic] = { request: request, stream: stream } +} + +const removeSubscription = promisify((topic, callback) => { + if (!subscriptions[topic]) { + return callback(new Error(`Not subscribed to ${topic}`)) + } + // subscriptions[topic].request.abort() + // subscriptions[topic].stream.end() + delete subscriptions[topic] + + if (callback) { + callback(null) + } +}) + +module.exports = function floodsub (self) { + return { subscribe: promisify((topic, options, callback) => { // TODO: Clarify with @diasdavid what to do with the `options.discover` param // Ref: https://github.com/ipfs/js-ipfs-api/pull/377/files#diff-f0c61c06fd5dc36b6f760b7ea97b1862R50 @@ -34,11 +46,12 @@ module.exports = function floodsub (self) { throw FSUB_ERROR } - let rs = new Readable() - rs.cancel = () => self._floodsub.unsubscribe(topic) - + let stream = new Readable({ objectMode: true }) + stream._read = () => {} + self._floodsub.on(topic, (data) => { - rs.emit('data', { + console.log("DATA", data.toString()) + stream.emit('data', { data: data.toString(), topicIDs: [topic] }) @@ -50,7 +63,14 @@ module.exports = function floodsub (self) { return callback(err) } - callback(null, rs) + stream.cancel = promisify((cb) => { + self._floodsub.unsubscribe(topic) + removeSubscription(topic, cb) + }) + + // Add the request to the active subscriptions and return the stream + addSubscription(topic, null, stream) + callback(null, stream) }), publish: promisify((topic, data, callback) => { diff --git a/src/core/components/go-online.js b/src/core/components/go-online.js index 793a0269a5..a65d93083d 100644 --- a/src/core/components/go-online.js +++ b/src/core/components/go-online.js @@ -2,6 +2,7 @@ const series = require('async/series') const Bitswap = require('ipfs-bitswap') +const FloodSub = require('libp2p-floodsub') module.exports = function goOnline (self) { return (cb) => { @@ -21,6 +22,10 @@ module.exports = function goOnline (self) { ) self._bitswap.start() self._blockService.goOnline(self._bitswap) + + self._floodsub = new FloodSub(self._libp2pNode) + // self._floodsub.start() + cb() }) } diff --git a/test/core/both/test-pubsub.js b/test/core/both/test-pubsub.js index 3626189bc3..bdc83ad572 100644 --- a/test/core/both/test-pubsub.js +++ b/test/core/both/test-pubsub.js @@ -6,8 +6,6 @@ const IPFSFactory = require('../../utils/factory-core') let factory -console.log("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") - const common = { setup: function (cb) { factory = new IPFSFactory()