From 3bcd36cd1fd4cad7cbcf6ab608695601ece96fa4 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 12 Aug 2021 16:21:23 +0100 Subject: [PATCH 1/4] feat: pubsub over gRPC Browsers can only have six concurrently open connections to a host name. Pubsub works over HTTP by holding a connection open per subscription, which means you can only subscribe six times before things start to hang. gRPC runs over websockets so doesn't have this limitation. This PR adds pubsub support to the gRPC server and `ipfs-client` module so you can subscribe to lots and lots of channels concurrently, working around the browser connection limitation. Refs: #3741 --- packages/interface-ipfs-core/package.json | 1 + .../interface-ipfs-core/src/pubsub/peers.js | 6 +- .../src/pubsub/subscribe.js | 141 +++++++++++++++++- packages/ipfs-grpc-client/package.json | 1 + .../src/core-api/pubsub/subscribe.js | 58 +++++++ .../src/core-api/pubsub/subscriptions.js | 10 ++ .../src/core-api/pubsub/unsubscribe.js | 56 +++++++ packages/ipfs-grpc-client/src/index.js | 6 + packages/ipfs-grpc-protocol/src/pubsub.proto | 31 ++++ packages/ipfs-grpc-server/package.json | 1 + .../src/endpoints/pubsub/subscribe.js | 42 ++++++ .../src/endpoints/pubsub/subscriptions.js | 12 ++ .../src/endpoints/pubsub/unsubscribe.js | 44 ++++++ packages/ipfs-grpc-server/src/index.js | 9 +- packages/ipfs/test/interface-client.js | 5 + 15 files changed, 415 insertions(+), 8 deletions(-) create mode 100644 packages/ipfs-grpc-client/src/core-api/pubsub/subscribe.js create mode 100644 packages/ipfs-grpc-client/src/core-api/pubsub/subscriptions.js create mode 100644 packages/ipfs-grpc-client/src/core-api/pubsub/unsubscribe.js create mode 100644 packages/ipfs-grpc-protocol/src/pubsub.proto create mode 100644 packages/ipfs-grpc-server/src/endpoints/pubsub/subscribe.js create mode 100644 packages/ipfs-grpc-server/src/endpoints/pubsub/subscriptions.js create mode 100644 packages/ipfs-grpc-server/src/endpoints/pubsub/unsubscribe.js diff --git a/packages/interface-ipfs-core/package.json b/packages/interface-ipfs-core/package.json index 9fe7f81086..2fabaa5842 100644 --- a/packages/interface-ipfs-core/package.json +++ b/packages/interface-ipfs-core/package.json @@ -77,6 +77,7 @@ "pako": "^1.0.2", "peer-id": "^0.15.1", "readable-stream": "^3.4.0", + "sinon": "^11.1.1", "uint8arrays": "^2.1.6" }, "contributors": [ diff --git a/packages/interface-ipfs-core/src/pubsub/peers.js b/packages/interface-ipfs-core/src/pubsub/peers.js index 88b065922b..0be7c60bae 100644 --- a/packages/interface-ipfs-core/src/pubsub/peers.js +++ b/packages/interface-ipfs-core/src/pubsub/peers.js @@ -37,10 +37,10 @@ module.exports = (factory, options) => { let ipfs3Id before(async () => { - ipfs1 = (await factory.spawn({ type: 'proc', ipfsOptions })).api + ipfs1 = (await factory.spawn({ ipfsOptions })).api // webworkers are not dialable because webrtc is not available - ipfs2 = (await factory.spawn({ type: isWebWorker ? 'go' : undefined })).api - ipfs3 = (await factory.spawn({ type: isWebWorker ? 'go' : undefined })).api + ipfs2 = (await factory.spawn({ type: isWebWorker ? 'js' : undefined, ipfsOptions })).api + ipfs3 = (await factory.spawn({ type: isWebWorker ? 'js' : undefined, ipfsOptions })).api ipfs2Id = await ipfs2.id() ipfs3Id = await ipfs3.id() diff --git a/packages/interface-ipfs-core/src/pubsub/subscribe.js b/packages/interface-ipfs-core/src/pubsub/subscribe.js index 4f883c5026..25f1598772 100644 --- a/packages/interface-ipfs-core/src/pubsub/subscribe.js +++ b/packages/interface-ipfs-core/src/pubsub/subscribe.js @@ -13,6 +13,7 @@ const { AbortController } = require('native-abort-controller') const { isWebWorker, isNode } = require('ipfs-utils/src/env') const getIpfsOptions = require('../utils/ipfs-options-websockets-filter-all') const first = require('it-first') +const sinon = require('sinon') /** * @typedef {import('ipfsd-ctl').Factory} Factory @@ -44,12 +45,10 @@ module.exports = (factory, options) => { let ipfs2Id before(async () => { - ipfs1 = (await factory.spawn({ type: 'proc', ipfsOptions })).api - // TODO 'multiple connected nodes' tests fails with go in Firefox - // and JS is flaky everywhere + ipfs1 = (await factory.spawn({ ipfsOptions })).api // webworkers are not dialable because webrtc is not available - ipfs2 = (await factory.spawn({ type: isWebWorker ? 'go' : undefined })).api + ipfs2 = (await factory.spawn({ type: isWebWorker ? 'js' : undefined, ipfsOptions })).api ipfs1Id = await ipfs1.id() ipfs2Id = await ipfs2.id() @@ -84,6 +83,7 @@ module.exports = (factory, options) => { await ipfs1.pubsub.publish(topic, uint8ArrayFromString('hi')) const msg = await first(msgStream) + expect(uint8ArrayToString(msg.data)).to.equal('hi') expect(msg).to.have.property('seqno') expect(msg.seqno).to.be.an.instanceof(Uint8Array) @@ -410,6 +410,139 @@ module.exports = (factory, options) => { expect(uint8ArrayToString(msg.data).startsWith(msgBase)).to.be.true() }) }) + + it('should receive messages from a different node on lots of topics', async () => { + // @ts-ignore this is mocha + this.timeout(5 * 60 * 1000) + + const numTopics = 20 + const topics = [] + const expectedStrings = [] + const msgStreams = [] + + for (let i = 0; i < numTopics; i++) { + const topic = `pubsub-topic-${Math.random()}` + topics.push(topic) + + const msgStream1 = pushable() + const msgStream2 = pushable() + + msgStreams.push({ + msgStream1, + msgStream2 + }) + + /** @type {import('ipfs-core-types/src/pubsub').MessageHandlerFn} */ + const sub1 = msg => { + msgStream1.push(msg) + msgStream1.end() + } + /** @type {import('ipfs-core-types/src/pubsub').MessageHandlerFn} */ + const sub2 = msg => { + msgStream2.push(msg) + msgStream2.end() + } + + await Promise.all([ + ipfs1.pubsub.subscribe(topic, sub1), + ipfs2.pubsub.subscribe(topic, sub2) + ]) + + await waitForPeers(ipfs2, topic, [ipfs1Id.id], 30000) + } + + await delay(5000) // gossipsub needs this delay https://github.com/libp2p/go-libp2p-pubsub/issues/331 + + for (let i = 0; i < numTopics; i++) { + const expectedString = `hello pubsub ${Math.random()}` + expectedStrings.push(expectedString) + + await ipfs2.pubsub.publish(topics[i], uint8ArrayFromString(expectedString)) + } + + for (let i = 0; i < numTopics; i++) { + const [sub1Msg] = await all(msgStreams[i].msgStream1) + expect(uint8ArrayToString(sub1Msg.data)).to.equal(expectedStrings[i]) + expect(sub1Msg.from).to.eql(ipfs2Id.id) + + const [sub2Msg] = await all(msgStreams[i].msgStream2) + expect(uint8ArrayToString(sub2Msg.data)).to.equal(expectedStrings[i]) + expect(sub2Msg.from).to.eql(ipfs2Id.id) + } + }) + + it('should unsubscribe multiple handlers', async () => { + // @ts-ignore this is mocha + this.timeout(2 * 60 * 1000) + + const topic = `topic-${Math.random()}` + + const handler1 = sinon.stub() + const handler2 = sinon.stub() + + await Promise.all([ + ipfs1.pubsub.subscribe(topic, sinon.stub()), + ipfs2.pubsub.subscribe(topic, handler1), + ipfs2.pubsub.subscribe(topic, handler2) + ]) + + await waitForPeers(ipfs1, topic, [ipfs2Id.id], 30000) + + expect(handler1).to.have.property('callCount', 0) + expect(handler2).to.have.property('callCount', 0) + + await ipfs1.pubsub.publish(topic, uint8ArrayFromString('hello world 1')) + + await delay(1000) + + expect(handler1).to.have.property('callCount', 1) + expect(handler2).to.have.property('callCount', 1) + + await ipfs2.pubsub.unsubscribe(topic) + + await ipfs1.pubsub.publish(topic, uint8ArrayFromString('hello world 2')) + + await delay(1000) + + expect(handler1).to.have.property('callCount', 1) + expect(handler2).to.have.property('callCount', 1) + }) + + it('should unsubscribe individual handlers', async () => { + // @ts-ignore this is mocha + this.timeout(2 * 60 * 1000) + + const topic = `topic-${Math.random()}` + + const handler1 = sinon.stub() + const handler2 = sinon.stub() + + await Promise.all([ + ipfs1.pubsub.subscribe(topic, sinon.stub()), + ipfs2.pubsub.subscribe(topic, handler1), + ipfs2.pubsub.subscribe(topic, handler2) + ]) + + await waitForPeers(ipfs1, topic, [ipfs2Id.id], 30000) + + expect(handler1).to.have.property('callCount', 0) + expect(handler2).to.have.property('callCount', 0) + + await ipfs1.pubsub.publish(topic, uint8ArrayFromString('hello world 1')) + + await delay(1000) + + expect(handler1).to.have.property('callCount', 1) + expect(handler2).to.have.property('callCount', 1) + + await ipfs2.pubsub.unsubscribe(topic, handler1) + await ipfs1.pubsub.publish(topic, uint8ArrayFromString('hello world 2')) + + await delay(1000) + + expect(handler1).to.have.property('callCount', 1) + expect(handler2).to.have.property('callCount', 2) + }) }) }) } diff --git a/packages/ipfs-grpc-client/package.json b/packages/ipfs-grpc-client/package.json index 81d45fd149..72b533e379 100644 --- a/packages/ipfs-grpc-client/package.json +++ b/packages/ipfs-grpc-client/package.json @@ -44,6 +44,7 @@ "it-pushable": "^1.4.2", "multiaddr": "^10.0.0", "multiformats": "^9.4.1", + "p-defer": "^3.0.0", "protobufjs": "^6.10.2", "wherearewe": "1.0.0", "ws": "^7.3.1" diff --git a/packages/ipfs-grpc-client/src/core-api/pubsub/subscribe.js b/packages/ipfs-grpc-client/src/core-api/pubsub/subscribe.js new file mode 100644 index 0000000000..e05af2ec30 --- /dev/null +++ b/packages/ipfs-grpc-client/src/core-api/pubsub/subscribe.js @@ -0,0 +1,58 @@ +'use strict' + +const serverStreamToIterator = require('../../utils/server-stream-to-iterator') +const withTimeoutOption = require('ipfs-core-utils/src/with-timeout-option') +const subscriptions = require('./subscriptions') +const defer = require('p-defer') + +/** + * @param {import('@improbable-eng/grpc-web').grpc} grpc + * @param {*} service + * @param {import('../../types').Options} opts + */ +module.exports = function grpcPubsubSubscribe (grpc, service, opts) { + /** + * @type {import('ipfs-core-types/src/pubsub').API["subscribe"]} + */ + async function pubsubSubscribe (topic, handler, options = {}) { + const request = { + topic + } + + const deferred = defer() + + Promise.resolve().then(async () => { + try { + for await (const result of serverStreamToIterator(grpc, service, request, { + host: opts.url, + debug: Boolean(process.env.DEBUG), + metadata: options, + agent: opts.agent + })) { + if (result.handler) { + const subs = subscriptions.get(topic) || new Map() + subs.set(result.handler, handler) + subscriptions.set(topic, subs) + + deferred.resolve() + } else { + handler({ + from: result.from, + seqno: result.seqno, + data: result.data, + topicIDs: result.topicIDs + }) + } + } + } catch (err) { + if (options && options.onError) { + options.onError(err) + } + } + }) + + await deferred.promise + } + + return withTimeoutOption(pubsubSubscribe) +} diff --git a/packages/ipfs-grpc-client/src/core-api/pubsub/subscriptions.js b/packages/ipfs-grpc-client/src/core-api/pubsub/subscriptions.js new file mode 100644 index 0000000000..f354d79397 --- /dev/null +++ b/packages/ipfs-grpc-client/src/core-api/pubsub/subscriptions.js @@ -0,0 +1,10 @@ +'use strict' + +/** + * @typedef {import('ipfs-core-types/src/pubsub').MessageHandlerFn} Subscription + */ + +/** @type {Map>} */ +const subs = new Map() + +module.exports = subs diff --git a/packages/ipfs-grpc-client/src/core-api/pubsub/unsubscribe.js b/packages/ipfs-grpc-client/src/core-api/pubsub/unsubscribe.js new file mode 100644 index 0000000000..c45f1c3fc9 --- /dev/null +++ b/packages/ipfs-grpc-client/src/core-api/pubsub/unsubscribe.js @@ -0,0 +1,56 @@ +'use strict' + +const withTimeoutOption = require('ipfs-core-utils/src/with-timeout-option') +const toHeaders = require('../../utils/to-headers') +const unaryToPromise = require('../../utils/unary-to-promise') +const subscriptions = require('./subscriptions') + +/** + * @param {import('@improbable-eng/grpc-web').grpc} grpc + * @param {*} service + * @param {import('../../types').Options} opts + */ +module.exports = function grpcPubsubUnsubscribe (grpc, service, opts) { + /** + * @type {import('ipfs-core-types/src/pubsub').API["unsubscribe"]} + */ + async function pubsubUnsubscribe (topic, handler, options = {}) { + const handlers = [] + const subs = subscriptions.get(topic) + + if (!subs) { + return + } + + if (handler) { + for (const [key, value] of subs.entries()) { + if (value === handler) { + handlers.push(key) + } + } + } else { + + } + + const request = { + topic, + handlers + } + + await unaryToPromise(grpc, service, request, { + host: opts.url, + metadata: toHeaders(options), + agent: opts.agent + }) + + for (const handlerId of handlers) { + subs.delete(handlerId) + } + + if (!subs.size) { + subscriptions.delete(topic) + } + } + + return withTimeoutOption(pubsubUnsubscribe) +} diff --git a/packages/ipfs-grpc-client/src/index.js b/packages/ipfs-grpc-client/src/index.js index f35b76989d..7b10546e71 100644 --- a/packages/ipfs-grpc-client/src/index.js +++ b/packages/ipfs-grpc-client/src/index.js @@ -49,6 +49,12 @@ function create (opts = { url: '' }) { ls: require('./core-api/files/ls')(grpc, service.MFS.ls, options), // @ts-ignore - TODO: fix after https://github.com/ipfs/js-ipfs/issues/3594 write: require('./core-api/files/write')(grpc, service.MFS.write, options) + }, + pubsub: { + // @ts-ignore - TODO: fix after https://github.com/ipfs/js-ipfs/issues/3594 + subscribe: require('./core-api/pubsub/subscribe')(grpc, service.PubSub.subscribe, options), + // @ts-ignore - TODO: fix after https://github.com/ipfs/js-ipfs/issues/3594 + unsubscribe: require('./core-api/pubsub/unsubscribe')(grpc, service.PubSub.unsubscribe, options) } } diff --git a/packages/ipfs-grpc-protocol/src/pubsub.proto b/packages/ipfs-grpc-protocol/src/pubsub.proto new file mode 100644 index 0000000000..fd717c1c2a --- /dev/null +++ b/packages/ipfs-grpc-protocol/src/pubsub.proto @@ -0,0 +1,31 @@ +syntax = "proto3"; + +import "common.proto"; + +package ipfs; + +service PubSub { + rpc subscribe (SubscribeRequest) returns (stream SubscribeResponse) {} + rpc unsubscribe (UnSubscribeRequest) returns (UnSubscribeResponse) {} +} + +message SubscribeRequest { + string topic = 1; +} + +message SubscribeResponse { + string handler = 1; + string from = 2; + bytes seqno = 3; + bytes data = 4; + repeated string topicIDs = 5; +} + +message UnSubscribeRequest { + string topic = 1; + repeated string handlers = 2; +} + +message UnSubscribeResponse { + +} diff --git a/packages/ipfs-grpc-server/package.json b/packages/ipfs-grpc-server/package.json index 08b2b5ae6e..9bad04f233 100644 --- a/packages/ipfs-grpc-server/package.json +++ b/packages/ipfs-grpc-server/package.json @@ -42,6 +42,7 @@ "it-pipe": "^1.1.0", "it-pushable": "^1.4.2", "multiaddr": "^10.0.0", + "nanoid": "3.1.23", "protobufjs": "^6.10.2", "ws": "^7.3.1" }, diff --git a/packages/ipfs-grpc-server/src/endpoints/pubsub/subscribe.js b/packages/ipfs-grpc-server/src/endpoints/pubsub/subscribe.js new file mode 100644 index 0000000000..e6592ed35d --- /dev/null +++ b/packages/ipfs-grpc-server/src/endpoints/pubsub/subscribe.js @@ -0,0 +1,42 @@ +'use strict' + +const subscriptions = require('./subscriptions') +const { nanoid } = require('nanoid') + +/** + * @param {import('ipfs-core-types').IPFS} ipfs + * @param {import('../../types').Options} options + */ +module.exports = function grpcPubsubSubscribe (ipfs, options = {}) { + /** + * TODO: Fill out input/output types after https://github.com/ipfs/js-ipfs/issues/3594 + * + * @type {import('../../types').ServerStreamingEndpoint} + */ + async function pubsubSubscribe (request, sink, metadata) { + const opts = { + ...metadata + } + + const handlerId = nanoid() + const handler = { + /** @type {import('ipfs-core-types/src/pubsub').MessageHandlerFn} */ + onMessage: (message) => { + sink.push(message) + }, + onUnsubscribe: () => { + sink.end() + } + } + + subscriptions.set(handlerId, handler) + + sink.push({ + handler: handlerId + }) + + await ipfs.pubsub.subscribe(request.topic, handler.onMessage, opts) + } + + return pubsubSubscribe +} diff --git a/packages/ipfs-grpc-server/src/endpoints/pubsub/subscriptions.js b/packages/ipfs-grpc-server/src/endpoints/pubsub/subscriptions.js new file mode 100644 index 0000000000..e04e77bb4b --- /dev/null +++ b/packages/ipfs-grpc-server/src/endpoints/pubsub/subscriptions.js @@ -0,0 +1,12 @@ +'use strict' + +/** + * @typedef {object} Subscription + * @property {import('ipfs-core-types/src/pubsub').MessageHandlerFn} onMessage + * @property {() => void} onUnsubscribe + */ + +/** @type {Map} */ +const subs = new Map() + +module.exports = subs diff --git a/packages/ipfs-grpc-server/src/endpoints/pubsub/unsubscribe.js b/packages/ipfs-grpc-server/src/endpoints/pubsub/unsubscribe.js new file mode 100644 index 0000000000..99f4228d85 --- /dev/null +++ b/packages/ipfs-grpc-server/src/endpoints/pubsub/unsubscribe.js @@ -0,0 +1,44 @@ +'use strict' + +const subscriptions = require('./subscriptions') +const { callbackify } = require('util') + +/** + * @param {import('ipfs-core-types').IPFS} ipfs + * @param {import('../../types').Options} options + */ +module.exports = function grpcPubsubUnsubscribe (ipfs, options = {}) { + /** + * TODO: Fill out input/output types after https://github.com/ipfs/js-ipfs/issues/3594 + * + * @type {import('../../types').UnaryEndpoint} + */ + async function pubsubUnsubscribe (request, metadata) { + const opts = { + ...metadata + } + + if (!request.handlers || !request.handlers.length) { + await ipfs.pubsub.unsubscribe(request.topic, undefined, opts) + + return {} + } + + for (const handlerId of request.handlers) { + const handler = subscriptions.get(handlerId) + + if (!handler) { + continue + } + + await ipfs.pubsub.unsubscribe(request.topic, handler.onMessage, opts) + + handler.onUnsubscribe() + subscriptions.delete(handlerId) + } + + return {} + } + + return callbackify(pubsubUnsubscribe) +} diff --git a/packages/ipfs-grpc-server/src/index.js b/packages/ipfs-grpc-server/src/index.js index 05b64821e7..60adb8c24f 100644 --- a/packages/ipfs-grpc-server/src/index.js +++ b/packages/ipfs-grpc-server/src/index.js @@ -8,7 +8,8 @@ const loadServices = require('./utils/load-services') const { Root, - MFS + MFS, + PubSub } = loadServices() /** @@ -31,6 +32,12 @@ module.exports = async function createServer (ipfs, options = {}) { // @ts-ignore - types differ because we only invoke via websockets - https://github.com/ipfs/js-ipfs/issues/3594 write: require('./endpoints/mfs/write')(ipfs, options) }) + server.addService(PubSub, { + // @ts-ignore - types differ because we only invoke via websockets - https://github.com/ipfs/js-ipfs/issues/3594 + subscribe: require('./endpoints/pubsub/subscribe')(ipfs, options), + // @ts-ignore - types differ because we only invoke via websockets - https://github.com/ipfs/js-ipfs/issues/3594 + unsubscribe: require('./endpoints/pubsub/unsubscribe')(ipfs, options) + }) const socket = options.socket || await webSocketServer(ipfs, options) diff --git a/packages/ipfs/test/interface-client.js b/packages/ipfs/test/interface-client.js index 63e69d6b8c..9b33b57a59 100644 --- a/packages/ipfs/test/interface-client.js +++ b/packages/ipfs/test/interface-client.js @@ -146,4 +146,9 @@ describe('interface-ipfs-core ipfs-client tests', () => { } ] }) + + tests.pubsub(factory({ + type: 'js', + ipfsClientModule: require('ipfs-client') + })) }) From bb455412af0267c1c7ba9cc30aec892a16371697 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 12 Aug 2021 16:56:02 +0100 Subject: [PATCH 2/4] chore: skip test with many connections over http --- packages/ipfs/test/interface-http-go.js | 9 +++++++-- packages/ipfs/test/interface-http-js.js | 7 ++++++- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/packages/ipfs/test/interface-http-go.js b/packages/ipfs/test/interface-http-go.js index 1e3a4501b7..0b414063c7 100644 --- a/packages/ipfs/test/interface-http-go.js +++ b/packages/ipfs/test/interface-http-go.js @@ -617,7 +617,11 @@ describe('interface-ipfs-core over ipfs-http-client tests against go-ipfs', () = args: ['--enable-pubsub-experiment'] } }), { - skip: isWindows + skip: [{ + name: 'should receive messages from a different node on lots of topics', + reason: 'HTTP clients cannot hold this many connections open' + }].concat( + isWindows ? [{ name: 'should send/receive 100 messages', reason: 'FIXME https://github.com/ipfs/interface-ipfs-core/pull/188#issuecomment-354673246 and https://github.com/ipfs/go-ipfs/issues/4778' @@ -626,7 +630,8 @@ describe('interface-ipfs-core over ipfs-http-client tests against go-ipfs', () = name: 'should receive multiple messages', reason: 'FIXME https://github.com/ipfs/interface-ipfs-core/pull/188#issuecomment-354673246 and https://github.com/ipfs/go-ipfs/issues/4778' }] - : null + : [] + ) }) tests.repo(commonFactory) diff --git a/packages/ipfs/test/interface-http-js.js b/packages/ipfs/test/interface-http-js.js index ce00df00ee..5d981542ea 100644 --- a/packages/ipfs/test/interface-http-js.js +++ b/packages/ipfs/test/interface-http-js.js @@ -164,7 +164,12 @@ describe('interface-ipfs-core over ipfs-http-client tests against js-ipfs', func go: { args: ['--enable-pubsub-experiment'] } - })) + }), { + skip: [{ + name: 'should receive messages from a different node on lots of topics', + reason: 'HTTP clients cannot hold this many connections open' + }] + }) tests.repo(commonFactory) From c02287880f6d2dbd6edd453287fd79ea9994b2f7 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 12 Aug 2021 17:18:28 +0100 Subject: [PATCH 3/4] chore: fix linting --- packages/ipfs/test/interface-http-go.js | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/packages/ipfs/test/interface-http-go.js b/packages/ipfs/test/interface-http-go.js index 0b414063c7..4909cc20c4 100644 --- a/packages/ipfs/test/interface-http-go.js +++ b/packages/ipfs/test/interface-http-go.js @@ -622,15 +622,15 @@ describe('interface-ipfs-core over ipfs-http-client tests against go-ipfs', () = reason: 'HTTP clients cannot hold this many connections open' }].concat( isWindows - ? [{ - name: 'should send/receive 100 messages', - reason: 'FIXME https://github.com/ipfs/interface-ipfs-core/pull/188#issuecomment-354673246 and https://github.com/ipfs/go-ipfs/issues/4778' - }, - { - name: 'should receive multiple messages', - reason: 'FIXME https://github.com/ipfs/interface-ipfs-core/pull/188#issuecomment-354673246 and https://github.com/ipfs/go-ipfs/issues/4778' - }] - : [] + ? [{ + name: 'should send/receive 100 messages', + reason: 'FIXME https://github.com/ipfs/interface-ipfs-core/pull/188#issuecomment-354673246 and https://github.com/ipfs/go-ipfs/issues/4778' + }, + { + name: 'should receive multiple messages', + reason: 'FIXME https://github.com/ipfs/interface-ipfs-core/pull/188#issuecomment-354673246 and https://github.com/ipfs/go-ipfs/issues/4778' + }] + : [] ) }) From df04d7e19bae7e9683259bc110368fde1b1a1c51 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 12 Aug 2021 17:35:13 +0100 Subject: [PATCH 4/4] chore: fix up websocket tests --- packages/ipfs/test/interface-core.js | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/packages/ipfs/test/interface-core.js b/packages/ipfs/test/interface-core.js index 6e220dcd80..1fe1772dd4 100644 --- a/packages/ipfs/test/interface-core.js +++ b/packages/ipfs/test/interface-core.js @@ -8,7 +8,9 @@ const factory = require('./utils/factory') /** @typedef { import("ipfsd-ctl").ControllerOptions } ControllerOptions */ describe('interface-ipfs-core tests', function () { - const commonFactory = factory() + const commonFactory = factory({ + ipfsClientModule: require('ipfs-client') + }) tests.root(commonFactory, { skip: isNode @@ -81,11 +83,7 @@ describe('interface-ipfs-core tests', function () { tests.ping(commonFactory) - tests.pubsub(factory({}, { - go: { - args: ['--enable-pubsub-experiment'] - } - }), { + tests.pubsub(commonFactory, { skip: [ ...(isNode ? []