Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

feat: pubsub over gRPC #3813

Merged
merged 4 commits into from
Aug 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/interface-ipfs-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
"pako": "^1.0.2",
"peer-id": "^0.15.1",
"readable-stream": "^3.4.0",
"sinon": "^11.1.1",
"uint8arrays": "^2.1.6"
},
"contributors": [
Expand Down
6 changes: 3 additions & 3 deletions packages/interface-ipfs-core/src/pubsub/peers.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ module.exports = (factory, options) => {
let ipfs3Id

before(async () => {
ipfs1 = (await factory.spawn({ type: 'proc', ipfsOptions })).api
ipfs1 = (await factory.spawn({ ipfsOptions })).api
// webworkers are not dialable because webrtc is not available
ipfs2 = (await factory.spawn({ type: isWebWorker ? 'go' : undefined })).api
ipfs3 = (await factory.spawn({ type: isWebWorker ? 'go' : undefined })).api
ipfs2 = (await factory.spawn({ type: isWebWorker ? 'js' : undefined, ipfsOptions })).api
ipfs3 = (await factory.spawn({ type: isWebWorker ? 'js' : undefined, ipfsOptions })).api

ipfs2Id = await ipfs2.id()
ipfs3Id = await ipfs3.id()
Expand Down
141 changes: 137 additions & 4 deletions packages/interface-ipfs-core/src/pubsub/subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const { AbortController } = require('native-abort-controller')
const { isWebWorker, isNode } = require('ipfs-utils/src/env')
const getIpfsOptions = require('../utils/ipfs-options-websockets-filter-all')
const first = require('it-first')
const sinon = require('sinon')

/**
* @typedef {import('ipfsd-ctl').Factory} Factory
Expand Down Expand Up @@ -44,12 +45,10 @@ module.exports = (factory, options) => {
let ipfs2Id

before(async () => {
ipfs1 = (await factory.spawn({ type: 'proc', ipfsOptions })).api
// TODO 'multiple connected nodes' tests fails with go in Firefox
// and JS is flaky everywhere
ipfs1 = (await factory.spawn({ ipfsOptions })).api

// webworkers are not dialable because webrtc is not available
ipfs2 = (await factory.spawn({ type: isWebWorker ? 'go' : undefined })).api
ipfs2 = (await factory.spawn({ type: isWebWorker ? 'js' : undefined, ipfsOptions })).api

ipfs1Id = await ipfs1.id()
ipfs2Id = await ipfs2.id()
Expand Down Expand Up @@ -84,6 +83,7 @@ module.exports = (factory, options) => {
await ipfs1.pubsub.publish(topic, uint8ArrayFromString('hi'))

const msg = await first(msgStream)

expect(uint8ArrayToString(msg.data)).to.equal('hi')
expect(msg).to.have.property('seqno')
expect(msg.seqno).to.be.an.instanceof(Uint8Array)
Expand Down Expand Up @@ -410,6 +410,139 @@ module.exports = (factory, options) => {
expect(uint8ArrayToString(msg.data).startsWith(msgBase)).to.be.true()
})
})

it('should receive messages from a different node on lots of topics', async () => {
// @ts-ignore this is mocha
this.timeout(5 * 60 * 1000)

const numTopics = 20
const topics = []
const expectedStrings = []
const msgStreams = []

for (let i = 0; i < numTopics; i++) {
const topic = `pubsub-topic-${Math.random()}`
topics.push(topic)

const msgStream1 = pushable()
const msgStream2 = pushable()

msgStreams.push({
msgStream1,
msgStream2
})

/** @type {import('ipfs-core-types/src/pubsub').MessageHandlerFn} */
const sub1 = msg => {
msgStream1.push(msg)
msgStream1.end()
}
/** @type {import('ipfs-core-types/src/pubsub').MessageHandlerFn} */
const sub2 = msg => {
msgStream2.push(msg)
msgStream2.end()
}

await Promise.all([
ipfs1.pubsub.subscribe(topic, sub1),
ipfs2.pubsub.subscribe(topic, sub2)
])

await waitForPeers(ipfs2, topic, [ipfs1Id.id], 30000)
}

await delay(5000) // gossipsub needs this delay https://github.com/libp2p/go-libp2p-pubsub/issues/331

for (let i = 0; i < numTopics; i++) {
const expectedString = `hello pubsub ${Math.random()}`
expectedStrings.push(expectedString)

await ipfs2.pubsub.publish(topics[i], uint8ArrayFromString(expectedString))
}

for (let i = 0; i < numTopics; i++) {
const [sub1Msg] = await all(msgStreams[i].msgStream1)
expect(uint8ArrayToString(sub1Msg.data)).to.equal(expectedStrings[i])
expect(sub1Msg.from).to.eql(ipfs2Id.id)

const [sub2Msg] = await all(msgStreams[i].msgStream2)
expect(uint8ArrayToString(sub2Msg.data)).to.equal(expectedStrings[i])
expect(sub2Msg.from).to.eql(ipfs2Id.id)
}
})

it('should unsubscribe multiple handlers', async () => {
// @ts-ignore this is mocha
this.timeout(2 * 60 * 1000)

const topic = `topic-${Math.random()}`

const handler1 = sinon.stub()
const handler2 = sinon.stub()

await Promise.all([
ipfs1.pubsub.subscribe(topic, sinon.stub()),
ipfs2.pubsub.subscribe(topic, handler1),
ipfs2.pubsub.subscribe(topic, handler2)
])

await waitForPeers(ipfs1, topic, [ipfs2Id.id], 30000)

expect(handler1).to.have.property('callCount', 0)
expect(handler2).to.have.property('callCount', 0)

await ipfs1.pubsub.publish(topic, uint8ArrayFromString('hello world 1'))

await delay(1000)

expect(handler1).to.have.property('callCount', 1)
expect(handler2).to.have.property('callCount', 1)

await ipfs2.pubsub.unsubscribe(topic)

await ipfs1.pubsub.publish(topic, uint8ArrayFromString('hello world 2'))

await delay(1000)

expect(handler1).to.have.property('callCount', 1)
expect(handler2).to.have.property('callCount', 1)
})

it('should unsubscribe individual handlers', async () => {
// @ts-ignore this is mocha
this.timeout(2 * 60 * 1000)

const topic = `topic-${Math.random()}`

const handler1 = sinon.stub()
const handler2 = sinon.stub()

await Promise.all([
ipfs1.pubsub.subscribe(topic, sinon.stub()),
ipfs2.pubsub.subscribe(topic, handler1),
ipfs2.pubsub.subscribe(topic, handler2)
])

await waitForPeers(ipfs1, topic, [ipfs2Id.id], 30000)

expect(handler1).to.have.property('callCount', 0)
expect(handler2).to.have.property('callCount', 0)

await ipfs1.pubsub.publish(topic, uint8ArrayFromString('hello world 1'))

await delay(1000)

expect(handler1).to.have.property('callCount', 1)
expect(handler2).to.have.property('callCount', 1)

await ipfs2.pubsub.unsubscribe(topic, handler1)
await ipfs1.pubsub.publish(topic, uint8ArrayFromString('hello world 2'))

await delay(1000)

expect(handler1).to.have.property('callCount', 1)
expect(handler2).to.have.property('callCount', 2)
})
})
})
}
1 change: 1 addition & 0 deletions packages/ipfs-grpc-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"it-pushable": "^1.4.2",
"multiaddr": "^10.0.0",
"multiformats": "^9.4.1",
"p-defer": "^3.0.0",
"protobufjs": "^6.10.2",
"wherearewe": "1.0.0",
"ws": "^7.3.1"
Expand Down
58 changes: 58 additions & 0 deletions packages/ipfs-grpc-client/src/core-api/pubsub/subscribe.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
'use strict'

