From e9d8ab0dd6f2112a49e87f59a9a895fb4e591f00 Mon Sep 17 00:00:00 2001 From: Richard Schneider Date: Mon, 20 Nov 2017 20:05:30 +1300 Subject: [PATCH 1/3] fix: allow topicCIDs from older peers --- src/pubsub.js | 1 + src/utils/pubsub-message-utils.js | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/pubsub.js b/src/pubsub.js index ced2b2c32..ec61daf61 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -121,6 +121,7 @@ module.exports = (arg) => { function subscribe (topic, options, handler, callback) { ps.on(topic, handler) if (subscriptions[topic]) { + // TODO: should a callback error be returned? return callback() } diff --git a/src/utils/pubsub-message-utils.js b/src/utils/pubsub-message-utils.js index 2b3bbbabe..53d1e397a 100644 --- a/src/utils/pubsub-message-utils.js +++ b/src/utils/pubsub-message-utils.js @@ -30,10 +30,10 @@ function deserializeFromBase64 (obj) { from: bs58.encode(Buffer.from(obj.from, 'base64')).toString(), seqno: Buffer.from(obj.seqno, 'base64'), data: Buffer.from(obj.data, 'base64'), - topicIDs: obj.topicIDs + topicIDs: obj.topicIDs || obj.topicCIDs } } function isPubsubMessage (obj) { - return obj && obj.from && obj.seqno && obj.data && obj.topicIDs + return obj && obj.from && obj.seqno && obj.data && (obj.topicIDs || obj.topicCIDs) } From a7edf7676b27cd9f2189b3c3272124d52807baf3 Mon Sep 17 00:00:00 2001 From: Richard Schneider Date: Wed, 22 Nov 2017 11:38:03 +1300 Subject: [PATCH 2/3] fix: do not eat pubsub message error --- src/pubsub.js | 2 +- src/utils/pubsub-message-stream.js | 16 +++++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/pubsub.js b/src/pubsub.js index ec61daf61..aaa3f18dd 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -120,8 +120,8 @@ module.exports = (arg) => { function subscribe (topic, options, handler, callback) { ps.on(topic, handler) + if (subscriptions[topic]) { - // TODO: should a callback error be returned? return callback() } diff --git a/src/utils/pubsub-message-stream.js b/src/utils/pubsub-message-stream.js index f994cc401..ae335a12f 100644 --- a/src/utils/pubsub-message-stream.js +++ b/src/utils/pubsub-message-stream.js @@ -16,17 +16,19 @@ class PubsubMessageStream extends TransformStream { } _transform (obj, enc, callback) { - let msg + // go-ipfs returns '{}' as the very first object atm, we skip that + if (Object.keys(obj).length === 0) { + return callback() + } + try { - msg = PubsubMessage.deserialize(obj, 'base64') + const msg = PubsubMessage.deserialize(obj, 'base64') + this.push(msg) + callback() } catch (err) { - // Not a valid pubsub message - // go-ipfs returns '{}' as the very first object atm, we skip that - return callback() + return callback(err) } - this.push(msg) - callback() } } From b99958d3147c5fd1889b80d24a988d5d76c5a37b Mon Sep 17 00:00:00 2001 From: Richard Schneider Date: Wed, 22 Nov 2017 14:44:01 +1300 Subject: [PATCH 3/3] fix: lint errors --- src/pubsub.js | 2 +- src/utils/pubsub-message-stream.js | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/pubsub.js b/src/pubsub.js index aaa3f18dd..a762be4e1 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -120,7 +120,7 @@ module.exports = (arg) => { function subscribe (topic, options, handler, callback) { ps.on(topic, handler) - + if (subscriptions[topic]) { return callback() } diff --git a/src/utils/pubsub-message-stream.js b/src/utils/pubsub-message-stream.js index ae335a12f..992529213 100644 --- a/src/utils/pubsub-message-stream.js +++ b/src/utils/pubsub-message-stream.js @@ -20,7 +20,7 @@ class PubsubMessageStream extends TransformStream { if (Object.keys(obj).length === 0) { return callback() } - + try { const msg = PubsubMessage.deserialize(obj, 'base64') this.push(msg) @@ -28,7 +28,6 @@ class PubsubMessageStream extends TransformStream { } catch (err) { return callback(err) } - } }