diff --git a/packages/interface-ipfs-core/package.json b/packages/interface-ipfs-core/package.json index 9fe7f81086..2fabaa5842 100644 --- a/packages/interface-ipfs-core/package.json +++ b/packages/interface-ipfs-core/package.json @@ -77,6 +77,7 @@ "pako": "^1.0.2", "peer-id": "^0.15.1", "readable-stream": "^3.4.0", + "sinon": "^11.1.1", "uint8arrays": "^2.1.6" }, "contributors": [ diff --git a/packages/interface-ipfs-core/src/pubsub/peers.js b/packages/interface-ipfs-core/src/pubsub/peers.js index 88b065922b..0be7c60bae 100644 --- a/packages/interface-ipfs-core/src/pubsub/peers.js +++ b/packages/interface-ipfs-core/src/pubsub/peers.js @@ -37,10 +37,10 @@ module.exports = (factory, options) => { let ipfs3Id before(async () => { - ipfs1 = (await factory.spawn({ type: 'proc', ipfsOptions })).api + ipfs1 = (await factory.spawn({ ipfsOptions })).api // webworkers are not dialable because webrtc is not available - ipfs2 = (await factory.spawn({ type: isWebWorker ? 'go' : undefined })).api - ipfs3 = (await factory.spawn({ type: isWebWorker ? 'go' : undefined })).api + ipfs2 = (await factory.spawn({ type: isWebWorker ? 'js' : undefined, ipfsOptions })).api + ipfs3 = (await factory.spawn({ type: isWebWorker ? 'js' : undefined, ipfsOptions })).api ipfs2Id = await ipfs2.id() ipfs3Id = await ipfs3.id() diff --git a/packages/interface-ipfs-core/src/pubsub/subscribe.js b/packages/interface-ipfs-core/src/pubsub/subscribe.js index 4f883c5026..25f1598772 100644 --- a/packages/interface-ipfs-core/src/pubsub/subscribe.js +++ b/packages/interface-ipfs-core/src/pubsub/subscribe.js @@ -13,6 +13,7 @@ const { AbortController } = require('native-abort-controller') const { isWebWorker, isNode } = require('ipfs-utils/src/env') const getIpfsOptions = require('../utils/ipfs-options-websockets-filter-all') const first = require('it-first') +const sinon = require('sinon') /** * @typedef {import('ipfsd-ctl').Factory} Factory @@ -44,12 +45,10 @@ module.exports = (factory, options) => { let ipfs2Id before(async () => { - ipfs1 = (await factory.spawn({ type: 'proc', ipfsOptions })).api - // TODO 'multiple connected nodes' tests fails with go in Firefox - // and JS is flaky everywhere + ipfs1 = (await factory.spawn({ ipfsOptions })).api // webworkers are not dialable because webrtc is not available - ipfs2 = (await factory.spawn({ type: isWebWorker ? 'go' : undefined })).api + ipfs2 = (await factory.spawn({ type: isWebWorker ? 'js' : undefined, ipfsOptions })).api ipfs1Id = await ipfs1.id() ipfs2Id = await ipfs2.id() @@ -84,6 +83,7 @@ module.exports = (factory, options) => { await ipfs1.pubsub.publish(topic, uint8ArrayFromString('hi')) const msg = await first(msgStream) + expect(uint8ArrayToString(msg.data)).to.equal('hi') expect(msg).to.have.property('seqno') expect(msg.seqno).to.be.an.instanceof(Uint8Array) @@ -410,6 +410,139 @@ module.exports = (factory, options) => { expect(uint8ArrayToString(msg.data).startsWith(msgBase)).to.be.true() }) }) + + it('should receive messages from a different node on lots of topics', async () => { + // @ts-ignore this is mocha + this.timeout(5 * 60 * 1000) + + const numTopics = 20 + const topics = [] + const expectedStrings = [] + const msgStreams = [] + + for (let i = 0; i < numTopics; i++) { + const topic = `pubsub-topic-${Math.random()}` + topics.push(topic) + + const msgStream1 = pushable() + const msgStream2 = pushable() + + msgStreams.push({ + msgStream1, + msgStream2 + }) + + /** @type {import('ipfs-core-types/src/pubsub').MessageHandlerFn} */ + const sub1 = msg => { + msgStream1.push(msg) + msgStream1.end() + } + /** @type {import('ipfs-core-types/src/pubsub').MessageHandlerFn} */ + const sub2 = msg => { + msgStream2.push(msg) + msgStream2.end() + } + + await Promise.all([ + ipfs1.pubsub.subscribe(topic, sub1), + ipfs2.pubsub.subscribe(topic, sub2) + ]) + + await waitForPeers(ipfs2, topic, [ipfs1Id.id], 30000) + } + + await delay(5000) // gossipsub needs this delay https://github.com/libp2p/go-libp2p-pubsub/issues/331 + + for (let i = 0; i < numTopics; i++) { + const expectedString = `hello pubsub ${Math.random()}` + expectedStrings.push(expectedString) + + await ipfs2.pubsub.publish(topics[i], uint8ArrayFromString(expectedString)) + } + + for (let i = 0; i < numTopics; i++) { + const [sub1Msg] = await all(msgStreams[i].msgStream1) + expect(uint8ArrayToString(sub1Msg.data)).to.equal(expectedStrings[i]) + expect(sub1Msg.from).to.eql(ipfs2Id.id) + + const [sub2Msg] = await all(msgStreams[i].msgStream2) + expect(uint8ArrayToString(sub2Msg.data)).to.equal(expectedStrings[i]) + expect(sub2Msg.from).to.eql(ipfs2Id.id) + } + }) + + it('should unsubscribe multiple handlers', async () => { + // @ts-ignore this is mocha + this.timeout(2 * 60 * 1000) + + const topic = `topic-${Math.random()}` + + const handler1 = sinon.stub() + const handler2 = sinon.stub() + + await Promise.all([ + ipfs1.pubsub.subscribe(topic, sinon.stub()), + ipfs2.pubsub.subscribe(topic, handler1), + ipfs2.pubsub.subscribe(topic, handler2) + ]) + + await waitForPeers(ipfs1, topic, [ipfs2Id.id], 30000) + + expect(handler1).to.have.property('callCount', 0) + expect(handler2).to.have.property('callCount', 0) + + await ipfs1.pubsub.publish(topic, uint8ArrayFromString('hello world 1')) + + await delay(1000) + + expect(handler1).to.have.property('callCount', 1) + expect(handler2).to.have.property('callCount', 1) + + await ipfs2.pubsub.unsubscribe(topic) + + await ipfs1.pubsub.publish(topic, uint8ArrayFromString('hello world 2')) + + await delay(1000) + + expect(handler1).to.have.property('callCount', 1) + expect(handler2).to.have.property('callCount', 1) + }) + + it('should unsubscribe individual handlers', async () => { + // @ts-ignore this is mocha + this.timeout(2 * 60 * 1000) + + const topic = `topic-${Math.random()}` + + const handler1 = sinon.stub() + const handler2 = sinon.stub() + + await Promise.all([ + ipfs1.pubsub.subscribe(topic, sinon.stub()), + ipfs2.pubsub.subscribe(topic, handler1), + ipfs2.pubsub.subscribe(topic, handler2) + ]) + + await waitForPeers(ipfs1, topic, [ipfs2Id.id], 30000) + + expect(handler1).to.have.property('callCount', 0) + expect(handler2).to.have.property('callCount', 0) + + await ipfs1.pubsub.publish(topic, uint8ArrayFromString('hello world 1')) + + await delay(1000) + + expect(handler1).to.have.property('callCount', 1) + expect(handler2).to.have.property('callCount', 1) + + await ipfs2.pubsub.unsubscribe(topic, handler1) + await ipfs1.pubsub.publish(topic, uint8ArrayFromString('hello world 2')) + + await delay(1000) + + expect(handler1).to.have.property('callCount', 1) + expect(handler2).to.have.property('callCount', 2) + }) }) }) } diff --git a/packages/ipfs-grpc-client/package.json b/packages/ipfs-grpc-client/package.json index 81d45fd149..72b533e379 100644 --- a/packages/ipfs-grpc-client/package.json +++ b/packages/ipfs-grpc-client/package.json @@ -44,6 +44,7 @@ "it-pushable": "^1.4.2", "multiaddr": "^10.0.0", "multiformats": "^9.4.1", + "p-defer": "^3.0.0", "protobufjs": "^6.10.2", "wherearewe": "1.0.0", "ws": "^7.3.1" diff --git a/packages/ipfs-grpc-client/src/core-api/pubsub/subscribe.js b/packages/ipfs-grpc-client/src/core-api/pubsub/subscribe.js new file mode 100644 index 0000000000..e05af2ec30 --- /dev/null +++ b/packages/ipfs-grpc-client/src/core-api/pubsub/subscribe.js @@ -0,0 +1,58 @@ +'use strict' + +const serverStreamToIterator = require('../../utils/server-stream-to-iterator') +const withTimeoutOption = require('ipfs-core-utils/src/with-timeout-option') +const subscriptions = require('./subscriptions') +const defer = require('p-defer') + +/** + * @param {import('@improbable-eng/grpc-web').grpc} grpc + * @param {*} service + * @param {import('../../types').Options} opts + */ +module.exports = function grpcPubsubSubscribe (grpc, service, opts) { + /** + * @type {import('ipfs-core-types/src/pubsub').API["subscribe"]} + */ + async function pubsubSubscribe (topic, handler, options = {}) { + const request = { + topic + } + + const deferred = defer() + + Promise.resolve().then(async () => { + try { + for await (const result of serverStreamToIterator(grpc, service, request, { + host: opts.url, + debug: Boolean(process.env.DEBUG), + metadata: options, + agent: opts.agent + })) { + if (result.handler) { + const subs = subscriptions.get(topic) || new Map() + subs.set(result.handler, handler) + subscriptions.set(topic, subs) + + deferred.resolve() + } else { + handler({ + from: result.from, + seqno: result.seqno, + data: result.data, + topicIDs: result.topicIDs + }) + } + } + } catch (err) { + if (options && options.onError) { + options.onError(err) + } + } + }) + + await deferred.promise + } + + return withTimeoutOption(pubsubSubscribe) +} diff --git a/packages/ipfs-grpc-client/src/core-api/pubsub/subscriptions.js b/packages/ipfs-grpc-client/src/core-api/pubsub/subscriptions.js new file mode 100644 index 0000000000..f354d79397 --- /dev/null +++ b/packages/ipfs-grpc-client/src/core-api/pubsub/subscriptions.js @@ -0,0 +1,10 @@ +'use strict' + +/** + * @typedef {import('ipfs-core-types/src/pubsub').MessageHandlerFn} Subscription + */ + +/** @type {Map>} */ +const subs = new Map() + +module.exports = subs diff --git a/packages/ipfs-grpc-client/src/core-api/pubsub/unsubscribe.js b/packages/ipfs-grpc-client/src/core-api/pubsub/unsubscribe.js new file mode 100644 index 0000000000..c45f1c3fc9 --- /dev/null +++ b/packages/ipfs-grpc-client/src/core-api/pubsub/unsubscribe.js @@ -0,0 +1,56 @@ +'use strict' + +const withTimeoutOption = require('ipfs-core-utils/src/with-timeout-option') +const toHeaders = require('../../utils/to-headers') +const unaryToPromise = require('../../utils/unary-to-promise') +const subscriptions = require('./subscriptions') + +/** + * @param {import('@improbable-eng/grpc-web').grpc} grpc + * @param {*} service + * @param {import('../../types').Options} opts + */ +module.exports = function grpcPubsubUnsubscribe (grpc, service, opts) { + /** + * @type {import('ipfs-core-types/src/pubsub').API["unsubscribe"]} + */ + async function pubsubUnsubscribe (topic, handler, options = {}) { + const handlers = [] + const subs = subscriptions.get(topic) + + if (!subs) { + return + } + + if (handler) { + for (const [key, value] of subs.entries()) { + if (value === handler) { + handlers.push(key) + } + } + } else { + + } + + const request = { + topic, + handlers + } + + await unaryToPromise(grpc, service, request, { + host: opts.url, + metadata: toHeaders(options), + agent: opts.agent + }) + + for (const handlerId of handlers) { + subs.delete(handlerId) + } + + if (!subs.size) { + subscriptions.delete(topic) + } + } + + return withTimeoutOption(pubsubUnsubscribe) +} diff --git a/packages/ipfs-grpc-client/src/index.js b/packages/ipfs-grpc-client/src/index.js index f35b76989d..7b10546e71 100644 --- a/packages/ipfs-grpc-client/src/index.js +++ b/packages/ipfs-grpc-client/src/index.js @@ -49,6 +49,12 @@ function create (opts = { url: '' }) { ls: require('./core-api/files/ls')(grpc, service.MFS.ls, options), // @ts-ignore - TODO: fix after https://github.com/ipfs/js-ipfs/issues/3594 write: require('./core-api/files/write')(grpc, service.MFS.write, options) + }, + pubsub: { + // @ts-ignore - TODO: fix after https://github.com/ipfs/js-ipfs/issues/3594 + subscribe: require('./core-api/pubsub/subscribe')(grpc, service.PubSub.subscribe, options), + // @ts-ignore - TODO: fix after https://github.com/ipfs/js-ipfs/issues/3594 + unsubscribe: require('./core-api/pubsub/unsubscribe')(grpc, service.PubSub.unsubscribe, options) } } diff --git a/packages/ipfs-grpc-protocol/src/pubsub.proto b/packages/ipfs-grpc-protocol/src/pubsub.proto new file mode 100644 index 0000000000..fd717c1c2a --- /dev/null +++ b/packages/ipfs-grpc-protocol/src/pubsub.proto @@ -0,0 +1,31 @@ +syntax = "proto3"; + +import "common.proto"; + +package ipfs; + +service PubSub { + rpc subscribe (SubscribeRequest) returns (stream SubscribeResponse) {} + rpc unsubscribe (UnSubscribeRequest) returns (UnSubscribeResponse) {} +} + +message SubscribeRequest { + string topic = 1; +} + +message SubscribeResponse { + string handler = 1; + string from = 2; + bytes seqno = 3; + bytes data = 4; + repeated string topicIDs = 5; +} + +message UnSubscribeRequest { + string topic = 1; + repeated string handlers = 2; +} + +message UnSubscribeResponse { + +} diff --git a/packages/ipfs-grpc-server/package.json b/packages/ipfs-grpc-server/package.json index 08b2b5ae6e..9bad04f233 100644 --- a/packages/ipfs-grpc-server/package.json +++ b/packages/ipfs-grpc-server/package.json @@ -42,6 +42,7 @@ "it-pipe": "^1.1.0", "it-pushable": "^1.4.2", "multiaddr": "^10.0.0", + "nanoid": "3.1.23", "protobufjs": "^6.10.2", "ws": "^7.3.1" }, diff --git a/packages/ipfs-grpc-server/src/endpoints/pubsub/subscribe.js b/packages/ipfs-grpc-server/src/endpoints/pubsub/subscribe.js new file mode 100644 index 0000000000..e6592ed35d --- /dev/null +++ b/packages/ipfs-grpc-server/src/endpoints/pubsub/subscribe.js @@ -0,0 +1,42 @@ +'use strict' + +const subscriptions = require('./subscriptions') +const { nanoid } = require('nanoid') + +/** + * @param {import('ipfs-core-types').IPFS} ipfs + * @param {import('../../types').Options} options + */ +module.exports = function grpcPubsubSubscribe (ipfs, options = {}) { + /** + * TODO: Fill out input/output types after https://github.com/ipfs/js-ipfs/issues/3594 + * + * @type {import('../../types').ServerStreamingEndpoint} + */ + async function pubsubSubscribe (request, sink, metadata) { + const opts = { + ...metadata + } + + const handlerId = nanoid() + const handler = { + /** @type {import('ipfs-core-types/src/pubsub').MessageHandlerFn} */ + onMessage: (message) => { + sink.push(message) + }, + onUnsubscribe: () => { + sink.end() + } + } + + subscriptions.set(handlerId, handler) + + sink.push({ + handler: handlerId + }) + + await ipfs.pubsub.subscribe(request.topic, handler.onMessage, opts) + } + + return pubsubSubscribe +} diff --git a/packages/ipfs-grpc-server/src/endpoints/pubsub/subscriptions.js b/packages/ipfs-grpc-server/src/endpoints/pubsub/subscriptions.js new file mode 100644 index 0000000000..e04e77bb4b --- /dev/null +++ b/packages/ipfs-grpc-server/src/endpoints/pubsub/subscriptions.js @@ -0,0 +1,12 @@ +'use strict' + +/** + * @typedef {object} Subscription + * @property {import('ipfs-core-types/src/pubsub').MessageHandlerFn} onMessage + * @property {() => void} onUnsubscribe + */ + +/** @type {Map} */ +const subs = new Map() + +module.exports = subs diff --git a/packages/ipfs-grpc-server/src/endpoints/pubsub/unsubscribe.js b/packages/ipfs-grpc-server/src/endpoints/pubsub/unsubscribe.js new file mode 100644 index 0000000000..99f4228d85 --- /dev/null +++ b/packages/ipfs-grpc-server/src/endpoints/pubsub/unsubscribe.js @@ -0,0 +1,44 @@ +'use strict' + +const subscriptions = require('./subscriptions') +const { callbackify } = require('util') + +/** + * @param {import('ipfs-core-types').IPFS} ipfs + * @param {import('../../types').Options} options + */ +module.exports = function grpcPubsubUnsubscribe (ipfs, options = {}) { + /** + * TODO: Fill out input/output types after https://github.com/ipfs/js-ipfs/issues/3594 + * + * @type {import('../../types').UnaryEndpoint} + */ + async function pubsubUnsubscribe (request, metadata) { + const opts = { + ...metadata + } + + if (!request.handlers || !request.handlers.length) { + await ipfs.pubsub.unsubscribe(request.topic, undefined, opts) + + return {} + } + + for (const handlerId of request.handlers) { + const handler = subscriptions.get(handlerId) + + if (!handler) { + continue + } + + await ipfs.pubsub.unsubscribe(request.topic, handler.onMessage, opts) + + handler.onUnsubscribe() + subscriptions.delete(handlerId) + } + + return {} + } + + return callbackify(pubsubUnsubscribe) +} diff --git a/packages/ipfs-grpc-server/src/index.js b/packages/ipfs-grpc-server/src/index.js index 05b64821e7..60adb8c24f 100644 --- a/packages/ipfs-grpc-server/src/index.js +++ b/packages/ipfs-grpc-server/src/index.js @@ -8,7 +8,8 @@ const loadServices = require('./utils/load-services') const { Root, - MFS + MFS, + PubSub } = loadServices() /** @@ -31,6 +32,12 @@ module.exports = async function createServer (ipfs, options = {}) { // @ts-ignore - types differ because we only invoke via websockets - https://github.com/ipfs/js-ipfs/issues/3594 write: require('./endpoints/mfs/write')(ipfs, options) }) + server.addService(PubSub, { + // @ts-ignore - types differ because we only invoke via websockets - https://github.com/ipfs/js-ipfs/issues/3594 + subscribe: require('./endpoints/pubsub/subscribe')(ipfs, options), + // @ts-ignore - types differ because we only invoke via websockets - https://github.com/ipfs/js-ipfs/issues/3594 + unsubscribe: require('./endpoints/pubsub/unsubscribe')(ipfs, options) + }) const socket = options.socket || await webSocketServer(ipfs, options) diff --git a/packages/ipfs/test/interface-client.js b/packages/ipfs/test/interface-client.js index 63e69d6b8c..9b33b57a59 100644 --- a/packages/ipfs/test/interface-client.js +++ b/packages/ipfs/test/interface-client.js @@ -146,4 +146,9 @@ describe('interface-ipfs-core ipfs-client tests', () => { } ] }) + + tests.pubsub(factory({ + type: 'js', + ipfsClientModule: require('ipfs-client') + })) }) diff --git a/packages/ipfs/test/interface-core.js b/packages/ipfs/test/interface-core.js index 6e220dcd80..1fe1772dd4 100644 --- a/packages/ipfs/test/interface-core.js +++ b/packages/ipfs/test/interface-core.js @@ -8,7 +8,9 @@ const factory = require('./utils/factory') /** @typedef { import("ipfsd-ctl").ControllerOptions } ControllerOptions */ describe('interface-ipfs-core tests', function () { - const commonFactory = factory() + const commonFactory = factory({ + ipfsClientModule: require('ipfs-client') + }) tests.root(commonFactory, { skip: isNode @@ -81,11 +83,7 @@ describe('interface-ipfs-core tests', function () { tests.ping(commonFactory) - tests.pubsub(factory({}, { - go: { - args: ['--enable-pubsub-experiment'] - } - }), { + tests.pubsub(commonFactory, { skip: [ ...(isNode ? [] diff --git a/packages/ipfs/test/interface-http-go.js b/packages/ipfs/test/interface-http-go.js index 1e3a4501b7..4909cc20c4 100644 --- a/packages/ipfs/test/interface-http-go.js +++ b/packages/ipfs/test/interface-http-go.js @@ -617,16 +617,21 @@ describe('interface-ipfs-core over ipfs-http-client tests against go-ipfs', () = args: ['--enable-pubsub-experiment'] } }), { - skip: isWindows - ? [{ - name: 'should send/receive 100 messages', - reason: 'FIXME https://github.com/ipfs/interface-ipfs-core/pull/188#issuecomment-354673246 and https://github.com/ipfs/go-ipfs/issues/4778' - }, - { - name: 'should receive multiple messages', - reason: 'FIXME https://github.com/ipfs/interface-ipfs-core/pull/188#issuecomment-354673246 and https://github.com/ipfs/go-ipfs/issues/4778' - }] - : null + skip: [{ + name: 'should receive messages from a different node on lots of topics', + reason: 'HTTP clients cannot hold this many connections open' + }].concat( + isWindows + ? [{ + name: 'should send/receive 100 messages', + reason: 'FIXME https://github.com/ipfs/interface-ipfs-core/pull/188#issuecomment-354673246 and https://github.com/ipfs/go-ipfs/issues/4778' + }, + { + name: 'should receive multiple messages', + reason: 'FIXME https://github.com/ipfs/interface-ipfs-core/pull/188#issuecomment-354673246 and https://github.com/ipfs/go-ipfs/issues/4778' + }] + : [] + ) }) tests.repo(commonFactory) diff --git a/packages/ipfs/test/interface-http-js.js b/packages/ipfs/test/interface-http-js.js index ce00df00ee..5d981542ea 100644 --- a/packages/ipfs/test/interface-http-js.js +++ b/packages/ipfs/test/interface-http-js.js @@ -164,7 +164,12 @@ describe('interface-ipfs-core over ipfs-http-client tests against js-ipfs', func go: { args: ['--enable-pubsub-experiment'] } - })) + }), { + skip: [{ + name: 'should receive messages from a different node on lots of topics', + reason: 'HTTP clients cannot hold this many connections open' + }] + }) tests.repo(commonFactory)