diff --git a/src/pubsub.js b/src/pubsub.js index ec61daf61..e5ed5d54b 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -120,6 +120,7 @@ module.exports = (arg) => { function subscribe (topic, options, handler, callback) { ps.on(topic, handler) + if (subscriptions[topic]) { // TODO: should a callback error be returned? return callback() diff --git a/src/utils/pubsub-message-stream.js b/src/utils/pubsub-message-stream.js index f994cc401..992529213 100644 --- a/src/utils/pubsub-message-stream.js +++ b/src/utils/pubsub-message-stream.js @@ -16,17 +16,18 @@ class PubsubMessageStream extends TransformStream { } _transform (obj, enc, callback) { - let msg - try { - msg = PubsubMessage.deserialize(obj, 'base64') - } catch (err) { - // Not a valid pubsub message - // go-ipfs returns '{}' as the very first object atm, we skip that + // go-ipfs returns '{}' as the very first object atm, we skip that + if (Object.keys(obj).length === 0) { return callback() } - this.push(msg) - callback() + try { + const msg = PubsubMessage.deserialize(obj, 'base64') + this.push(msg) + callback() + } catch (err) { + return callback(err) + } } }