const serverStreamToIterator = require('../../utils/server-stream-to-iterator')
const withTimeoutOption = require('ipfs-core-utils/src/with-timeout-option')
const subscriptions = require('./subscriptions')
const defer = require('p-defer')

/**
* @param {import('@improbable-eng/grpc-web').grpc} grpc
* @param {*} service
* @param {import('../../types').Options} opts
*/
module.exports = function grpcPubsubSubscribe (grpc, service, opts) {
/**
* @type {import('ipfs-core-types/src/pubsub').API["subscribe"]}
*/
async function pubsubSubscribe (topic, handler, options = {}) {
const request = {
topic
}

const deferred = defer()

Promise.resolve().then(async () => {
try {
for await (const result of serverStreamToIterator(grpc, service, request, {
host: opts.url,
debug: Boolean(process.env.DEBUG),
metadata: options,
agent: opts.agent
})) {
if (result.handler) {
const subs = subscriptions.get(topic) || new Map()
subs.set(result.handler, handler)
subscriptions.set(topic, subs)

deferred.resolve()
} else {
handler({
from: result.from,
seqno: result.seqno,
data: result.data,
topicIDs: result.topicIDs
})
}
}
} catch (err) {
if (options && options.onError) {
options.onError(err)
}
}
})

await deferred.promise
}

return withTimeoutOption(pubsubSubscribe)
}
10 changes: 10 additions & 0 deletions packages/ipfs-grpc-client/src/core-api/pubsub/subscriptions.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
'use strict'

