Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
feat: PubSub Interop Tests and CLI+HTTP-API Implementation (#1081)
Browse files Browse the repository at this point in the history
* test: enable pubsub tests

* fix: generate meaniful error when pubsub is called and not enabled

* test: enable pubsub for factory daemon

* test: enable pubsub tests

* fix: generate meaniful error when pubsub is called and not enabled

* test: enable pubsub for factory daemon

* fiix(pubsub-subscribe): stop HAPI gzip from buffering our streamed response

* test: fix spec/pubsub

* fix: lint errors

* test: tests js/go pubsub interop

* test: pubsub interop tests

* test: enable pubsub tests

* fix: generate meaniful error when pubsub is called and not enabled

* test: enable pubsub for factory daemon

* fiix(pubsub-subscribe): stop HAPI gzip from buffering our streamed response

* test: fix spec/pubsub

* fix: lint errors

* test: tests js/go pubsub interop

* test: pubsub interop tests

* test: more tests with different data types

Note that binary data from JS to GO fails

* HTTP API server: parsing query string as binary in pubsub publish

* HTTP API: pubsub: publish should fail gracefully when no argument is given

* chore: update deps

* chore: update deps

* last pass
  • Loading branch information
richardschneider authored and daviddias committed Nov 22, 2017
1 parent 71faa92 commit 99f0f87
Show file tree
Hide file tree
Showing 17 changed files with 352 additions and 76 deletions.
9 changes: 5 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"test:unit:node:gateway": "aegir test -t node -f test/gateway/index.js",
"test:unit:node:cli": "aegir test -t node -f test/cli/index.js",
"test:unit:browser": "aegir test -t browser --no-cors",
"test:interop": "IPFS_TEST=interop aegir test -t node -t browser -f test/interop",
"test:interop": "IPFS_TEST=interop aegir test -t node -f test/interop",
"test:interop:node": "IPFS_TEST=interop aegir test -t node -f test/interop/node.js",
"test:interop:browser": "IPFS_TEST=interop aegir test -t browser -f test/interop/browser.js",
"test:bootstrapers": "IPFS_TEST=bootstrapers aegir test -t browser -f test/bootstrapers.js",
Expand Down Expand Up @@ -63,7 +63,7 @@
},
"homepage": "https://github.com/ipfs/js-ipfs#readme",
"devDependencies": {
"aegir": "^12.1.3",
"aegir": "^12.2.0",
"buffer-loader": "0.0.1",
"chai": "^4.1.2",
"delay": "^2.0.0",
Expand All @@ -76,7 +76,7 @@
"form-data": "^2.3.1",
"hat": "0.0.3",
"interface-ipfs-core": "~0.36.7",
"ipfsd-ctl": "~0.24.1",
"ipfsd-ctl": "~0.25.1",
"left-pad": "^1.2.0",
"lodash": "^4.17.4",
"mocha": "^4.0.1",
Expand All @@ -92,6 +92,7 @@
},
"dependencies": {
"async": "^2.6.0",
"binary-querystring": "~0.1.2",
"bl": "^1.2.1",
"boom": "^7.1.1",
"bs58": "^4.0.1",
Expand All @@ -106,7 +107,7 @@
"hapi": "^16.6.2",
"hapi-set-header": "^1.0.2",
"hoek": "^5.0.2",
"ipfs-api": "^17.1.0",
"ipfs-api": "^17.1.1",
"ipfs-bitswap": "~0.17.4",
"ipfs-block": "~0.6.1",
"ipfs-block-service": "~0.13.0",
Expand Down
24 changes: 24 additions & 0 deletions src/core/components/no-floodsub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
'use strict'

const EventEmitter = require('events')

function fail () {
throw new Error('The daemon must be run with \'--enable-pubsub-experiment\'')
}

class NoFloodSub extends EventEmitter {
constructor () {
super()

this.peers = new Map()
this.subscriptions = new Set()
}

start (callback) { callback() }
stop (callback) { callback() }
publish () { fail() }
subscribe () { fail() }
unsubscribe () { fail() }
}

module.exports = NoFloodSub
11 changes: 5 additions & 6 deletions src/core/components/start.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const series = require('async/series')
const Bitswap = require('ipfs-bitswap')
const FloodSub = require('libp2p-floodsub')
const NoFloodSub = require('./no-floodsub')
const setImmediate = require('async/setImmediate')
const promisify = require('promisify-es6')

Expand Down Expand Up @@ -50,12 +51,10 @@ module.exports = (self) => {
self._bitswap.start()
self._blockService.setExchange(self._bitswap)

if (self._options.EXPERIMENTAL.pubsub) {
self._pubsub = new FloodSub(self._libp2pNode)
self._pubsub.start(done)
} else {
done()
}
self._pubsub = self._options.EXPERIMENTAL.pubsub
? new FloodSub(self._libp2pNode)
: new NoFloodSub()
self._pubsub.start(done)
})
})
}
8 changes: 1 addition & 7 deletions src/core/components/stop.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,7 @@ module.exports = (self) => {
self._bitswap.stop()

series([
(cb) => {
if (self._options.EXPERIMENTAL.pubsub) {
self._pubsub.stop(cb)
} else {
cb()
}
},
(cb) => self._pubsub.stop(cb),
(cb) => self.libp2p.stop(cb),
(cb) => self._repo.close(cb)
], done)
Expand Down
8 changes: 6 additions & 2 deletions src/http/api/resources/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const PassThrough = require('stream').PassThrough
const bs58 = require('bs58')
const binaryQueryString = require('binary-querystring')

exports = module.exports

Expand Down Expand Up @@ -48,6 +49,7 @@ exports.subscribe = {

reply(res)
.header('X-Chunked-Output', '1')
.header('content-encoding', 'identity') // stop gzip from buffering, see https://github.com/hapijs/hapi/issues/2975
.header('content-type', 'application/json')
})
}
Expand All @@ -57,7 +59,9 @@ exports.publish = {
handler: (request, reply) => {
const arg = request.query.arg
const topic = arg[0]
const buf = arg[1]

const rawArgs = binaryQueryString(request.url.search)
const buf = rawArgs.arg && rawArgs.arg[1]

const ipfs = request.server.app.ipfs

Expand All @@ -69,7 +73,7 @@ exports.publish = {
return reply(new Error('Missing buf'))
}

ipfs.pubsub.publish(topic, Buffer.from(String(buf)), (err) => {
ipfs.pubsub.publish(topic, buf, (err) => {
if (err) {
return reply(new Error(`Failed to publish to topic ${topic}: ${err}`))
}
Expand Down
2 changes: 1 addition & 1 deletion test/cli/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const createTempNode = ''
const repoPath = require('./index').repoPath
const ipfs = require('../utils/ipfs-exec')(repoPath)

describe.skip('pubsub', () => {
describe('pubsub', () => {
const topicA = 'nonscentsA'
const topicB = 'nonscentsB'
const topicC = 'nonscentsC'
Expand Down
2 changes: 1 addition & 1 deletion test/fixtures/go-ipfs-repo/version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
6
6
5 changes: 4 additions & 1 deletion test/http-api/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ describe('HTTP API', () => {
let http = {}

before((done) => {
http.api = new API(repoTests)
const options = {
enablePubsubExperiment: true
}
http.api = new API(repoTests, null, options)

ncp(repoExample, repoTests, (err) => {
expect(err).to.not.exist()
Expand Down
3 changes: 0 additions & 3 deletions test/http-api/interface/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

'use strict'

// TODO needs: https://github.com/ipfs/js-ipfs-api/pull/493
/*
const test = require('interface-ipfs-core')
const FactoryClient = require('./../../utils/ipfs-factory-daemon')

Expand All @@ -20,4 +18,3 @@ const common = {
}

test.pubsub(common)
*/
22 changes: 2 additions & 20 deletions test/http-api/spec/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,17 @@ const chai = require('chai')
const dirtyChai = require('dirty-chai')
const expect = chai.expect
chai.use(dirtyChai)
const createTempNode = ''

// TODO migrate to use ipfs-factory-daemon
module.exports = (http) => {
describe.skip('/pubsub', () => {
describe('/pubsub', () => {
let api
let tmpNode

const buf = Buffer.from('some message')
const topic = 'nonScents'
const topicNotSubscribed = 'somethingRandom'

before((done) => {
before(() => {
api = http.api.server.select('API')

createTempNode(47, (err, _ipfs) => {
expect(err).to.not.exist()
tmpNode = _ipfs
tmpNode.goOnline((err) => {
expect(err).to.not.exist()
done()
})
})
})

after((done) => {
setTimeout(() => {
tmpNode.goOffline(done)
}, 1000)
})

describe('/sub', () => {
Expand Down
1 change: 1 addition & 0 deletions test/interop/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ require('./exchange-files')
require('./circuit-relay')
require('./kad-dht')
require('./pubsub')
require('./pubsub-go')
Loading

0 comments on commit 99f0f87

Please sign in to comment.