Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
chore: update pubsub interface
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Apr 7, 2021
1 parent edb8ca6 commit 5204d56
Show file tree
Hide file tree
Showing 13 changed files with 1,758 additions and 67 deletions.
13 changes: 10 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,14 @@
"scripts": {
"prepare": "aegir build --no-bundle",
"lint": "aegir lint",
"build": "aegir build",
"build": "npm run build:proto && npm run build:proto-types && npm run build:types",
"build:types": "aegir build --no-bundle",
"build:proto": "npm run build:proto:rpc && npm run build:proto:topic-descriptor",
"build:proto:rpc": "pbjs -t static-module -w commonjs --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/pubsub/message/rpc.js ./src/pubsub/message/rpc.proto",
"build:proto:topic-descriptor": "pbjs -t static-module -w commonjs --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/pubsub/message/topic-descriptor.js ./src/pubsub/message/topic-descriptor.proto",
"build:proto-types": "npm run build:proto-types:rpc && npm run build:proto-types:topic-descriptor",
"build:proto-types:rpc": "pbts -o src/pubsub/message/rpc.d.ts src/pubsub/message/rpc.js",
"build:proto-types:topic-descriptor": "pbts -o src/pubsub/message/topic-descriptor.d.ts src/pubsub/message/topic-descriptor.js",
"test": "aegir test",
"test:node": "aegir test --target node",
"test:browser": "aegir test --target browser",
Expand All @@ -47,7 +54,7 @@
},
"homepage": "https://github.com/libp2p/js-interfaces#readme",
"dependencies": {
"@types/bl": "4.1.0",
"@types/bl": "^4.1.0",
"abort-controller": "^3.0.0",
"abortable-iterator": "^3.0.0",
"chai": "^4.3.4",
Expand All @@ -71,7 +78,7 @@
"p-limit": "^3.1.0",
"p-wait-for": "^3.2.0",
"peer-id": "^0.14.2",
"protons": "^2.0.0",
"protobufjs": "^6.10.2",
"sinon": "^10.0.0",
"streaming-iterables": "^5.0.4",
"uint8arrays": "^2.1.3"
Expand Down
59 changes: 34 additions & 25 deletions src/pubsub/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const { pipe } = require('it-pipe')
const MulticodecTopology = require('../topology/multicodec-topology')
const { codes } = require('./errors')

const message = require('./message')
const { RPC } = require('./message/rpc')
const PeerStreams = require('./peer-streams')
const { SignaturePolicy } = require('./signature-policy')
const utils = require('./utils')
Expand All @@ -27,10 +27,10 @@ const {
* @typedef {import('bl')} BufferList
* @typedef {import('../stream-muxer/types').MuxedStream} MuxedStream
* @typedef {import('../connection/connection')} Connection
* @typedef {import('./message/types').RPC} RPC
* @typedef {import('./message/types').SubOpts} RPCSubOpts
* @typedef {import('./message/types').Message} RPCMessage
* @typedef {import('./signature-policy').SignaturePolicyType} SignaturePolicyType
* @typedef {import('./message/rpc').IRPC} IRPC
* @typedef {import('./message/rpc').RPC.SubOpts} RPCSubOpts
* @typedef {import('./message/rpc').RPC.Message} RPCMessage
*/

/**
Expand Down Expand Up @@ -382,7 +382,7 @@ class PubsubBaseProtocol extends EventEmitter {

if (subs.length) {
// update peer subscriptions
subs.forEach((/** @type {RPCSubOpts} */ subOpt) => {
subs.forEach((subOpt) => {
this._processRpcSubOpt(idB58Str, subOpt)
})
this.emit('pubsub:subscription-change', peerStreams.id, subs)
Expand All @@ -396,7 +396,7 @@ class PubsubBaseProtocol extends EventEmitter {
if (msgs.length) {
// @ts-ignore RPC message is modified
msgs.forEach((message) => {
if (!(this.canRelayMessage || message.topicIDs.some((/** @type {string} */ topic) => this.subscriptions.has(topic)))) {
if (!(this.canRelayMessage || (message.topicIDs && message.topicIDs.some((/** @type {string} */ topic) => this.subscriptions.has(topic))))) {
this.log('received message we didn\'t subscribe to. Dropping.')
return
}
Expand All @@ -411,11 +411,15 @@ class PubsubBaseProtocol extends EventEmitter {
* Handles a subscription change from a peer
*
* @param {string} id
* @param {RPCSubOpts} subOpt
* @param {RPC.ISubOpts} subOpt
*/
_processRpcSubOpt (id, subOpt) {
const t = subOpt.topicID

if (!t) {
return
}

let topicSet = this.topics.get(t)
if (!topicSet) {
topicSet = new Set()
Expand Down Expand Up @@ -462,7 +466,7 @@ class PubsubBaseProtocol extends EventEmitter {
* @param {InMessage} message
*/
_emitMessage (message) {
message.topicIDs.forEach((topic) => {
message.topicIDs && message.topicIDs.forEach((topic) => {
if (this.subscriptions.has(topic)) {
this.emit(topic, message)
}
Expand All @@ -473,15 +477,17 @@ class PubsubBaseProtocol extends EventEmitter {
* The default msgID implementation
* Child class can override this.
*
* @param {RPCMessage} msg - the message object
* @param {InMessage} msg - the message object
* @returns {Uint8Array} message id as bytes
*/
getMsgId (msg) {
const signaturePolicy = this.globalSignaturePolicy
switch (signaturePolicy) {
case SignaturePolicy.StrictSign:
// @ts-ignore seqno is optional in protobuf definition but it will exist
return utils.msgId(msg.from, msg.seqno)
case SignaturePolicy.StrictNoSign:
// @ts-ignore data is optional but received messages will always have data
return utils.noSignMsgId(msg.data)
default:
throw errcode(new Error('Cannot get message id: unhandled signature policy: ' + signaturePolicy), codes.ERR_UNHANDLED_SIGNATURE_POLICY)
Expand All @@ -508,25 +514,25 @@ class PubsubBaseProtocol extends EventEmitter {
* @returns {RPC}
*/
_decodeRpc (bytes) {
return message.rpc.RPC.decode(bytes)
return RPC.decode(bytes)
}

/**
* Encode RPC object into a Uint8Array.
* This can be override to use a custom router protobuf.
*
* @param {RPC} rpc
* @param {IRPC} rpc
* @returns {Uint8Array}
*/
_encodeRpc (rpc) {
return message.rpc.RPC.encode(rpc)
return RPC.encode(rpc).finish()
}

/**
* Send an rpc object to a peer
*
* @param {string} id - peer id
* @param {RPC} rpc
* @param {IRPC} rpc
* @returns {void}
*/
_sendRpc (id, rpc) {
Expand Down Expand Up @@ -592,12 +598,15 @@ class PubsubBaseProtocol extends EventEmitter {
default:
throw errcode(new Error('Cannot validate message: unhandled signature policy: ' + signaturePolicy), codes.ERR_UNHANDLED_SIGNATURE_POLICY)
}
if (!message.topicIDs) {
return
}

for (const topic of message.topicIDs) {
const validatorFn = this.topicValidators.get(topic)
if (!validatorFn) {
continue // eslint-disable-line
if (validatorFn) {
await validatorFn(topic, message)
}
await validatorFn(topic, message)
}
}

Expand All @@ -606,8 +615,8 @@ class PubsubBaseProtocol extends EventEmitter {
* Should be used by the routers to create the message to send.
*
* @protected
* @param {RPCMessage} message
* @returns {Promise<RPCMessage>}
* @param {InMessage} message
* @returns {Promise<InMessage>}
*/
_buildMessage (message) {
const signaturePolicy = this.globalSignaturePolicy
Expand All @@ -617,7 +626,7 @@ class PubsubBaseProtocol extends EventEmitter {
message.seqno = utils.randomSeqno()
return signMessage(this.peerId, utils.normalizeOutRpcMessage(message))
case SignaturePolicy.StrictNoSign:
return message
return Promise.resolve(message)
default:
throw errcode(new Error('Cannot build message: unhandled signature policy: ' + signaturePolicy), codes.ERR_UNHANDLED_SIGNATURE_POLICY)
}
Expand Down Expand Up @@ -663,29 +672,30 @@ class PubsubBaseProtocol extends EventEmitter {
this.log('publish', topic, message)

const from = this.peerId.toB58String()
let msgObject = {
const msgObject = {
receivedFrom: from,
data: message,
topicIDs: [topic]
}

// ensure that the message follows the signature policy
const outMsg = await this._buildMessage(msgObject)
msgObject = utils.normalizeInRpcMessage(outMsg)
// @ts-ignore different type as from is converted
const msg = utils.normalizeInRpcMessage(outMsg)

// Emit to self if I'm interested and emitSelf enabled
this.emitSelf && this._emitMessage(msgObject)
this.emitSelf && this._emitMessage(msg)

// send to all the other peers
await this._publish(msgObject)
await this._publish(msg)
}

/**
* Overriding the implementation of publish should handle the appropriate algorithms for the publish/subscriber implementation.
* For example, a Floodsub implementation might simply publish each message to each topic for every peer
*
* @abstract
* @param {InMessage} message
* @param {InMessage|RPCMessage} message
* @returns {Promise<void>}
*
*/
Expand Down Expand Up @@ -744,7 +754,6 @@ class PubsubBaseProtocol extends EventEmitter {
}
}

PubsubBaseProtocol.message = message
PubsubBaseProtocol.utils = utils
PubsubBaseProtocol.SignaturePolicy = SignaturePolicy

Expand Down
16 changes: 0 additions & 16 deletions src/pubsub/message/index.js

This file was deleted.

Loading

0 comments on commit 5204d56

Please sign in to comment.