/**
* @typedef {import('ipfs-core-types/src/pubsub').MessageHandlerFn} Subscription
*/

/** @type {Map<string, Map<string, Subscription>>} */
const subs = new Map()

module.exports = subs
56 changes: 56 additions & 0 deletions packages/ipfs-grpc-client/src/core-api/pubsub/unsubscribe.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
'use strict'

const withTimeoutOption = require('ipfs-core-utils/src/with-timeout-option')
const toHeaders = require('../../utils/to-headers')
const unaryToPromise = require('../../utils/unary-to-promise')
const subscriptions = require('./subscriptions')

/**
* @param {import('@improbable-eng/grpc-web').grpc} grpc
* @param {*} service
* @param {import('../../types').Options} opts
*/
module.exports = function grpcPubsubUnsubscribe (grpc, service, opts) {
/**
* @type {import('ipfs-core-types/src/pubsub').API["unsubscribe"]}
*/
async function pubsubUnsubscribe (topic, handler, options = {}) {
const handlers = []
const subs = subscriptions.get(topic)

if (!subs) {
return
}

if (handler) {
for (const [key, value] of subs.entries()) {
if (value === handler) {
handlers.push(key)
}
}
} else {

}

const request = {
topic,
handlers
}

await unaryToPromise(grpc, service, request, {
host: opts.url,
metadata: toHeaders(options),
agent: opts.agent
})

for (const handlerId of handlers) {
subs.delete(handlerId)
}

if (!subs.size) {
subscriptions.delete(topic)
}
}

return withTimeoutOption(pubsubUnsubscribe)
}
6 changes: 6 additions & 0 deletions packages/ipfs-grpc-client/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ function create (opts = { url: '' }) {
ls: require('./core-api/files/ls')(grpc, service.MFS.ls, options),
// @ts-ignore - TODO: fix after https://github.com/ipfs/js-ipfs/issues/3594
write: require('./core-api/files/write')(grpc, service.MFS.write, options)
},
pubsub: {
// @ts-ignore - TODO: fix after https://github.com/ipfs/js-ipfs/issues/3594
subscribe: require('./core-api/pubsub/subscribe')(grpc, service.PubSub.subscribe, options),
// @ts-ignore - TODO: fix after https://github.com/ipfs/js-ipfs/issues/3594
unsubscribe: require('./core-api/pubsub/unsubscribe')(grpc, service.PubSub.unsubscribe, options)
}
}

Expand Down
31 changes: 31 additions & 0 deletions packages/ipfs-grpc-protocol/src/pubsub.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
syntax = "proto3";

import "common.proto";

package ipfs;

service PubSub {
rpc subscribe (SubscribeRequest) returns (stream SubscribeResponse) {}
rpc unsubscribe (UnSubscribeRequest) returns (UnSubscribeResponse) {}
}

message SubscribeRequest {
string topic = 1;
}

message SubscribeResponse {
string handler = 1;
string from = 2;
bytes seqno = 3;
bytes data = 4;
repeated string topicIDs = 5;
}

message UnSubscribeRequest {
string topic = 1;
repeated string handlers = 2;
}

message UnSubscribeResponse {

}
1 change: 1 addition & 0 deletions packages/ipfs-grpc-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"it-pipe": "^1.1.0",
"it-pushable": "^1.4.2",
"multiaddr": "^10.0.0",
"nanoid": "3.1.23",
"protobufjs": "^6.10.2",
"ws": "^7.3.1"
},
Expand Down
Loading