Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
Try get tests working
Browse files Browse the repository at this point in the history
  • Loading branch information
haadcode committed Dec 5, 2016
1 parent 699bd25 commit 6efd705
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 16 deletions.
48 changes: 34 additions & 14 deletions src/core/components/floodsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,29 @@ const Readable = require('stream').Readable
const OFFLINE_ERROR = require('../utils').OFFLINE_ERROR
const FSUB_ERROR = new Error(`FloodSub is not started.`)

module.exports = function floodsub (self) {
return {
start: promisify((callback) => {
if (!self.isOnline()) {
throw OFFLINE_ERROR
}
/* Internal subscriptions state and functions */
let subscriptions = {}

self._floodsub = new FloodSub(self._libp2pNode)
return callback(null, self._floodsub)
}),
const addSubscription = (topic, request, stream) => {
subscriptions[topic] = { request: request, stream: stream }
}

const removeSubscription = promisify((topic, callback) => {
if (!subscriptions[topic]) {
return callback(new Error(`Not subscribed to ${topic}`))
}

// subscriptions[topic].request.abort()
// subscriptions[topic].stream.end()
delete subscriptions[topic]

if (callback) {
callback(null)
}
})

module.exports = function floodsub (self) {
return {
subscribe: promisify((topic, options, callback) => {
// TODO: Clarify with @diasdavid what to do with the `options.discover` param
// Ref: https://github.com/ipfs/js-ipfs-api/pull/377/files#diff-f0c61c06fd5dc36b6f760b7ea97b1862R50
Expand All @@ -34,11 +46,12 @@ module.exports = function floodsub (self) {
throw FSUB_ERROR
}

let rs = new Readable()
rs.cancel = () => self._floodsub.unsubscribe(topic)

let stream = new Readable({ objectMode: true })
stream._read = () => {}
self._floodsub.on(topic, (data) => {
rs.emit('data', {
console.log("DATA", data.toString())
stream.emit('data', {
data: data.toString(),
topicIDs: [topic]
})
Expand All @@ -50,7 +63,14 @@ module.exports = function floodsub (self) {
return callback(err)
}

callback(null, rs)
stream.cancel = promisify((cb) => {
self._floodsub.unsubscribe(topic)
removeSubscription(topic, cb)
})

// Add the request to the active subscriptions and return the stream
addSubscription(topic, null, stream)
callback(null, stream)
}),

publish: promisify((topic, data, callback) => {
Expand Down
5 changes: 5 additions & 0 deletions src/core/components/go-online.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const series = require('async/series')
const Bitswap = require('ipfs-bitswap')
const FloodSub = require('libp2p-floodsub')

module.exports = function goOnline (self) {
return (cb) => {
Expand All @@ -21,6 +22,10 @@ module.exports = function goOnline (self) {
)
self._bitswap.start()
self._blockService.goOnline(self._bitswap)

self._floodsub = new FloodSub(self._libp2pNode)
// self._floodsub.start()

cb()
})
}
Expand Down
2 changes: 0 additions & 2 deletions test/core/both/test-pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ const IPFSFactory = require('../../utils/factory-core')

let factory

console.log("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")

const common = {
setup: function (cb) {
factory = new IPFSFactory()
Expand Down

0 comments on commit 6efd705

Please sign in to comment.