From 8a6d3d0661ad8a5cc2e0de690de7081d9131c152 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Mon, 12 Apr 2021 12:10:56 +0200 Subject: [PATCH] chore: update pubsub interface, multiaddr and remove protons (#89) --- .github/workflows/main.yml | 14 +- package.json | 25 +- src/connection/connection.js | 10 +- src/peer-discovery/tests/index.js | 4 +- src/pubsub/index.js | 53 +- src/pubsub/message/index.js | 16 - src/pubsub/message/rpc.d.ts | 222 ++++++ src/pubsub/message/rpc.js | 645 ++++++++++++++++++ .../message/{rpc.proto.js => rpc.proto} | 6 +- src/pubsub/message/sign.js | 19 +- src/pubsub/message/topic-descriptor.d.ts | 224 ++++++ src/pubsub/message/topic-descriptor.js | 586 ++++++++++++++++ ...riptor.proto.js => topic-descriptor.proto} | 8 +- src/pubsub/message/types.d.ts | 5 - src/pubsub/peer-streams.js | 1 - src/pubsub/tests/two-nodes.js | 8 +- src/pubsub/utils.js | 18 +- src/stream-muxer/tests/close-test.js | 4 +- test/connection/compliance.spec.js | 6 +- test/peer-discovery/mock-discovery.js | 4 +- test/pubsub/sign.spec.js | 8 +- test/pubsub/utils/index.js | 6 +- tsconfig.json | 4 + 23 files changed, 1793 insertions(+), 103 deletions(-) delete mode 100644 src/pubsub/message/index.js create mode 100644 src/pubsub/message/rpc.d.ts create mode 100644 src/pubsub/message/rpc.js rename src/pubsub/message/{rpc.proto.js => rpc.proto} (92%) create mode 100644 src/pubsub/message/topic-descriptor.d.ts create mode 100644 src/pubsub/message/topic-descriptor.js rename src/pubsub/message/{topic-descriptor.proto.js => topic-descriptor.proto} (93%) delete mode 100644 src/pubsub/message/types.d.ts diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index c5002ceae..ae36af676 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -12,11 +12,11 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - - run: yarn - - run: yarn lint + - run: npm install + - run: npx aegir lint - uses: gozala/typescript-error-reporter-action@v1.0.4 - - run: yarn build - - run: yarn aegir dep-check + - run: npx aegir build + - run: npx aegir dep-check - uses: ipfs/aegir/actions/bundle-size@master name: size with: @@ -34,7 +34,7 @@ jobs: - uses: actions/setup-node@v1 with: node-version: ${{ matrix.node }} - - run: yarn + - run: npm install - run: npx nyc --reporter=lcov aegir test -t node -- --bail - uses: codecov/codecov-action@v1 test-chrome: @@ -42,12 +42,12 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - - run: yarn + - run: npm install - run: npx aegir test -t browser -t webworker --bail test-firefox: needs: check runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - - run: yarn + - run: npm install - run: npx aegir test -t browser -t webworker --bail -- --browsers FirefoxHeadless diff --git a/package.json b/package.json index aeda256b3..75a8ca0f5 100644 --- a/package.json +++ b/package.json @@ -24,7 +24,14 @@ "scripts": { "prepare": "aegir build --no-bundle", "lint": "aegir lint", - "build": "aegir build", + "build": "npm run build:proto && npm run build:proto-types && npm run build:types", + "build:types": "aegir build --no-bundle", + "build:proto": "npm run build:proto:rpc && npm run build:proto:topic-descriptor", + "build:proto:rpc": "pbjs -t static-module -w commonjs --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/pubsub/message/rpc.js ./src/pubsub/message/rpc.proto", + "build:proto:topic-descriptor": "pbjs -t static-module -w commonjs --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/pubsub/message/topic-descriptor.js ./src/pubsub/message/topic-descriptor.proto", + "build:proto-types": "npm run build:proto-types:rpc && npm run build:proto-types:topic-descriptor", + "build:proto-types:rpc": "pbts -o src/pubsub/message/rpc.d.ts src/pubsub/message/rpc.js", + "build:proto-types:topic-descriptor": "pbts -o src/pubsub/message/topic-descriptor.d.ts src/pubsub/message/topic-descriptor.js", "test": "aegir test", "test:node": "aegir test --target node", "test:browser": "aegir test --target browser", @@ -47,7 +54,7 @@ }, "homepage": "https://github.com/libp2p/js-interfaces#readme", "dependencies": { - "@types/bl": "4.1.0", + "@types/bl": "^4.1.0", "abort-controller": "^3.0.0", "abortable-iterator": "^3.0.0", "chai": "^4.3.4", @@ -57,31 +64,31 @@ "detect-node": "^2.0.4", "dirty-chai": "^2.0.1", "err-code": "^3.0.1", - "it-goodbye": "^2.0.2", - "it-length-prefixed": "^3.1.0", + "it-goodbye": "^3.0.0", + "it-length-prefixed": "^5.0.2", "it-pair": "^1.0.0", "it-pipe": "^1.1.0", "it-pushable": "^1.4.2", "libp2p-crypto": "^0.19.0", - "libp2p-tcp": "^0.15.0", - "multiaddr": "^8.1.2", + "libp2p-tcp": "^0.15.3", + "multiaddr": "^9.0.1", "multibase": "^4.0.2", "multihashes": "^4.0.2", "p-defer": "^3.0.0", "p-limit": "^3.1.0", "p-wait-for": "^3.2.0", "peer-id": "^0.14.2", - "protons": "^2.0.0", + "protobufjs": "^6.10.2", "sinon": "^10.0.0", "streaming-iterables": "^5.0.4", "uint8arrays": "^2.1.3" }, "devDependencies": { "@types/debug": "^4.1.5", - "aegir": "^32.1.0", + "aegir": "^33.0.0", "cids": "^1.1.6", "events": "^3.3.0", - "it-handshake": "^1.0.2", + "it-handshake": "^2.0.0", "rimraf": "^3.0.2", "util": "^0.12.3" }, diff --git a/src/connection/connection.js b/src/connection/connection.js index afe06c881..168ff114b 100644 --- a/src/connection/connection.js +++ b/src/connection/connection.js @@ -1,7 +1,7 @@ 'use strict' const PeerId = require('peer-id') -const multiaddr = require('multiaddr') +const { Multiaddr } = require('multiaddr') const errCode = require('err-code') const { OPEN, CLOSING, CLOSED } = require('./status') @@ -25,8 +25,8 @@ const connectionSymbol = Symbol.for('@libp2p/interface-connection/connection') * @property {string} [encryption] - connection encryption method identifier. * * @typedef {Object} ConnectionOptions - * @property {multiaddr} [localAddr] - local multiaddr of the connection if known. - * @property {multiaddr} remoteAddr - remote multiaddr of the connection. + * @property {Multiaddr} [localAddr] - local multiaddr of the connection if known. + * @property {Multiaddr} remoteAddr - remote multiaddr of the connection. * @property {PeerId} localPeer - local peer-id. * @property {PeerId} remotePeer - remote peer-id. * @property {(protocols: string|string[]) => Promise<{stream: MuxedStream, protocol: string}>} newStream - new stream muxer function. @@ -231,7 +231,7 @@ class Connection { module.exports = Connection /** - * @param {multiaddr|undefined} localAddr + * @param {Multiaddr|undefined} localAddr * @param {PeerId} localPeer * @param {PeerId} remotePeer * @param {(protocols: string | string[]) => Promise<{ stream: import("../stream-muxer/types").MuxedStream; protocol: string; }>} newStream @@ -240,7 +240,7 @@ module.exports = Connection * @param {{ direction: any; timeline: any; multiplexer?: string | undefined; encryption?: string | undefined; }} stat */ function validateArgs (localAddr, localPeer, remotePeer, newStream, close, getStreams, stat) { - if (localAddr && !multiaddr.isMultiaddr(localAddr)) { + if (localAddr && !Multiaddr.isMultiaddr(localAddr)) { throw errCode(new Error('localAddr must be an instance of multiaddr'), 'ERR_INVALID_PARAMETERS') } diff --git a/src/peer-discovery/tests/index.js b/src/peer-discovery/tests/index.js index 728389907..0dffc5f6e 100644 --- a/src/peer-discovery/tests/index.js +++ b/src/peer-discovery/tests/index.js @@ -6,7 +6,7 @@ const chai = require('chai') const expect = chai.expect chai.use(require('dirty-chai')) -const multiaddr = require('multiaddr') +const { Multiaddr } = require('multiaddr') const PeerId = require('peer-id') const delay = require('delay') @@ -55,7 +55,7 @@ module.exports = (common) => { expect(PeerId.isPeerId(id)).to.eql(true) expect(multiaddrs).to.exist() - multiaddrs.forEach((m) => expect(multiaddr.isMultiaddr(m)).to.eql(true)) + multiaddrs.forEach((m) => expect(Multiaddr.isMultiaddr(m)).to.eql(true)) defer.resolve() }) diff --git a/src/pubsub/index.js b/src/pubsub/index.js index f7338cae8..e74848469 100644 --- a/src/pubsub/index.js +++ b/src/pubsub/index.js @@ -11,7 +11,7 @@ const { pipe } = require('it-pipe') const MulticodecTopology = require('../topology/multicodec-topology') const { codes } = require('./errors') -const message = require('./message') +const { RPC } = require('./message/rpc') const PeerStreams = require('./peer-streams') const { SignaturePolicy } = require('./signature-policy') const utils = require('./utils') @@ -27,10 +27,10 @@ const { * @typedef {import('bl')} BufferList * @typedef {import('../stream-muxer/types').MuxedStream} MuxedStream * @typedef {import('../connection/connection')} Connection - * @typedef {import('./message/types').RPC} RPC - * @typedef {import('./message/types').SubOpts} RPCSubOpts - * @typedef {import('./message/types').Message} RPCMessage * @typedef {import('./signature-policy').SignaturePolicyType} SignaturePolicyType + * @typedef {import('./message/rpc').IRPC} IRPC + * @typedef {import('./message/rpc').RPC.SubOpts} RPCSubOpts + * @typedef {import('./message/rpc').RPC.Message} RPCMessage */ /** @@ -382,7 +382,7 @@ class PubsubBaseProtocol extends EventEmitter { if (subs.length) { // update peer subscriptions - subs.forEach((/** @type {RPCSubOpts} */ subOpt) => { + subs.forEach((subOpt) => { this._processRpcSubOpt(idB58Str, subOpt) }) this.emit('pubsub:subscription-change', peerStreams.id, subs) @@ -396,7 +396,7 @@ class PubsubBaseProtocol extends EventEmitter { if (msgs.length) { // @ts-ignore RPC message is modified msgs.forEach((message) => { - if (!(this.canRelayMessage || message.topicIDs.some((/** @type {string} */ topic) => this.subscriptions.has(topic)))) { + if (!(this.canRelayMessage || (message.topicIDs && message.topicIDs.some((topic) => this.subscriptions.has(topic))))) { this.log('received message we didn\'t subscribe to. Dropping.') return } @@ -411,11 +411,15 @@ class PubsubBaseProtocol extends EventEmitter { * Handles a subscription change from a peer * * @param {string} id - * @param {RPCSubOpts} subOpt + * @param {RPC.ISubOpts} subOpt */ _processRpcSubOpt (id, subOpt) { const t = subOpt.topicID + if (!t) { + return + } + let topicSet = this.topics.get(t) if (!topicSet) { topicSet = new Set() @@ -473,13 +477,14 @@ class PubsubBaseProtocol extends EventEmitter { * The default msgID implementation * Child class can override this. * - * @param {RPCMessage} msg - the message object + * @param {InMessage} msg - the message object * @returns {Uint8Array} message id as bytes */ getMsgId (msg) { const signaturePolicy = this.globalSignaturePolicy switch (signaturePolicy) { case SignaturePolicy.StrictSign: + // @ts-ignore seqno is optional in protobuf definition but it will exist return utils.msgId(msg.from, msg.seqno) case SignaturePolicy.StrictNoSign: return utils.noSignMsgId(msg.data) @@ -508,25 +513,25 @@ class PubsubBaseProtocol extends EventEmitter { * @returns {RPC} */ _decodeRpc (bytes) { - return message.rpc.RPC.decode(bytes) + return RPC.decode(bytes) } /** * Encode RPC object into a Uint8Array. * This can be override to use a custom router protobuf. * - * @param {RPC} rpc + * @param {IRPC} rpc * @returns {Uint8Array} */ _encodeRpc (rpc) { - return message.rpc.RPC.encode(rpc) + return RPC.encode(rpc).finish() } /** * Send an rpc object to a peer * * @param {string} id - peer id - * @param {RPC} rpc + * @param {IRPC} rpc * @returns {void} */ _sendRpc (id, rpc) { @@ -592,12 +597,12 @@ class PubsubBaseProtocol extends EventEmitter { default: throw errcode(new Error('Cannot validate message: unhandled signature policy: ' + signaturePolicy), codes.ERR_UNHANDLED_SIGNATURE_POLICY) } + for (const topic of message.topicIDs) { const validatorFn = this.topicValidators.get(topic) - if (!validatorFn) { - continue // eslint-disable-line + if (validatorFn) { + await validatorFn(topic, message) } - await validatorFn(topic, message) } } @@ -606,8 +611,8 @@ class PubsubBaseProtocol extends EventEmitter { * Should be used by the routers to create the message to send. * * @protected - * @param {RPCMessage} message - * @returns {Promise} + * @param {InMessage} message + * @returns {Promise} */ _buildMessage (message) { const signaturePolicy = this.globalSignaturePolicy @@ -617,7 +622,7 @@ class PubsubBaseProtocol extends EventEmitter { message.seqno = utils.randomSeqno() return signMessage(this.peerId, utils.normalizeOutRpcMessage(message)) case SignaturePolicy.StrictNoSign: - return message + return Promise.resolve(message) default: throw errcode(new Error('Cannot build message: unhandled signature policy: ' + signaturePolicy), codes.ERR_UNHANDLED_SIGNATURE_POLICY) } @@ -663,7 +668,7 @@ class PubsubBaseProtocol extends EventEmitter { this.log('publish', topic, message) const from = this.peerId.toB58String() - let msgObject = { + const msgObject = { receivedFrom: from, data: message, topicIDs: [topic] @@ -671,13 +676,14 @@ class PubsubBaseProtocol extends EventEmitter { // ensure that the message follows the signature policy const outMsg = await this._buildMessage(msgObject) - msgObject = utils.normalizeInRpcMessage(outMsg) + // @ts-ignore different type as from is converted + const msg = utils.normalizeInRpcMessage(outMsg) // Emit to self if I'm interested and emitSelf enabled - this.emitSelf && this._emitMessage(msgObject) + this.emitSelf && this._emitMessage(msg) // send to all the other peers - await this._publish(msgObject) + await this._publish(msg) } /** @@ -685,7 +691,7 @@ class PubsubBaseProtocol extends EventEmitter { * For example, a Floodsub implementation might simply publish each message to each topic for every peer * * @abstract - * @param {InMessage} message + * @param {InMessage|RPCMessage} message * @returns {Promise} * */ @@ -744,7 +750,6 @@ class PubsubBaseProtocol extends EventEmitter { } } -PubsubBaseProtocol.message = message PubsubBaseProtocol.utils = utils PubsubBaseProtocol.SignaturePolicy = SignaturePolicy diff --git a/src/pubsub/message/index.js b/src/pubsub/message/index.js deleted file mode 100644 index 2a30183b9..000000000 --- a/src/pubsub/message/index.js +++ /dev/null @@ -1,16 +0,0 @@ -'use strict' - -// @ts-ignore protons not typed -const protons = require('protons') - -const rpcProto = protons(require('./rpc.proto.js')) -const RPC = rpcProto.RPC -const topicDescriptorProto = protons(require('./topic-descriptor.proto.js')) - -module.exports = { - rpc: rpcProto, - td: topicDescriptorProto, - RPC, - Message: RPC.Message, - SubOpts: RPC.SubOpts -} diff --git a/src/pubsub/message/rpc.d.ts b/src/pubsub/message/rpc.d.ts new file mode 100644 index 000000000..5836c6695 --- /dev/null +++ b/src/pubsub/message/rpc.d.ts @@ -0,0 +1,222 @@ +import * as $protobuf from "protobufjs"; +/** Properties of a RPC. */ +export interface IRPC { + + /** RPC subscriptions */ + subscriptions?: (RPC.ISubOpts[]|null); + + /** RPC msgs */ + msgs?: (RPC.IMessage[]|null); +} + +/** Represents a RPC. */ +export class RPC implements IRPC { + + /** + * Constructs a new RPC. + * @param [p] Properties to set + */ + constructor(p?: IRPC); + + /** RPC subscriptions. */ + public subscriptions: RPC.ISubOpts[]; + + /** RPC msgs. */ + public msgs: RPC.IMessage[]; + + /** + * Encodes the specified RPC message. Does not implicitly {@link RPC.verify|verify} messages. + * @param m RPC message or plain object to encode + * @param [w] Writer to encode to + * @returns Writer + */ + public static encode(m: IRPC, w?: $protobuf.Writer): $protobuf.Writer; + + /** + * Decodes a RPC message from the specified reader or buffer. + * @param r Reader or buffer to decode from + * @param [l] Message length if known beforehand + * @returns RPC + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + public static decode(r: ($protobuf.Reader|Uint8Array), l?: number): RPC; + + /** + * Creates a RPC message from a plain object. Also converts values to their respective internal types. + * @param d Plain object + * @returns RPC + */ + public static fromObject(d: { [k: string]: any }): RPC; + + /** + * Creates a plain object from a RPC message. Also converts values to other types if specified. + * @param m RPC + * @param [o] Conversion options + * @returns Plain object + */ + public static toObject(m: RPC, o?: $protobuf.IConversionOptions): { [k: string]: any }; + + /** + * Converts this RPC to JSON. + * @returns JSON object + */ + public toJSON(): { [k: string]: any }; +} + +export namespace RPC { + + /** Properties of a SubOpts. */ + interface ISubOpts { + + /** SubOpts subscribe */ + subscribe?: (boolean|null); + + /** SubOpts topicID */ + topicID?: (string|null); + } + + /** Represents a SubOpts. */ + class SubOpts implements ISubOpts { + + /** + * Constructs a new SubOpts. + * @param [p] Properties to set + */ + constructor(p?: RPC.ISubOpts); + + /** SubOpts subscribe. */ + public subscribe: boolean; + + /** SubOpts topicID. */ + public topicID: string; + + /** + * Encodes the specified SubOpts message. Does not implicitly {@link RPC.SubOpts.verify|verify} messages. + * @param m SubOpts message or plain object to encode + * @param [w] Writer to encode to + * @returns Writer + */ + public static encode(m: RPC.ISubOpts, w?: $protobuf.Writer): $protobuf.Writer; + + /** + * Decodes a SubOpts message from the specified reader or buffer. + * @param r Reader or buffer to decode from + * @param [l] Message length if known beforehand + * @returns SubOpts + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + public static decode(r: ($protobuf.Reader|Uint8Array), l?: number): RPC.SubOpts; + + /** + * Creates a SubOpts message from a plain object. Also converts values to their respective internal types. + * @param d Plain object + * @returns SubOpts + */ + public static fromObject(d: { [k: string]: any }): RPC.SubOpts; + + /** + * Creates a plain object from a SubOpts message. Also converts values to other types if specified. + * @param m SubOpts + * @param [o] Conversion options + * @returns Plain object + */ + public static toObject(m: RPC.SubOpts, o?: $protobuf.IConversionOptions): { [k: string]: any }; + + /** + * Converts this SubOpts to JSON. + * @returns JSON object + */ + public toJSON(): { [k: string]: any }; + } + + /** Properties of a Message. */ + interface IMessage { + + /** Message from */ + from?: (Uint8Array|null); + + /** Message data */ + data?: (Uint8Array|null); + + /** Message seqno */ + seqno?: (Uint8Array|null); + + /** Message topicIDs */ + topicIDs?: (string[]|null); + + /** Message signature */ + signature?: (Uint8Array|null); + + /** Message key */ + key?: (Uint8Array|null); + } + + /** Represents a Message. */ + class Message implements IMessage { + + /** + * Constructs a new Message. + * @param [p] Properties to set + */ + constructor(p?: RPC.IMessage); + + /** Message from. */ + public from: Uint8Array; + + /** Message data. */ + public data: Uint8Array; + + /** Message seqno. */ + public seqno: Uint8Array; + + /** Message topicIDs. */ + public topicIDs: string[]; + + /** Message signature. */ + public signature: Uint8Array; + + /** Message key. */ + public key: Uint8Array; + + /** + * Encodes the specified Message message. Does not implicitly {@link RPC.Message.verify|verify} messages. + * @param m Message message or plain object to encode + * @param [w] Writer to encode to + * @returns Writer + */ + public static encode(m: RPC.IMessage, w?: $protobuf.Writer): $protobuf.Writer; + + /** + * Decodes a Message message from the specified reader or buffer. + * @param r Reader or buffer to decode from + * @param [l] Message length if known beforehand + * @returns Message + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + public static decode(r: ($protobuf.Reader|Uint8Array), l?: number): RPC.Message; + + /** + * Creates a Message message from a plain object. Also converts values to their respective internal types. + * @param d Plain object + * @returns Message + */ + public static fromObject(d: { [k: string]: any }): RPC.Message; + + /** + * Creates a plain object from a Message message. Also converts values to other types if specified. + * @param m Message + * @param [o] Conversion options + * @returns Plain object + */ + public static toObject(m: RPC.Message, o?: $protobuf.IConversionOptions): { [k: string]: any }; + + /** + * Converts this Message to JSON. + * @returns JSON object + */ + public toJSON(): { [k: string]: any }; + } +} diff --git a/src/pubsub/message/rpc.js b/src/pubsub/message/rpc.js new file mode 100644 index 000000000..d6bed76c9 --- /dev/null +++ b/src/pubsub/message/rpc.js @@ -0,0 +1,645 @@ +/*eslint-disable*/ +"use strict"; + +var $protobuf = require("protobufjs/minimal"); + +// Common aliases +var $Reader = $protobuf.Reader, $Writer = $protobuf.Writer, $util = $protobuf.util; + +// Exported root namespace +var $root = $protobuf.roots["default"] || ($protobuf.roots["default"] = {}); + +$root.RPC = (function() { + + /** + * Properties of a RPC. + * @exports IRPC + * @interface IRPC + * @property {Array.|null} [subscriptions] RPC subscriptions + * @property {Array.|null} [msgs] RPC msgs + */ + + /** + * Constructs a new RPC. + * @exports RPC + * @classdesc Represents a RPC. + * @implements IRPC + * @constructor + * @param {IRPC=} [p] Properties to set + */ + function RPC(p) { + this.subscriptions = []; + this.msgs = []; + if (p) + for (var ks = Object.keys(p), i = 0; i < ks.length; ++i) + if (p[ks[i]] != null) + this[ks[i]] = p[ks[i]]; + } + + /** + * RPC subscriptions. + * @member {Array.} subscriptions + * @memberof RPC + * @instance + */ + RPC.prototype.subscriptions = $util.emptyArray; + + /** + * RPC msgs. + * @member {Array.} msgs + * @memberof RPC + * @instance + */ + RPC.prototype.msgs = $util.emptyArray; + + /** + * Encodes the specified RPC message. Does not implicitly {@link RPC.verify|verify} messages. + * @function encode + * @memberof RPC + * @static + * @param {IRPC} m RPC message or plain object to encode + * @param {$protobuf.Writer} [w] Writer to encode to + * @returns {$protobuf.Writer} Writer + */ + RPC.encode = function encode(m, w) { + if (!w) + w = $Writer.create(); + if (m.subscriptions != null && m.subscriptions.length) { + for (var i = 0; i < m.subscriptions.length; ++i) + $root.RPC.SubOpts.encode(m.subscriptions[i], w.uint32(10).fork()).ldelim(); + } + if (m.msgs != null && m.msgs.length) { + for (var i = 0; i < m.msgs.length; ++i) + $root.RPC.Message.encode(m.msgs[i], w.uint32(18).fork()).ldelim(); + } + return w; + }; + + /** + * Decodes a RPC message from the specified reader or buffer. + * @function decode + * @memberof RPC + * @static + * @param {$protobuf.Reader|Uint8Array} r Reader or buffer to decode from + * @param {number} [l] Message length if known beforehand + * @returns {RPC} RPC + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + RPC.decode = function decode(r, l) { + if (!(r instanceof $Reader)) + r = $Reader.create(r); + var c = l === undefined ? r.len : r.pos + l, m = new $root.RPC(); + while (r.pos < c) { + var t = r.uint32(); + switch (t >>> 3) { + case 1: + if (!(m.subscriptions && m.subscriptions.length)) + m.subscriptions = []; + m.subscriptions.push($root.RPC.SubOpts.decode(r, r.uint32())); + break; + case 2: + if (!(m.msgs && m.msgs.length)) + m.msgs = []; + m.msgs.push($root.RPC.Message.decode(r, r.uint32())); + break; + default: + r.skipType(t & 7); + break; + } + } + return m; + }; + + /** + * Creates a RPC message from a plain object. Also converts values to their respective internal types. + * @function fromObject + * @memberof RPC + * @static + * @param {Object.} d Plain object + * @returns {RPC} RPC + */ + RPC.fromObject = function fromObject(d) { + if (d instanceof $root.RPC) + return d; + var m = new $root.RPC(); + if (d.subscriptions) { + if (!Array.isArray(d.subscriptions)) + throw TypeError(".RPC.subscriptions: array expected"); + m.subscriptions = []; + for (var i = 0; i < d.subscriptions.length; ++i) { + if (typeof d.subscriptions[i] !== "object") + throw TypeError(".RPC.subscriptions: object expected"); + m.subscriptions[i] = $root.RPC.SubOpts.fromObject(d.subscriptions[i]); + } + } + if (d.msgs) { + if (!Array.isArray(d.msgs)) + throw TypeError(".RPC.msgs: array expected"); + m.msgs = []; + for (var i = 0; i < d.msgs.length; ++i) { + if (typeof d.msgs[i] !== "object") + throw TypeError(".RPC.msgs: object expected"); + m.msgs[i] = $root.RPC.Message.fromObject(d.msgs[i]); + } + } + return m; + }; + + /** + * Creates a plain object from a RPC message. Also converts values to other types if specified. + * @function toObject + * @memberof RPC + * @static + * @param {RPC} m RPC + * @param {$protobuf.IConversionOptions} [o] Conversion options + * @returns {Object.} Plain object + */ + RPC.toObject = function toObject(m, o) { + if (!o) + o = {}; + var d = {}; + if (o.arrays || o.defaults) { + d.subscriptions = []; + d.msgs = []; + } + if (m.subscriptions && m.subscriptions.length) { + d.subscriptions = []; + for (var j = 0; j < m.subscriptions.length; ++j) { + d.subscriptions[j] = $root.RPC.SubOpts.toObject(m.subscriptions[j], o); + } + } + if (m.msgs && m.msgs.length) { + d.msgs = []; + for (var j = 0; j < m.msgs.length; ++j) { + d.msgs[j] = $root.RPC.Message.toObject(m.msgs[j], o); + } + } + return d; + }; + + /** + * Converts this RPC to JSON. + * @function toJSON + * @memberof RPC + * @instance + * @returns {Object.} JSON object + */ + RPC.prototype.toJSON = function toJSON() { + return this.constructor.toObject(this, $protobuf.util.toJSONOptions); + }; + + RPC.SubOpts = (function() { + + /** + * Properties of a SubOpts. + * @memberof RPC + * @interface ISubOpts + * @property {boolean|null} [subscribe] SubOpts subscribe + * @property {string|null} [topicID] SubOpts topicID + */ + + /** + * Constructs a new SubOpts. + * @memberof RPC + * @classdesc Represents a SubOpts. + * @implements ISubOpts + * @constructor + * @param {RPC.ISubOpts=} [p] Properties to set + */ + function SubOpts(p) { + if (p) + for (var ks = Object.keys(p), i = 0; i < ks.length; ++i) + if (p[ks[i]] != null) + this[ks[i]] = p[ks[i]]; + } + + /** + * SubOpts subscribe. + * @member {boolean} subscribe + * @memberof RPC.SubOpts + * @instance + */ + SubOpts.prototype.subscribe = false; + + /** + * SubOpts topicID. + * @member {string} topicID + * @memberof RPC.SubOpts + * @instance + */ + SubOpts.prototype.topicID = ""; + + /** + * Encodes the specified SubOpts message. Does not implicitly {@link RPC.SubOpts.verify|verify} messages. + * @function encode + * @memberof RPC.SubOpts + * @static + * @param {RPC.ISubOpts} m SubOpts message or plain object to encode + * @param {$protobuf.Writer} [w] Writer to encode to + * @returns {$protobuf.Writer} Writer + */ + SubOpts.encode = function encode(m, w) { + if (!w) + w = $Writer.create(); + if (m.subscribe != null && Object.hasOwnProperty.call(m, "subscribe")) + w.uint32(8).bool(m.subscribe); + if (m.topicID != null && Object.hasOwnProperty.call(m, "topicID")) + w.uint32(18).string(m.topicID); + return w; + }; + + /** + * Decodes a SubOpts message from the specified reader or buffer. + * @function decode + * @memberof RPC.SubOpts + * @static + * @param {$protobuf.Reader|Uint8Array} r Reader or buffer to decode from + * @param {number} [l] Message length if known beforehand + * @returns {RPC.SubOpts} SubOpts + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + SubOpts.decode = function decode(r, l) { + if (!(r instanceof $Reader)) + r = $Reader.create(r); + var c = l === undefined ? r.len : r.pos + l, m = new $root.RPC.SubOpts(); + while (r.pos < c) { + var t = r.uint32(); + switch (t >>> 3) { + case 1: + m.subscribe = r.bool(); + break; + case 2: + m.topicID = r.string(); + break; + default: + r.skipType(t & 7); + break; + } + } + return m; + }; + + /** + * Creates a SubOpts message from a plain object. Also converts values to their respective internal types. + * @function fromObject + * @memberof RPC.SubOpts + * @static + * @param {Object.} d Plain object + * @returns {RPC.SubOpts} SubOpts + */ + SubOpts.fromObject = function fromObject(d) { + if (d instanceof $root.RPC.SubOpts) + return d; + var m = new $root.RPC.SubOpts(); + if (d.subscribe != null) { + m.subscribe = Boolean(d.subscribe); + } + if (d.topicID != null) { + m.topicID = String(d.topicID); + } + return m; + }; + + /** + * Creates a plain object from a SubOpts message. Also converts values to other types if specified. + * @function toObject + * @memberof RPC.SubOpts + * @static + * @param {RPC.SubOpts} m SubOpts + * @param {$protobuf.IConversionOptions} [o] Conversion options + * @returns {Object.} Plain object + */ + SubOpts.toObject = function toObject(m, o) { + if (!o) + o = {}; + var d = {}; + if (o.defaults) { + d.subscribe = false; + d.topicID = ""; + } + if (m.subscribe != null && m.hasOwnProperty("subscribe")) { + d.subscribe = m.subscribe; + } + if (m.topicID != null && m.hasOwnProperty("topicID")) { + d.topicID = m.topicID; + } + return d; + }; + + /** + * Converts this SubOpts to JSON. + * @function toJSON + * @memberof RPC.SubOpts + * @instance + * @returns {Object.} JSON object + */ + SubOpts.prototype.toJSON = function toJSON() { + return this.constructor.toObject(this, $protobuf.util.toJSONOptions); + }; + + return SubOpts; + })(); + + RPC.Message = (function() { + + /** + * Properties of a Message. + * @memberof RPC + * @interface IMessage + * @property {Uint8Array|null} [from] Message from + * @property {Uint8Array|null} [data] Message data + * @property {Uint8Array|null} [seqno] Message seqno + * @property {Array.|null} [topicIDs] Message topicIDs + * @property {Uint8Array|null} [signature] Message signature + * @property {Uint8Array|null} [key] Message key + */ + + /** + * Constructs a new Message. + * @memberof RPC + * @classdesc Represents a Message. + * @implements IMessage + * @constructor + * @param {RPC.IMessage=} [p] Properties to set + */ + function Message(p) { + this.topicIDs = []; + if (p) + for (var ks = Object.keys(p), i = 0; i < ks.length; ++i) + if (p[ks[i]] != null) + this[ks[i]] = p[ks[i]]; + } + + /** + * Message from. + * @member {Uint8Array} from + * @memberof RPC.Message + * @instance + */ + Message.prototype.from = $util.newBuffer([]); + + /** + * Message data. + * @member {Uint8Array} data + * @memberof RPC.Message + * @instance + */ + Message.prototype.data = $util.newBuffer([]); + + /** + * Message seqno. + * @member {Uint8Array} seqno + * @memberof RPC.Message + * @instance + */ + Message.prototype.seqno = $util.newBuffer([]); + + /** + * Message topicIDs. + * @member {Array.} topicIDs + * @memberof RPC.Message + * @instance + */ + Message.prototype.topicIDs = $util.emptyArray; + + /** + * Message signature. + * @member {Uint8Array} signature + * @memberof RPC.Message + * @instance + */ + Message.prototype.signature = $util.newBuffer([]); + + /** + * Message key. + * @member {Uint8Array} key + * @memberof RPC.Message + * @instance + */ + Message.prototype.key = $util.newBuffer([]); + + /** + * Encodes the specified Message message. Does not implicitly {@link RPC.Message.verify|verify} messages. + * @function encode + * @memberof RPC.Message + * @static + * @param {RPC.IMessage} m Message message or plain object to encode + * @param {$protobuf.Writer} [w] Writer to encode to + * @returns {$protobuf.Writer} Writer + */ + Message.encode = function encode(m, w) { + if (!w) + w = $Writer.create(); + if (m.from != null && Object.hasOwnProperty.call(m, "from")) + w.uint32(10).bytes(m.from); + if (m.data != null && Object.hasOwnProperty.call(m, "data")) + w.uint32(18).bytes(m.data); + if (m.seqno != null && Object.hasOwnProperty.call(m, "seqno")) + w.uint32(26).bytes(m.seqno); + if (m.topicIDs != null && m.topicIDs.length) { + for (var i = 0; i < m.topicIDs.length; ++i) + w.uint32(34).string(m.topicIDs[i]); + } + if (m.signature != null && Object.hasOwnProperty.call(m, "signature")) + w.uint32(42).bytes(m.signature); + if (m.key != null && Object.hasOwnProperty.call(m, "key")) + w.uint32(50).bytes(m.key); + return w; + }; + + /** + * Decodes a Message message from the specified reader or buffer. + * @function decode + * @memberof RPC.Message + * @static + * @param {$protobuf.Reader|Uint8Array} r Reader or buffer to decode from + * @param {number} [l] Message length if known beforehand + * @returns {RPC.Message} Message + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + Message.decode = function decode(r, l) { + if (!(r instanceof $Reader)) + r = $Reader.create(r); + var c = l === undefined ? r.len : r.pos + l, m = new $root.RPC.Message(); + while (r.pos < c) { + var t = r.uint32(); + switch (t >>> 3) { + case 1: + m.from = r.bytes(); + break; + case 2: + m.data = r.bytes(); + break; + case 3: + m.seqno = r.bytes(); + break; + case 4: + if (!(m.topicIDs && m.topicIDs.length)) + m.topicIDs = []; + m.topicIDs.push(r.string()); + break; + case 5: + m.signature = r.bytes(); + break; + case 6: + m.key = r.bytes(); + break; + default: + r.skipType(t & 7); + break; + } + } + return m; + }; + + /** + * Creates a Message message from a plain object. Also converts values to their respective internal types. + * @function fromObject + * @memberof RPC.Message + * @static + * @param {Object.} d Plain object + * @returns {RPC.Message} Message + */ + Message.fromObject = function fromObject(d) { + if (d instanceof $root.RPC.Message) + return d; + var m = new $root.RPC.Message(); + if (d.from != null) { + if (typeof d.from === "string") + $util.base64.decode(d.from, m.from = $util.newBuffer($util.base64.length(d.from)), 0); + else if (d.from.length) + m.from = d.from; + } + if (d.data != null) { + if (typeof d.data === "string") + $util.base64.decode(d.data, m.data = $util.newBuffer($util.base64.length(d.data)), 0); + else if (d.data.length) + m.data = d.data; + } + if (d.seqno != null) { + if (typeof d.seqno === "string") + $util.base64.decode(d.seqno, m.seqno = $util.newBuffer($util.base64.length(d.seqno)), 0); + else if (d.seqno.length) + m.seqno = d.seqno; + } + if (d.topicIDs) { + if (!Array.isArray(d.topicIDs)) + throw TypeError(".RPC.Message.topicIDs: array expected"); + m.topicIDs = []; + for (var i = 0; i < d.topicIDs.length; ++i) { + m.topicIDs[i] = String(d.topicIDs[i]); + } + } + if (d.signature != null) { + if (typeof d.signature === "string") + $util.base64.decode(d.signature, m.signature = $util.newBuffer($util.base64.length(d.signature)), 0); + else if (d.signature.length) + m.signature = d.signature; + } + if (d.key != null) { + if (typeof d.key === "string") + $util.base64.decode(d.key, m.key = $util.newBuffer($util.base64.length(d.key)), 0); + else if (d.key.length) + m.key = d.key; + } + return m; + }; + + /** + * Creates a plain object from a Message message. Also converts values to other types if specified. + * @function toObject + * @memberof RPC.Message + * @static + * @param {RPC.Message} m Message + * @param {$protobuf.IConversionOptions} [o] Conversion options + * @returns {Object.} Plain object + */ + Message.toObject = function toObject(m, o) { + if (!o) + o = {}; + var d = {}; + if (o.arrays || o.defaults) { + d.topicIDs = []; + } + if (o.defaults) { + if (o.bytes === String) + d.from = ""; + else { + d.from = []; + if (o.bytes !== Array) + d.from = $util.newBuffer(d.from); + } + if (o.bytes === String) + d.data = ""; + else { + d.data = []; + if (o.bytes !== Array) + d.data = $util.newBuffer(d.data); + } + if (o.bytes === String) + d.seqno = ""; + else { + d.seqno = []; + if (o.bytes !== Array) + d.seqno = $util.newBuffer(d.seqno); + } + if (o.bytes === String) + d.signature = ""; + else { + d.signature = []; + if (o.bytes !== Array) + d.signature = $util.newBuffer(d.signature); + } + if (o.bytes === String) + d.key = ""; + else { + d.key = []; + if (o.bytes !== Array) + d.key = $util.newBuffer(d.key); + } + } + if (m.from != null && m.hasOwnProperty("from")) { + d.from = o.bytes === String ? $util.base64.encode(m.from, 0, m.from.length) : o.bytes === Array ? Array.prototype.slice.call(m.from) : m.from; + } + if (m.data != null && m.hasOwnProperty("data")) { + d.data = o.bytes === String ? $util.base64.encode(m.data, 0, m.data.length) : o.bytes === Array ? Array.prototype.slice.call(m.data) : m.data; + } + if (m.seqno != null && m.hasOwnProperty("seqno")) { + d.seqno = o.bytes === String ? $util.base64.encode(m.seqno, 0, m.seqno.length) : o.bytes === Array ? Array.prototype.slice.call(m.seqno) : m.seqno; + } + if (m.topicIDs && m.topicIDs.length) { + d.topicIDs = []; + for (var j = 0; j < m.topicIDs.length; ++j) { + d.topicIDs[j] = m.topicIDs[j]; + } + } + if (m.signature != null && m.hasOwnProperty("signature")) { + d.signature = o.bytes === String ? $util.base64.encode(m.signature, 0, m.signature.length) : o.bytes === Array ? Array.prototype.slice.call(m.signature) : m.signature; + } + if (m.key != null && m.hasOwnProperty("key")) { + d.key = o.bytes === String ? $util.base64.encode(m.key, 0, m.key.length) : o.bytes === Array ? Array.prototype.slice.call(m.key) : m.key; + } + return d; + }; + + /** + * Converts this Message to JSON. + * @function toJSON + * @memberof RPC.Message + * @instance + * @returns {Object.} JSON object + */ + Message.prototype.toJSON = function toJSON() { + return this.constructor.toObject(this, $protobuf.util.toJSONOptions); + }; + + return Message; + })(); + + return RPC; +})(); + +module.exports = $root; diff --git a/src/pubsub/message/rpc.proto.js b/src/pubsub/message/rpc.proto similarity index 92% rename from src/pubsub/message/rpc.proto.js rename to src/pubsub/message/rpc.proto index 88b1f8342..96666df3d 100644 --- a/src/pubsub/message/rpc.proto.js +++ b/src/pubsub/message/rpc.proto @@ -1,5 +1,5 @@ -'use strict' -module.exports = ` +syntax = "proto3"; + message RPC { repeated SubOpts subscriptions = 1; repeated Message msgs = 2; @@ -17,4 +17,4 @@ message RPC { optional bytes signature = 5; optional bytes key = 6; } -}` +} \ No newline at end of file diff --git a/src/pubsub/message/sign.js b/src/pubsub/message/sign.js index 4820b117c..0b4b8f2df 100644 --- a/src/pubsub/message/sign.js +++ b/src/pubsub/message/sign.js @@ -1,23 +1,27 @@ 'use strict' const PeerId = require('peer-id') -const { Message } = require('./index') +const { RPC } = require('./rpc') const uint8ArrayConcat = require('uint8arrays/concat') const uint8ArrayFromString = require('uint8arrays/from-string') const SignPrefix = uint8ArrayFromString('libp2p-pubsub:') +/** + * @typedef {import('..').InMessage} + */ + /** * Signs the provided message with the given `peerId` * * @param {PeerId} peerId - * @param {Message} message - * @returns {Promise} + * @param {RPC.Message} message + * @returns {Promise} */ async function signMessage (peerId, message) { // Get the message in bytes, and prepend with the pubsub prefix const bytes = uint8ArrayConcat([ SignPrefix, - Message.encode(message) + RPC.Message.encode(message).finish() ]) const signature = await peerId.privKey.sign(bytes) @@ -43,12 +47,13 @@ async function verifySignature (message) { // Get message sans the signature const bytes = uint8ArrayConcat([ SignPrefix, - Message.encode({ + RPC.Message.encode({ ...message, - from: message.from && PeerId.createFromCID(message.from).toBytes(), + // @ts-ignore message.from needs to exist + from: PeerId.createFromCID(message.from).toBytes(), signature: undefined, key: undefined - }) + }).finish() ]) // Get the public key diff --git a/src/pubsub/message/topic-descriptor.d.ts b/src/pubsub/message/topic-descriptor.d.ts new file mode 100644 index 000000000..179deba27 --- /dev/null +++ b/src/pubsub/message/topic-descriptor.d.ts @@ -0,0 +1,224 @@ +import * as $protobuf from "protobufjs"; +/** Properties of a TopicDescriptor. */ +export interface ITopicDescriptor { + + /** TopicDescriptor name */ + name?: (string|null); + + /** TopicDescriptor auth */ + auth?: (TopicDescriptor.IAuthOpts|null); + + /** TopicDescriptor enc */ + enc?: (TopicDescriptor.IEncOpts|null); +} + +/** Represents a TopicDescriptor. */ +export class TopicDescriptor implements ITopicDescriptor { + + /** + * Constructs a new TopicDescriptor. + * @param [p] Properties to set + */ + constructor(p?: ITopicDescriptor); + + /** TopicDescriptor name. */ + public name: string; + + /** TopicDescriptor auth. */ + public auth?: (TopicDescriptor.IAuthOpts|null); + + /** TopicDescriptor enc. */ + public enc?: (TopicDescriptor.IEncOpts|null); + + /** + * Encodes the specified TopicDescriptor message. Does not implicitly {@link TopicDescriptor.verify|verify} messages. + * @param m TopicDescriptor message or plain object to encode + * @param [w] Writer to encode to + * @returns Writer + */ + public static encode(m: ITopicDescriptor, w?: $protobuf.Writer): $protobuf.Writer; + + /** + * Decodes a TopicDescriptor message from the specified reader or buffer. + * @param r Reader or buffer to decode from + * @param [l] Message length if known beforehand + * @returns TopicDescriptor + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + public static decode(r: ($protobuf.Reader|Uint8Array), l?: number): TopicDescriptor; + + /** + * Creates a TopicDescriptor message from a plain object. Also converts values to their respective internal types. + * @param d Plain object + * @returns TopicDescriptor + */ + public static fromObject(d: { [k: string]: any }): TopicDescriptor; + + /** + * Creates a plain object from a TopicDescriptor message. Also converts values to other types if specified. + * @param m TopicDescriptor + * @param [o] Conversion options + * @returns Plain object + */ + public static toObject(m: TopicDescriptor, o?: $protobuf.IConversionOptions): { [k: string]: any }; + + /** + * Converts this TopicDescriptor to JSON. + * @returns JSON object + */ + public toJSON(): { [k: string]: any }; +} + +export namespace TopicDescriptor { + + /** Properties of an AuthOpts. */ + interface IAuthOpts { + + /** AuthOpts mode */ + mode?: (TopicDescriptor.AuthOpts.AuthMode|null); + + /** AuthOpts keys */ + keys?: (Uint8Array[]|null); + } + + /** Represents an AuthOpts. */ + class AuthOpts implements IAuthOpts { + + /** + * Constructs a new AuthOpts. + * @param [p] Properties to set + */ + constructor(p?: TopicDescriptor.IAuthOpts); + + /** AuthOpts mode. */ + public mode: TopicDescriptor.AuthOpts.AuthMode; + + /** AuthOpts keys. */ + public keys: Uint8Array[]; + + /** + * Encodes the specified AuthOpts message. Does not implicitly {@link TopicDescriptor.AuthOpts.verify|verify} messages. + * @param m AuthOpts message or plain object to encode + * @param [w] Writer to encode to + * @returns Writer + */ + public static encode(m: TopicDescriptor.IAuthOpts, w?: $protobuf.Writer): $protobuf.Writer; + + /** + * Decodes an AuthOpts message from the specified reader or buffer. + * @param r Reader or buffer to decode from + * @param [l] Message length if known beforehand + * @returns AuthOpts + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + public static decode(r: ($protobuf.Reader|Uint8Array), l?: number): TopicDescriptor.AuthOpts; + + /** + * Creates an AuthOpts message from a plain object. Also converts values to their respective internal types. + * @param d Plain object + * @returns AuthOpts + */ + public static fromObject(d: { [k: string]: any }): TopicDescriptor.AuthOpts; + + /** + * Creates a plain object from an AuthOpts message. Also converts values to other types if specified. + * @param m AuthOpts + * @param [o] Conversion options + * @returns Plain object + */ + public static toObject(m: TopicDescriptor.AuthOpts, o?: $protobuf.IConversionOptions): { [k: string]: any }; + + /** + * Converts this AuthOpts to JSON. + * @returns JSON object + */ + public toJSON(): { [k: string]: any }; + } + + namespace AuthOpts { + + /** AuthMode enum. */ + enum AuthMode { + NONE = 0, + KEY = 1, + WOT = 2 + } + } + + /** Properties of an EncOpts. */ + interface IEncOpts { + + /** EncOpts mode */ + mode?: (TopicDescriptor.EncOpts.EncMode|null); + + /** EncOpts keyHashes */ + keyHashes?: (Uint8Array[]|null); + } + + /** Represents an EncOpts. */ + class EncOpts implements IEncOpts { + + /** + * Constructs a new EncOpts. + * @param [p] Properties to set + */ + constructor(p?: TopicDescriptor.IEncOpts); + + /** EncOpts mode. */ + public mode: TopicDescriptor.EncOpts.EncMode; + + /** EncOpts keyHashes. */ + public keyHashes: Uint8Array[]; + + /** + * Encodes the specified EncOpts message. Does not implicitly {@link TopicDescriptor.EncOpts.verify|verify} messages. + * @param m EncOpts message or plain object to encode + * @param [w] Writer to encode to + * @returns Writer + */ + public static encode(m: TopicDescriptor.IEncOpts, w?: $protobuf.Writer): $protobuf.Writer; + + /** + * Decodes an EncOpts message from the specified reader or buffer. + * @param r Reader or buffer to decode from + * @param [l] Message length if known beforehand + * @returns EncOpts + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + public static decode(r: ($protobuf.Reader|Uint8Array), l?: number): TopicDescriptor.EncOpts; + + /** + * Creates an EncOpts message from a plain object. Also converts values to their respective internal types. + * @param d Plain object + * @returns EncOpts + */ + public static fromObject(d: { [k: string]: any }): TopicDescriptor.EncOpts; + + /** + * Creates a plain object from an EncOpts message. Also converts values to other types if specified. + * @param m EncOpts + * @param [o] Conversion options + * @returns Plain object + */ + public static toObject(m: TopicDescriptor.EncOpts, o?: $protobuf.IConversionOptions): { [k: string]: any }; + + /** + * Converts this EncOpts to JSON. + * @returns JSON object + */ + public toJSON(): { [k: string]: any }; + } + + namespace EncOpts { + + /** EncMode enum. */ + enum EncMode { + NONE = 0, + SHAREDKEY = 1, + WOT = 2 + } + } +} diff --git a/src/pubsub/message/topic-descriptor.js b/src/pubsub/message/topic-descriptor.js new file mode 100644 index 000000000..0682d7516 --- /dev/null +++ b/src/pubsub/message/topic-descriptor.js @@ -0,0 +1,586 @@ +/*eslint-disable*/ +"use strict"; + +var $protobuf = require("protobufjs/minimal"); + +// Common aliases +var $Reader = $protobuf.Reader, $Writer = $protobuf.Writer, $util = $protobuf.util; + +// Exported root namespace +var $root = $protobuf.roots["default"] || ($protobuf.roots["default"] = {}); + +$root.TopicDescriptor = (function() { + + /** + * Properties of a TopicDescriptor. + * @exports ITopicDescriptor + * @interface ITopicDescriptor + * @property {string|null} [name] TopicDescriptor name + * @property {TopicDescriptor.IAuthOpts|null} [auth] TopicDescriptor auth + * @property {TopicDescriptor.IEncOpts|null} [enc] TopicDescriptor enc + */ + + /** + * Constructs a new TopicDescriptor. + * @exports TopicDescriptor + * @classdesc Represents a TopicDescriptor. + * @implements ITopicDescriptor + * @constructor + * @param {ITopicDescriptor=} [p] Properties to set + */ + function TopicDescriptor(p) { + if (p) + for (var ks = Object.keys(p), i = 0; i < ks.length; ++i) + if (p[ks[i]] != null) + this[ks[i]] = p[ks[i]]; + } + + /** + * TopicDescriptor name. + * @member {string} name + * @memberof TopicDescriptor + * @instance + */ + TopicDescriptor.prototype.name = ""; + + /** + * TopicDescriptor auth. + * @member {TopicDescriptor.IAuthOpts|null|undefined} auth + * @memberof TopicDescriptor + * @instance + */ + TopicDescriptor.prototype.auth = null; + + /** + * TopicDescriptor enc. + * @member {TopicDescriptor.IEncOpts|null|undefined} enc + * @memberof TopicDescriptor + * @instance + */ + TopicDescriptor.prototype.enc = null; + + /** + * Encodes the specified TopicDescriptor message. Does not implicitly {@link TopicDescriptor.verify|verify} messages. + * @function encode + * @memberof TopicDescriptor + * @static + * @param {ITopicDescriptor} m TopicDescriptor message or plain object to encode + * @param {$protobuf.Writer} [w] Writer to encode to + * @returns {$protobuf.Writer} Writer + */ + TopicDescriptor.encode = function encode(m, w) { + if (!w) + w = $Writer.create(); + if (m.name != null && Object.hasOwnProperty.call(m, "name")) + w.uint32(10).string(m.name); + if (m.auth != null && Object.hasOwnProperty.call(m, "auth")) + $root.TopicDescriptor.AuthOpts.encode(m.auth, w.uint32(18).fork()).ldelim(); + if (m.enc != null && Object.hasOwnProperty.call(m, "enc")) + $root.TopicDescriptor.EncOpts.encode(m.enc, w.uint32(26).fork()).ldelim(); + return w; + }; + + /** + * Decodes a TopicDescriptor message from the specified reader or buffer. + * @function decode + * @memberof TopicDescriptor + * @static + * @param {$protobuf.Reader|Uint8Array} r Reader or buffer to decode from + * @param {number} [l] Message length if known beforehand + * @returns {TopicDescriptor} TopicDescriptor + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + TopicDescriptor.decode = function decode(r, l) { + if (!(r instanceof $Reader)) + r = $Reader.create(r); + var c = l === undefined ? r.len : r.pos + l, m = new $root.TopicDescriptor(); + while (r.pos < c) { + var t = r.uint32(); + switch (t >>> 3) { + case 1: + m.name = r.string(); + break; + case 2: + m.auth = $root.TopicDescriptor.AuthOpts.decode(r, r.uint32()); + break; + case 3: + m.enc = $root.TopicDescriptor.EncOpts.decode(r, r.uint32()); + break; + default: + r.skipType(t & 7); + break; + } + } + return m; + }; + + /** + * Creates a TopicDescriptor message from a plain object. Also converts values to their respective internal types. + * @function fromObject + * @memberof TopicDescriptor + * @static + * @param {Object.} d Plain object + * @returns {TopicDescriptor} TopicDescriptor + */ + TopicDescriptor.fromObject = function fromObject(d) { + if (d instanceof $root.TopicDescriptor) + return d; + var m = new $root.TopicDescriptor(); + if (d.name != null) { + m.name = String(d.name); + } + if (d.auth != null) { + if (typeof d.auth !== "object") + throw TypeError(".TopicDescriptor.auth: object expected"); + m.auth = $root.TopicDescriptor.AuthOpts.fromObject(d.auth); + } + if (d.enc != null) { + if (typeof d.enc !== "object") + throw TypeError(".TopicDescriptor.enc: object expected"); + m.enc = $root.TopicDescriptor.EncOpts.fromObject(d.enc); + } + return m; + }; + + /** + * Creates a plain object from a TopicDescriptor message. Also converts values to other types if specified. + * @function toObject + * @memberof TopicDescriptor + * @static + * @param {TopicDescriptor} m TopicDescriptor + * @param {$protobuf.IConversionOptions} [o] Conversion options + * @returns {Object.} Plain object + */ + TopicDescriptor.toObject = function toObject(m, o) { + if (!o) + o = {}; + var d = {}; + if (o.defaults) { + d.name = ""; + d.auth = null; + d.enc = null; + } + if (m.name != null && m.hasOwnProperty("name")) { + d.name = m.name; + } + if (m.auth != null && m.hasOwnProperty("auth")) { + d.auth = $root.TopicDescriptor.AuthOpts.toObject(m.auth, o); + } + if (m.enc != null && m.hasOwnProperty("enc")) { + d.enc = $root.TopicDescriptor.EncOpts.toObject(m.enc, o); + } + return d; + }; + + /** + * Converts this TopicDescriptor to JSON. + * @function toJSON + * @memberof TopicDescriptor + * @instance + * @returns {Object.} JSON object + */ + TopicDescriptor.prototype.toJSON = function toJSON() { + return this.constructor.toObject(this, $protobuf.util.toJSONOptions); + }; + + TopicDescriptor.AuthOpts = (function() { + + /** + * Properties of an AuthOpts. + * @memberof TopicDescriptor + * @interface IAuthOpts + * @property {TopicDescriptor.AuthOpts.AuthMode|null} [mode] AuthOpts mode + * @property {Array.|null} [keys] AuthOpts keys + */ + + /** + * Constructs a new AuthOpts. + * @memberof TopicDescriptor + * @classdesc Represents an AuthOpts. + * @implements IAuthOpts + * @constructor + * @param {TopicDescriptor.IAuthOpts=} [p] Properties to set + */ + function AuthOpts(p) { + this.keys = []; + if (p) + for (var ks = Object.keys(p), i = 0; i < ks.length; ++i) + if (p[ks[i]] != null) + this[ks[i]] = p[ks[i]]; + } + + /** + * AuthOpts mode. + * @member {TopicDescriptor.AuthOpts.AuthMode} mode + * @memberof TopicDescriptor.AuthOpts + * @instance + */ + AuthOpts.prototype.mode = 0; + + /** + * AuthOpts keys. + * @member {Array.} keys + * @memberof TopicDescriptor.AuthOpts + * @instance + */ + AuthOpts.prototype.keys = $util.emptyArray; + + /** + * Encodes the specified AuthOpts message. Does not implicitly {@link TopicDescriptor.AuthOpts.verify|verify} messages. + * @function encode + * @memberof TopicDescriptor.AuthOpts + * @static + * @param {TopicDescriptor.IAuthOpts} m AuthOpts message or plain object to encode + * @param {$protobuf.Writer} [w] Writer to encode to + * @returns {$protobuf.Writer} Writer + */ + AuthOpts.encode = function encode(m, w) { + if (!w) + w = $Writer.create(); + if (m.mode != null && Object.hasOwnProperty.call(m, "mode")) + w.uint32(8).int32(m.mode); + if (m.keys != null && m.keys.length) { + for (var i = 0; i < m.keys.length; ++i) + w.uint32(18).bytes(m.keys[i]); + } + return w; + }; + + /** + * Decodes an AuthOpts message from the specified reader or buffer. + * @function decode + * @memberof TopicDescriptor.AuthOpts + * @static + * @param {$protobuf.Reader|Uint8Array} r Reader or buffer to decode from + * @param {number} [l] Message length if known beforehand + * @returns {TopicDescriptor.AuthOpts} AuthOpts + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + AuthOpts.decode = function decode(r, l) { + if (!(r instanceof $Reader)) + r = $Reader.create(r); + var c = l === undefined ? r.len : r.pos + l, m = new $root.TopicDescriptor.AuthOpts(); + while (r.pos < c) { + var t = r.uint32(); + switch (t >>> 3) { + case 1: + m.mode = r.int32(); + break; + case 2: + if (!(m.keys && m.keys.length)) + m.keys = []; + m.keys.push(r.bytes()); + break; + default: + r.skipType(t & 7); + break; + } + } + return m; + }; + + /** + * Creates an AuthOpts message from a plain object. Also converts values to their respective internal types. + * @function fromObject + * @memberof TopicDescriptor.AuthOpts + * @static + * @param {Object.} d Plain object + * @returns {TopicDescriptor.AuthOpts} AuthOpts + */ + AuthOpts.fromObject = function fromObject(d) { + if (d instanceof $root.TopicDescriptor.AuthOpts) + return d; + var m = new $root.TopicDescriptor.AuthOpts(); + switch (d.mode) { + case "NONE": + case 0: + m.mode = 0; + break; + case "KEY": + case 1: + m.mode = 1; + break; + case "WOT": + case 2: + m.mode = 2; + break; + } + if (d.keys) { + if (!Array.isArray(d.keys)) + throw TypeError(".TopicDescriptor.AuthOpts.keys: array expected"); + m.keys = []; + for (var i = 0; i < d.keys.length; ++i) { + if (typeof d.keys[i] === "string") + $util.base64.decode(d.keys[i], m.keys[i] = $util.newBuffer($util.base64.length(d.keys[i])), 0); + else if (d.keys[i].length) + m.keys[i] = d.keys[i]; + } + } + return m; + }; + + /** + * Creates a plain object from an AuthOpts message. Also converts values to other types if specified. + * @function toObject + * @memberof TopicDescriptor.AuthOpts + * @static + * @param {TopicDescriptor.AuthOpts} m AuthOpts + * @param {$protobuf.IConversionOptions} [o] Conversion options + * @returns {Object.} Plain object + */ + AuthOpts.toObject = function toObject(m, o) { + if (!o) + o = {}; + var d = {}; + if (o.arrays || o.defaults) { + d.keys = []; + } + if (o.defaults) { + d.mode = o.enums === String ? "NONE" : 0; + } + if (m.mode != null && m.hasOwnProperty("mode")) { + d.mode = o.enums === String ? $root.TopicDescriptor.AuthOpts.AuthMode[m.mode] : m.mode; + } + if (m.keys && m.keys.length) { + d.keys = []; + for (var j = 0; j < m.keys.length; ++j) { + d.keys[j] = o.bytes === String ? $util.base64.encode(m.keys[j], 0, m.keys[j].length) : o.bytes === Array ? Array.prototype.slice.call(m.keys[j]) : m.keys[j]; + } + } + return d; + }; + + /** + * Converts this AuthOpts to JSON. + * @function toJSON + * @memberof TopicDescriptor.AuthOpts + * @instance + * @returns {Object.} JSON object + */ + AuthOpts.prototype.toJSON = function toJSON() { + return this.constructor.toObject(this, $protobuf.util.toJSONOptions); + }; + + /** + * AuthMode enum. + * @name TopicDescriptor.AuthOpts.AuthMode + * @enum {number} + * @property {number} NONE=0 NONE value + * @property {number} KEY=1 KEY value + * @property {number} WOT=2 WOT value + */ + AuthOpts.AuthMode = (function() { + var valuesById = {}, values = Object.create(valuesById); + values[valuesById[0] = "NONE"] = 0; + values[valuesById[1] = "KEY"] = 1; + values[valuesById[2] = "WOT"] = 2; + return values; + })(); + + return AuthOpts; + })(); + + TopicDescriptor.EncOpts = (function() { + + /** + * Properties of an EncOpts. + * @memberof TopicDescriptor + * @interface IEncOpts + * @property {TopicDescriptor.EncOpts.EncMode|null} [mode] EncOpts mode + * @property {Array.|null} [keyHashes] EncOpts keyHashes + */ + + /** + * Constructs a new EncOpts. + * @memberof TopicDescriptor + * @classdesc Represents an EncOpts. + * @implements IEncOpts + * @constructor + * @param {TopicDescriptor.IEncOpts=} [p] Properties to set + */ + function EncOpts(p) { + this.keyHashes = []; + if (p) + for (var ks = Object.keys(p), i = 0; i < ks.length; ++i) + if (p[ks[i]] != null) + this[ks[i]] = p[ks[i]]; + } + + /** + * EncOpts mode. + * @member {TopicDescriptor.EncOpts.EncMode} mode + * @memberof TopicDescriptor.EncOpts + * @instance + */ + EncOpts.prototype.mode = 0; + + /** + * EncOpts keyHashes. + * @member {Array.} keyHashes + * @memberof TopicDescriptor.EncOpts + * @instance + */ + EncOpts.prototype.keyHashes = $util.emptyArray; + + /** + * Encodes the specified EncOpts message. Does not implicitly {@link TopicDescriptor.EncOpts.verify|verify} messages. + * @function encode + * @memberof TopicDescriptor.EncOpts + * @static + * @param {TopicDescriptor.IEncOpts} m EncOpts message or plain object to encode + * @param {$protobuf.Writer} [w] Writer to encode to + * @returns {$protobuf.Writer} Writer + */ + EncOpts.encode = function encode(m, w) { + if (!w) + w = $Writer.create(); + if (m.mode != null && Object.hasOwnProperty.call(m, "mode")) + w.uint32(8).int32(m.mode); + if (m.keyHashes != null && m.keyHashes.length) { + for (var i = 0; i < m.keyHashes.length; ++i) + w.uint32(18).bytes(m.keyHashes[i]); + } + return w; + }; + + /** + * Decodes an EncOpts message from the specified reader or buffer. + * @function decode + * @memberof TopicDescriptor.EncOpts + * @static + * @param {$protobuf.Reader|Uint8Array} r Reader or buffer to decode from + * @param {number} [l] Message length if known beforehand + * @returns {TopicDescriptor.EncOpts} EncOpts + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + EncOpts.decode = function decode(r, l) { + if (!(r instanceof $Reader)) + r = $Reader.create(r); + var c = l === undefined ? r.len : r.pos + l, m = new $root.TopicDescriptor.EncOpts(); + while (r.pos < c) { + var t = r.uint32(); + switch (t >>> 3) { + case 1: + m.mode = r.int32(); + break; + case 2: + if (!(m.keyHashes && m.keyHashes.length)) + m.keyHashes = []; + m.keyHashes.push(r.bytes()); + break; + default: + r.skipType(t & 7); + break; + } + } + return m; + }; + + /** + * Creates an EncOpts message from a plain object. Also converts values to their respective internal types. + * @function fromObject + * @memberof TopicDescriptor.EncOpts + * @static + * @param {Object.} d Plain object + * @returns {TopicDescriptor.EncOpts} EncOpts + */ + EncOpts.fromObject = function fromObject(d) { + if (d instanceof $root.TopicDescriptor.EncOpts) + return d; + var m = new $root.TopicDescriptor.EncOpts(); + switch (d.mode) { + case "NONE": + case 0: + m.mode = 0; + break; + case "SHAREDKEY": + case 1: + m.mode = 1; + break; + case "WOT": + case 2: + m.mode = 2; + break; + } + if (d.keyHashes) { + if (!Array.isArray(d.keyHashes)) + throw TypeError(".TopicDescriptor.EncOpts.keyHashes: array expected"); + m.keyHashes = []; + for (var i = 0; i < d.keyHashes.length; ++i) { + if (typeof d.keyHashes[i] === "string") + $util.base64.decode(d.keyHashes[i], m.keyHashes[i] = $util.newBuffer($util.base64.length(d.keyHashes[i])), 0); + else if (d.keyHashes[i].length) + m.keyHashes[i] = d.keyHashes[i]; + } + } + return m; + }; + + /** + * Creates a plain object from an EncOpts message. Also converts values to other types if specified. + * @function toObject + * @memberof TopicDescriptor.EncOpts + * @static + * @param {TopicDescriptor.EncOpts} m EncOpts + * @param {$protobuf.IConversionOptions} [o] Conversion options + * @returns {Object.} Plain object + */ + EncOpts.toObject = function toObject(m, o) { + if (!o) + o = {}; + var d = {}; + if (o.arrays || o.defaults) { + d.keyHashes = []; + } + if (o.defaults) { + d.mode = o.enums === String ? "NONE" : 0; + } + if (m.mode != null && m.hasOwnProperty("mode")) { + d.mode = o.enums === String ? $root.TopicDescriptor.EncOpts.EncMode[m.mode] : m.mode; + } + if (m.keyHashes && m.keyHashes.length) { + d.keyHashes = []; + for (var j = 0; j < m.keyHashes.length; ++j) { + d.keyHashes[j] = o.bytes === String ? $util.base64.encode(m.keyHashes[j], 0, m.keyHashes[j].length) : o.bytes === Array ? Array.prototype.slice.call(m.keyHashes[j]) : m.keyHashes[j]; + } + } + return d; + }; + + /** + * Converts this EncOpts to JSON. + * @function toJSON + * @memberof TopicDescriptor.EncOpts + * @instance + * @returns {Object.} JSON object + */ + EncOpts.prototype.toJSON = function toJSON() { + return this.constructor.toObject(this, $protobuf.util.toJSONOptions); + }; + + /** + * EncMode enum. + * @name TopicDescriptor.EncOpts.EncMode + * @enum {number} + * @property {number} NONE=0 NONE value + * @property {number} SHAREDKEY=1 SHAREDKEY value + * @property {number} WOT=2 WOT value + */ + EncOpts.EncMode = (function() { + var valuesById = {}, values = Object.create(valuesById); + values[valuesById[0] = "NONE"] = 0; + values[valuesById[1] = "SHAREDKEY"] = 1; + values[valuesById[2] = "WOT"] = 2; + return values; + })(); + + return EncOpts; + })(); + + return TopicDescriptor; +})(); + +module.exports = $root; diff --git a/src/pubsub/message/topic-descriptor.proto.js b/src/pubsub/message/topic-descriptor.proto similarity index 93% rename from src/pubsub/message/topic-descriptor.proto.js rename to src/pubsub/message/topic-descriptor.proto index 6e829ca57..f44e6c0f8 100644 --- a/src/pubsub/message/topic-descriptor.proto.js +++ b/src/pubsub/message/topic-descriptor.proto @@ -1,10 +1,10 @@ -'use strict' -module.exports = ` +syntax = "proto3"; + // topicCID = cid(merkledag_protobuf(topicDescriptor)); (not the topic.name) message TopicDescriptor { optional string name = 1; optional AuthOpts auth = 2; - optional EncOpts enc = 2; + optional EncOpts enc = 3; message AuthOpts { optional AuthMode mode = 1; @@ -27,4 +27,4 @@ message TopicDescriptor { WOT = 2; // web of trust, certificates can allow publisher set to grow } } -}` +} \ No newline at end of file diff --git a/src/pubsub/message/types.d.ts b/src/pubsub/message/types.d.ts deleted file mode 100644 index 7e8c3630e..000000000 --- a/src/pubsub/message/types.d.ts +++ /dev/null @@ -1,5 +0,0 @@ -import { RPC, Message, SubOpts } from './types' - -export type RPC = RPC -export type Message = Message -export type SubOpts = SubOpts diff --git a/src/pubsub/peer-streams.js b/src/pubsub/peer-streams.js index 5a5886ec4..281eab9a1 100644 --- a/src/pubsub/peer-streams.js +++ b/src/pubsub/peer-streams.js @@ -10,7 +10,6 @@ const log = Object.assign(debug('libp2p-pubsub:peer-streams'), { const EventEmitter = require('events') const lp = require('it-length-prefixed') - const pushable = require('it-pushable') const { pipe } = require('it-pipe') const { source: abortable } = require('abortable-iterator') diff --git a/src/pubsub/tests/two-nodes.js b/src/pubsub/tests/two-nodes.js index 18b6b8dfe..c9fdb18bc 100644 --- a/src/pubsub/tests/two-nodes.js +++ b/src/pubsub/tests/two-nodes.js @@ -62,7 +62,9 @@ module.exports = (common) => { expect(psB.peers.size).to.equal(1) expectSet(psB.topics.get(topic), [psA.peerId.toB58String()]) expect(changedPeerId.toB58String()).to.equal(first(psB.peers).id.toB58String()) - expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: true }]) + expect(changedSubs).to.have.lengthOf(1) + expect(changedSubs[0].topicID).to.equal(topic) + expect(changedSubs[0].subscribe).to.equal(true) defer.resolve() }) psA.subscribe(topic) @@ -144,7 +146,9 @@ module.exports = (common) => { expect(psB.peers.size).to.equal(1) expectSet(psB.topics.get(topic), []) expect(changedPeerId.toB58String()).to.equal(first(psB.peers).id.toB58String()) - expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: false }]) + expect(changedSubs).to.have.lengthOf(1) + expect(changedSubs[0].topicID).to.equal(topic) + expect(changedSubs[0].subscribe).to.equal(false) defer.resolve() }) diff --git a/src/pubsub/utils.js b/src/pubsub/utils.js index 8dc0367cf..0cf71dfb2 100644 --- a/src/pubsub/utils.js +++ b/src/pubsub/utils.js @@ -7,6 +7,12 @@ const uint8ArrayFromString = require('uint8arrays/from-string') const PeerId = require('peer-id') const multihash = require('multihashes') +/** + * @typedef {import('./message/rpc').RPC.IMessage} IMessage + * @typedef {import('./message/rpc').RPC.Message} Message + * @typedef {import('.').InMessage} NormalizedIMessage + */ + /** * Generatea random sequence number. * @@ -94,11 +100,13 @@ const ensureArray = (maybeArray) => { * Ensures `message.from` is base58 encoded * * @template {{from?:any}} T - * @param {T & {from?:string, receivedFrom:string}} message + * @param {T & IMessage} message * @param {string} [peerId] - * @returns {T & {from?: string, peerId?: string }} + * @returns {NormalizedIMessage} */ const normalizeInRpcMessage = (message, peerId) => { + /** @type {NormalizedIMessage} */ + // @ts-ignore receivedFrom not yet defined const m = Object.assign({}, message) if (message.from instanceof Uint8Array) { m.from = uint8ArrayToString(message.from, 'base58btc') @@ -112,10 +120,12 @@ const normalizeInRpcMessage = (message, peerId) => { /** * @template {{from?:any, data?:any}} T * - * @param {T} message - * @returns {T & {from?: Uint8Array, data?: Uint8Array}} + * @param {T & NormalizedIMessage} message + * @returns {Message} */ const normalizeOutRpcMessage = (message) => { + /** @type {Message} */ + // @ts-ignore from not yet defined const m = Object.assign({}, message) if (typeof message.from === 'string') { m.from = uint8ArrayFromString(message.from, 'base58btc') diff --git a/src/stream-muxer/tests/close-test.js b/src/stream-muxer/tests/close-test.js index e70dc56cd..480ca80a9 100644 --- a/src/stream-muxer/tests/close-test.js +++ b/src/stream-muxer/tests/close-test.js @@ -7,12 +7,12 @@ const pair = require('it-pair/duplex') const { pipe } = require('it-pipe') const { consume } = require('streaming-iterables') const Tcp = require('libp2p-tcp') -const multiaddr = require('multiaddr') +const { Multiaddr } = require('multiaddr') const { source: abortable } = require('abortable-iterator') const AbortController = require('abort-controller').default const uint8arrayFromString = require('uint8arrays/from-string') -const mh = multiaddr('/ip4/127.0.0.1/tcp/0') +const mh = new Multiaddr('/ip4/127.0.0.1/tcp/0') function pause (ms) { return new Promise(resolve => setTimeout(resolve, ms)) diff --git a/test/connection/compliance.spec.js b/test/connection/compliance.spec.js index e163b874c..350b4fe74 100644 --- a/test/connection/compliance.spec.js +++ b/test/connection/compliance.spec.js @@ -5,7 +5,7 @@ const tests = require('../../src/connection/tests') const { Connection } = require('../../src/connection') const peers = require('../../src/utils/peers') const PeerId = require('peer-id') -const multiaddr = require('multiaddr') +const { Multiaddr } = require('multiaddr') const pair = require('it-pair') describe('compliance tests', () => { @@ -17,8 +17,8 @@ describe('compliance tests', () => { * @param {*} properties */ async setup (properties) { - const localAddr = multiaddr('/ip4/127.0.0.1/tcp/8080') - const remoteAddr = multiaddr('/ip4/127.0.0.1/tcp/8081') + const localAddr = new Multiaddr('/ip4/127.0.0.1/tcp/8080') + const remoteAddr = new Multiaddr('/ip4/127.0.0.1/tcp/8081') const [localPeer, remotePeer] = await Promise.all([ PeerId.createFromJSON(peers[0]), PeerId.createFromJSON(peers[1]) diff --git a/test/peer-discovery/mock-discovery.js b/test/peer-discovery/mock-discovery.js index a9f0ca28c..12d8d3cbe 100644 --- a/test/peer-discovery/mock-discovery.js +++ b/test/peer-discovery/mock-discovery.js @@ -2,7 +2,7 @@ const { EventEmitter } = require('events') -const multiaddr = require('multiaddr') +const { Multiaddr } = require('multiaddr') const PeerId = require('peer-id') /** @@ -41,7 +41,7 @@ class MockDiscovery extends EventEmitter { this._timer = setTimeout(() => { this.emit('peer', { id: peerId, - multiaddrs: [multiaddr('/ip4/127.0.0.1/tcp/8000')] + multiaddrs: [new Multiaddr('/ip4/127.0.0.1/tcp/8000')] }) }, this.options.discoveryDelay || 1000) } diff --git a/test/pubsub/sign.spec.js b/test/pubsub/sign.spec.js index f74393380..14726efa0 100644 --- a/test/pubsub/sign.spec.js +++ b/test/pubsub/sign.spec.js @@ -6,7 +6,7 @@ const { expect } = require('aegir/utils/chai') const uint8ArrayConcat = require('uint8arrays/concat') const uint8ArrayFromString = require('uint8arrays/from-string') -const { Message } = require('../../src/pubsub/message') +const { RPC } = require('../../src/pubsub/message/rpc') const { signMessage, SignPrefix, @@ -31,7 +31,7 @@ describe('message signing', () => { topicIDs: ['test-topic'] } - const bytesToSign = uint8ArrayConcat([SignPrefix, Message.encode(message)]) + const bytesToSign = uint8ArrayConcat([SignPrefix, RPC.Message.encode(message).finish()]) const expectedSignature = await peerId.privKey.sign(bytesToSign) const signedMessage = await signMessage(peerId, message) @@ -55,7 +55,7 @@ describe('message signing', () => { topicIDs: ['test-topic'] } - const bytesToSign = uint8ArrayConcat([SignPrefix, Message.encode(message)]) + const bytesToSign = uint8ArrayConcat([SignPrefix, RPC.Message.encode(message).finish()]) const expectedSignature = await secPeerId.privKey.sign(bytesToSign) const signedMessage = await signMessage(secPeerId, message) @@ -77,7 +77,7 @@ describe('message signing', () => { topicIDs: ['test-topic'] } - const bytesToSign = uint8ArrayConcat([SignPrefix, Message.encode(message)]) + const bytesToSign = uint8ArrayConcat([SignPrefix, RPC.Message.encode(message).finish()]) const expectedSignature = await peerId.privKey.sign(bytesToSign) const signedMessage = await signMessage(peerId, message) diff --git a/test/pubsub/utils/index.js b/test/pubsub/utils/index.js index f62f1cba8..4d037c488 100644 --- a/test/pubsub/utils/index.js +++ b/test/pubsub/utils/index.js @@ -5,7 +5,7 @@ const DuplexPair = require('it-pair/duplex') const PeerId = require('peer-id') const PubsubBaseProtocol = require('../../../src/pubsub') -const { message } = require('../../../src/pubsub') +const { RPC } = require('../../../src/pubsub/message/rpc') exports.createPeerId = async () => { const peerId = await PeerId.create({ bits: 1024 }) @@ -28,11 +28,11 @@ class PubsubImplementation extends PubsubBaseProtocol { } _decodeRpc (bytes) { - return message.rpc.RPC.decode(bytes) + return RPC.decode(bytes) } _encodeRpc (rpc) { - return message.rpc.RPC.encode(rpc) + return RPC.encode(rpc).finish() } } diff --git a/tsconfig.json b/tsconfig.json index 0cbdc31a4..134309e63 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -5,5 +5,9 @@ }, "include": [ "src" + ], + "exclude": [ + "src/pubsub/message/rpc.js", // exclude generated file + "src/pubsub/message/topic-descriptor.js" // exclude generated file ] }