Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
chore: update pubsub interface, multiaddr and remove protons (#89)
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos authored Apr 12, 2021
1 parent edb8ca6 commit 8a6d3d0
Show file tree
Hide file tree
Showing 23 changed files with 1,793 additions and 103 deletions.
14 changes: 7 additions & 7 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- run: yarn
- run: yarn lint
- run: npm install
- run: npx aegir lint
- uses: gozala/typescript-error-reporter-action@v1.0.4
- run: yarn build
- run: yarn aegir dep-check
- run: npx aegir build
- run: npx aegir dep-check
- uses: ipfs/aegir/actions/bundle-size@master
name: size
with:
Expand All @@ -34,20 +34,20 @@ jobs:
- uses: actions/setup-node@v1
with:
node-version: ${{ matrix.node }}
- run: yarn
- run: npm install
- run: npx nyc --reporter=lcov aegir test -t node -- --bail
- uses: codecov/codecov-action@v1
test-chrome:
needs: check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- run: yarn
- run: npm install
- run: npx aegir test -t browser -t webworker --bail
test-firefox:
needs: check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- run: yarn
- run: npm install
- run: npx aegir test -t browser -t webworker --bail -- --browsers FirefoxHeadless
25 changes: 16 additions & 9 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,14 @@
"scripts": {
"prepare": "aegir build --no-bundle",
"lint": "aegir lint",
"build": "aegir build",
"build": "npm run build:proto && npm run build:proto-types && npm run build:types",
"build:types": "aegir build --no-bundle",
"build:proto": "npm run build:proto:rpc && npm run build:proto:topic-descriptor",
"build:proto:rpc": "pbjs -t static-module -w commonjs --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/pubsub/message/rpc.js ./src/pubsub/message/rpc.proto",
"build:proto:topic-descriptor": "pbjs -t static-module -w commonjs --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/pubsub/message/topic-descriptor.js ./src/pubsub/message/topic-descriptor.proto",
"build:proto-types": "npm run build:proto-types:rpc && npm run build:proto-types:topic-descriptor",
"build:proto-types:rpc": "pbts -o src/pubsub/message/rpc.d.ts src/pubsub/message/rpc.js",
"build:proto-types:topic-descriptor": "pbts -o src/pubsub/message/topic-descriptor.d.ts src/pubsub/message/topic-descriptor.js",
"test": "aegir test",
"test:node": "aegir test --target node",
"test:browser": "aegir test --target browser",
Expand All @@ -47,7 +54,7 @@
},
"homepage": "https://github.com/libp2p/js-interfaces#readme",
"dependencies": {
"@types/bl": "4.1.0",
"@types/bl": "^4.1.0",
"abort-controller": "^3.0.0",
"abortable-iterator": "^3.0.0",
"chai": "^4.3.4",
Expand All @@ -57,31 +64,31 @@
"detect-node": "^2.0.4",
"dirty-chai": "^2.0.1",
"err-code": "^3.0.1",
"it-goodbye": "^2.0.2",
"it-length-prefixed": "^3.1.0",
"it-goodbye": "^3.0.0",
"it-length-prefixed": "^5.0.2",
"it-pair": "^1.0.0",
"it-pipe": "^1.1.0",
"it-pushable": "^1.4.2",
"libp2p-crypto": "^0.19.0",
"libp2p-tcp": "^0.15.0",
"multiaddr": "^8.1.2",
"libp2p-tcp": "^0.15.3",
"multiaddr": "^9.0.1",
"multibase": "^4.0.2",
"multihashes": "^4.0.2",
"p-defer": "^3.0.0",
"p-limit": "^3.1.0",
"p-wait-for": "^3.2.0",
"peer-id": "^0.14.2",
"protons": "^2.0.0",
"protobufjs": "^6.10.2",
"sinon": "^10.0.0",
"streaming-iterables": "^5.0.4",
"uint8arrays": "^2.1.3"
},
"devDependencies": {
"@types/debug": "^4.1.5",
"aegir": "^32.1.0",
"aegir": "^33.0.0",
"cids": "^1.1.6",
"events": "^3.3.0",
"it-handshake": "^1.0.2",
"it-handshake": "^2.0.0",
"rimraf": "^3.0.2",
"util": "^0.12.3"
},
Expand Down
10 changes: 5 additions & 5 deletions src/connection/connection.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

const PeerId = require('peer-id')
const multiaddr = require('multiaddr')
const { Multiaddr } = require('multiaddr')
const errCode = require('err-code')
const { OPEN, CLOSING, CLOSED } = require('./status')

Expand All @@ -25,8 +25,8 @@ const connectionSymbol = Symbol.for('@libp2p/interface-connection/connection')
* @property {string} [encryption] - connection encryption method identifier.
*
* @typedef {Object} ConnectionOptions
* @property {multiaddr} [localAddr] - local multiaddr of the connection if known.
* @property {multiaddr} remoteAddr - remote multiaddr of the connection.
* @property {Multiaddr} [localAddr] - local multiaddr of the connection if known.
* @property {Multiaddr} remoteAddr - remote multiaddr of the connection.
* @property {PeerId} localPeer - local peer-id.
* @property {PeerId} remotePeer - remote peer-id.
* @property {(protocols: string|string[]) => Promise<{stream: MuxedStream, protocol: string}>} newStream - new stream muxer function.
Expand Down Expand Up @@ -231,7 +231,7 @@ class Connection {
module.exports = Connection

/**
* @param {multiaddr|undefined} localAddr
* @param {Multiaddr|undefined} localAddr
* @param {PeerId} localPeer
* @param {PeerId} remotePeer
* @param {(protocols: string | string[]) => Promise<{ stream: import("../stream-muxer/types").MuxedStream; protocol: string; }>} newStream
Expand All @@ -240,7 +240,7 @@ module.exports = Connection
* @param {{ direction: any; timeline: any; multiplexer?: string | undefined; encryption?: string | undefined; }} stat
*/
function validateArgs (localAddr, localPeer, remotePeer, newStream, close, getStreams, stat) {
if (localAddr && !multiaddr.isMultiaddr(localAddr)) {
if (localAddr && !Multiaddr.isMultiaddr(localAddr)) {
throw errCode(new Error('localAddr must be an instance of multiaddr'), 'ERR_INVALID_PARAMETERS')
}

Expand Down
4 changes: 2 additions & 2 deletions src/peer-discovery/tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const chai = require('chai')
const expect = chai.expect
chai.use(require('dirty-chai'))

const multiaddr = require('multiaddr')
const { Multiaddr } = require('multiaddr')
const PeerId = require('peer-id')

const delay = require('delay')
Expand Down Expand Up @@ -55,7 +55,7 @@ module.exports = (common) => {
expect(PeerId.isPeerId(id)).to.eql(true)
expect(multiaddrs).to.exist()

multiaddrs.forEach((m) => expect(multiaddr.isMultiaddr(m)).to.eql(true))
multiaddrs.forEach((m) => expect(Multiaddr.isMultiaddr(m)).to.eql(true))

defer.resolve()
})
Expand Down
53 changes: 29 additions & 24 deletions src/pubsub/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const { pipe } = require('it-pipe')
const MulticodecTopology = require('../topology/multicodec-topology')
const { codes } = require('./errors')

const message = require('./message')
const { RPC } = require('./message/rpc')
const PeerStreams = require('./peer-streams')
const { SignaturePolicy } = require('./signature-policy')
const utils = require('./utils')
Expand All @@ -27,10 +27,10 @@ const {
* @typedef {import('bl')} BufferList
* @typedef {import('../stream-muxer/types').MuxedStream} MuxedStream
* @typedef {import('../connection/connection')} Connection
* @typedef {import('./message/types').RPC} RPC
* @typedef {import('./message/types').SubOpts} RPCSubOpts
* @typedef {import('./message/types').Message} RPCMessage
* @typedef {import('./signature-policy').SignaturePolicyType} SignaturePolicyType
* @typedef {import('./message/rpc').IRPC} IRPC
* @typedef {import('./message/rpc').RPC.SubOpts} RPCSubOpts
* @typedef {import('./message/rpc').RPC.Message} RPCMessage
*/

/**
Expand Down Expand Up @@ -382,7 +382,7 @@ class PubsubBaseProtocol extends EventEmitter {

if (subs.length) {
// update peer subscriptions
subs.forEach((/** @type {RPCSubOpts} */ subOpt) => {
subs.forEach((subOpt) => {
this._processRpcSubOpt(idB58Str, subOpt)
})
this.emit('pubsub:subscription-change', peerStreams.id, subs)
Expand All @@ -396,7 +396,7 @@ class PubsubBaseProtocol extends EventEmitter {
if (msgs.length) {
// @ts-ignore RPC message is modified
msgs.forEach((message) => {
if (!(this.canRelayMessage || message.topicIDs.some((/** @type {string} */ topic) => this.subscriptions.has(topic)))) {
if (!(this.canRelayMessage || (message.topicIDs && message.topicIDs.some((topic) => this.subscriptions.has(topic))))) {
this.log('received message we didn\'t subscribe to. Dropping.')
return
}
Expand All @@ -411,11 +411,15 @@ class PubsubBaseProtocol extends EventEmitter {
* Handles a subscription change from a peer
*
* @param {string} id
* @param {RPCSubOpts} subOpt
* @param {RPC.ISubOpts} subOpt
*/
_processRpcSubOpt (id, subOpt) {
const t = subOpt.topicID

if (!t) {
return
}

let topicSet = this.topics.get(t)
if (!topicSet) {
topicSet = new Set()
Expand Down Expand Up @@ -473,13 +477,14 @@ class PubsubBaseProtocol extends EventEmitter {
* The default msgID implementation
* Child class can override this.
*
* @param {RPCMessage} msg - the message object
* @param {InMessage} msg - the message object
* @returns {Uint8Array} message id as bytes
*/
getMsgId (msg) {
const signaturePolicy = this.globalSignaturePolicy
switch (signaturePolicy) {
case SignaturePolicy.StrictSign:
// @ts-ignore seqno is optional in protobuf definition but it will exist
return utils.msgId(msg.from, msg.seqno)
case SignaturePolicy.StrictNoSign:
return utils.noSignMsgId(msg.data)
Expand Down Expand Up @@ -508,25 +513,25 @@ class PubsubBaseProtocol extends EventEmitter {
* @returns {RPC}
*/
_decodeRpc (bytes) {
return message.rpc.RPC.decode(bytes)
return RPC.decode(bytes)
}

/**
* Encode RPC object into a Uint8Array.
* This can be override to use a custom router protobuf.
*
* @param {RPC} rpc
* @param {IRPC} rpc
* @returns {Uint8Array}
*/
_encodeRpc (rpc) {
return message.rpc.RPC.encode(rpc)
return RPC.encode(rpc).finish()
}

/**
* Send an rpc object to a peer
*
* @param {string} id - peer id
* @param {RPC} rpc
* @param {IRPC} rpc
* @returns {void}
*/
_sendRpc (id, rpc) {
Expand Down Expand Up @@ -592,12 +597,12 @@ class PubsubBaseProtocol extends EventEmitter {
default:
throw errcode(new Error('Cannot validate message: unhandled signature policy: ' + signaturePolicy), codes.ERR_UNHANDLED_SIGNATURE_POLICY)
}

for (const topic of message.topicIDs) {
const validatorFn = this.topicValidators.get(topic)
if (!validatorFn) {
continue // eslint-disable-line
if (validatorFn) {
await validatorFn(topic, message)
}
await validatorFn(topic, message)
}
}

Expand All @@ -606,8 +611,8 @@ class PubsubBaseProtocol extends EventEmitter {
* Should be used by the routers to create the message to send.
*
* @protected
* @param {RPCMessage} message
* @returns {Promise<RPCMessage>}
* @param {InMessage} message
* @returns {Promise<InMessage>}
*/
_buildMessage (message) {
const signaturePolicy = this.globalSignaturePolicy
Expand All @@ -617,7 +622,7 @@ class PubsubBaseProtocol extends EventEmitter {
message.seqno = utils.randomSeqno()
return signMessage(this.peerId, utils.normalizeOutRpcMessage(message))
case SignaturePolicy.StrictNoSign:
return message
return Promise.resolve(message)
default:
throw errcode(new Error('Cannot build message: unhandled signature policy: ' + signaturePolicy), codes.ERR_UNHANDLED_SIGNATURE_POLICY)
}
Expand Down Expand Up @@ -663,29 +668,30 @@ class PubsubBaseProtocol extends EventEmitter {
this.log('publish', topic, message)

const from = this.peerId.toB58String()
let msgObject = {
const msgObject = {
receivedFrom: from,
data: message,
topicIDs: [topic]
}

// ensure that the message follows the signature policy
const outMsg = await this._buildMessage(msgObject)
msgObject = utils.normalizeInRpcMessage(outMsg)
// @ts-ignore different type as from is converted
const msg = utils.normalizeInRpcMessage(outMsg)

// Emit to self if I'm interested and emitSelf enabled
this.emitSelf && this._emitMessage(msgObject)
this.emitSelf && this._emitMessage(msg)

// send to all the other peers
await this._publish(msgObject)
await this._publish(msg)
}

/**
* Overriding the implementation of publish should handle the appropriate algorithms for the publish/subscriber implementation.
* For example, a Floodsub implementation might simply publish each message to each topic for every peer
*
* @abstract
* @param {InMessage} message
* @param {InMessage|RPCMessage} message
* @returns {Promise<void>}
*
*/
Expand Down Expand Up @@ -744,7 +750,6 @@ class PubsubBaseProtocol extends EventEmitter {
}
}

PubsubBaseProtocol.message = message
PubsubBaseProtocol.utils = utils
PubsubBaseProtocol.SignaturePolicy = SignaturePolicy

Expand Down
16 changes: 0 additions & 16 deletions src/pubsub/message/index.js

This file was deleted.

Loading

0 comments on commit 8a6d3d0

Please sign in to comment.