diff --git a/src/core/components/libp2p.js b/src/core/components/libp2p.js index e998517f01..8935fe3e4d 100644 --- a/src/core/components/libp2p.js +++ b/src/core/components/libp2p.js @@ -21,6 +21,7 @@ module.exports = function libp2p (self) { bootstrap: get(config, 'Bootstrap'), modules: self._libp2pModules, // EXPERIMENTAL + pubsub: get(self._options, 'EXPERIMENTAL.pubsub', false), dht: get(self._options, 'EXPERIMENTAL.dht', false), relay: { enabled: get(config, 'EXPERIMENTAL.relay.enabled', false), @@ -50,9 +51,7 @@ module.exports = function libp2p (self) { }) self._libp2pNode.start((err) => { - if (err) { - return callback(err) - } + if (err) { return callback(err) } self._libp2pNode.peerInfo.multiaddrs.forEach((ma) => { console.log('Swarm listening on', ma.toString()) diff --git a/src/core/components/no-floodsub.js b/src/core/components/no-floodsub.js deleted file mode 100644 index 95db571f5f..0000000000 --- a/src/core/components/no-floodsub.js +++ /dev/null @@ -1,24 +0,0 @@ -'use strict' - -const EventEmitter = require('events') - -function fail () { - throw new Error('The daemon must be run with \'--enable-pubsub-experiment\'') -} - -class NoFloodSub extends EventEmitter { - constructor () { - super() - - this.peers = new Map() - this.subscriptions = new Set() - } - - start (callback) { callback() } - stop (callback) { callback() } - publish () { fail() } - subscribe () { fail() } - unsubscribe () { fail() } -} - -module.exports = NoFloodSub diff --git a/src/core/components/pubsub.js b/src/core/components/pubsub.js index 7e51062f40..8d1204876e 100644 --- a/src/core/components/pubsub.js +++ b/src/core/components/pubsub.js @@ -1,35 +1,19 @@ 'use strict' const promisify = require('promisify-es6') -const setImmediate = require('async/setImmediate') - -const OFFLINE_ERROR = require('../utils').OFFLINE_ERROR module.exports = function pubsub (self) { return { subscribe: (topic, options, handler, callback) => { - if (!self.isOnline()) { - throw new Error(OFFLINE_ERROR) - } - if (typeof options === 'function') { callback = handler handler = options options = {} } - function subscribe (cb) { - if (self._pubsub.listenerCount(topic) === 0) { - self._pubsub.subscribe(topic) - } - - self._pubsub.on(topic, handler) - setImmediate(cb) - } - if (!callback) { return new Promise((resolve, reject) => { - subscribe((err) => { + self.libp2p.pubsub.subscribe(topic, options, handler, (err) => { if (err) { return reject(err) } @@ -37,60 +21,28 @@ module.exports = function pubsub (self) { }) }) } else { - subscribe(callback) + self.libp2p.pubsub.subscribe(topic, options, handler, callback) } }, unsubscribe: (topic, handler) => { - self._pubsub.removeListener(topic, handler) - - if (self._pubsub.listenerCount(topic) === 0) { - self._pubsub.unsubscribe(topic) - } + self.libp2p.pubsub.unsubscribe(topic, handler) }, publish: promisify((topic, data, callback) => { - if (!self.isOnline()) { - return setImmediate(() => callback(new Error(OFFLINE_ERROR))) - } - - if (!Buffer.isBuffer(data)) { - return setImmediate(() => callback(new Error('data must be a Buffer'))) - } - - self._pubsub.publish(topic, data) - setImmediate(() => callback()) + self.libp2p.pubsub.publish(topic, data, callback) }), ls: promisify((callback) => { - if (!self.isOnline()) { - return setImmediate(() => callback(new Error(OFFLINE_ERROR))) - } - - const subscriptions = Array.from(self._pubsub.subscriptions) - - setImmediate(() => callback(null, subscriptions)) + self.libp2p.pubsub.ls(callback) }), peers: promisify((topic, callback) => { - if (!self.isOnline()) { - return setImmediate(() => callback(new Error(OFFLINE_ERROR))) - } - - if (typeof topic === 'function') { - callback = topic - topic = null - } - - const peers = Array.from(self._pubsub.peers.values()) - .filter((peer) => topic ? peer.topics.has(topic) : true) - .map((peer) => peer.info.id.toB58String()) - - setImmediate(() => callback(null, peers)) + self.libp2p.pubsub.peers(topic, callback) }), setMaxListeners (n) { - return self._pubsub.setMaxListeners(n) + self.libp2p.pubsub.setMaxListeners(n) } } } diff --git a/src/core/components/start.js b/src/core/components/start.js index 3004cca34d..f0517b1ed9 100644 --- a/src/core/components/start.js +++ b/src/core/components/start.js @@ -2,8 +2,6 @@ const series = require('async/series') const Bitswap = require('ipfs-bitswap') -const FloodSub = require('libp2p-floodsub') -const NoFloodSub = require('./no-floodsub') const setImmediate = require('async/setImmediate') const promisify = require('promisify-es6') @@ -38,9 +36,7 @@ module.exports = (self) => { (cb) => self.preStart(cb), (cb) => self.libp2p.start(cb) ], (err) => { - if (err) { - return done(err) - } + if (err) { return done(err) } self._bitswap = new Bitswap( self._libp2pNode, @@ -50,11 +46,7 @@ module.exports = (self) => { self._bitswap.start() self._blockService.setExchange(self._bitswap) - - self._pubsub = self._options.EXPERIMENTAL.pubsub - ? new FloodSub(self._libp2pNode) - : new NoFloodSub() - self._pubsub.start(done) + done() }) }) } diff --git a/src/core/components/stop.js b/src/core/components/stop.js index a39900d09c..4d35190d21 100644 --- a/src/core/components/stop.js +++ b/src/core/components/stop.js @@ -32,7 +32,6 @@ module.exports = (self) => { self._bitswap.stop() series([ - (cb) => self._pubsub.stop(cb), (cb) => self.libp2p.stop(cb), (cb) => self._repo.close(cb) ], done) diff --git a/test/core/bitswap.spec.js b/test/core/bitswap.spec.js index b0260ed5ff..a186aaf352 100644 --- a/test/core/bitswap.spec.js +++ b/test/core/bitswap.spec.js @@ -16,10 +16,9 @@ const isNode = require('detect-node') const multihashing = require('multihashing-async') const CID = require('cids') -const DaemonFactory = require('ipfsd-ctl') -const df = DaemonFactory.create({ type: 'js' }) - -const dfProc = DaemonFactory.create({ type: 'proc' }) +const IPFSFactory = require('ipfsd-ctl') +const fDaemon = IPFSFactory.create({ type: 'js' }) +const fInProc = IPFSFactory.create({ type: 'proc' }) // This gets replaced by '../utils/create-repo-browser.js' in the browser const createTempRepo = require('../utils/create-repo-nodejs.js') @@ -69,7 +68,7 @@ function connectNodes (remoteNode, inProcNode, callback) { let nodes = [] function addNode (inProcNode, callback) { - df.spawn({ + fDaemon.spawn({ exec: './src/cli/bin.js', config: { Addresses: { @@ -89,7 +88,7 @@ function addNode (inProcNode, callback) { }) } -describe('bitswap', function () { +describe.only('bitswap', function () { this.timeout(80 * 1000) let inProcNode // Node spawned inside this process @@ -119,7 +118,7 @@ describe('bitswap', function () { }) } - dfProc.spawn({ exec: IPFS, config }, (err, _ipfsd) => { + fInProc.spawn({ exec: IPFS, config: config }, (err, _ipfsd) => { expect(err).to.not.exist() nodes.push(_ipfsd) inProcNode = _ipfsd.api @@ -137,7 +136,7 @@ describe('bitswap', function () { }) }) - describe('transfer a block between', () => { + describe.only('transfer a block between', () => { it('2 peers', function (done) { this.timeout(80 * 1000)