Skip to content

Commit

Permalink
feat: support multiple peer and content routing modules
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobheun committed Aug 23, 2018
1 parent a8e07f9 commit 31df87d
Show file tree
Hide file tree
Showing 7 changed files with 276 additions and 22 deletions.
4 changes: 2 additions & 2 deletions src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ const OptionsSchema = Joi.object({
connProtector: Joi.object().keys({
protect: Joi.func().required()
}).unknown(),
contentRouting: Joi.object(),
contentRouting: Joi.array().items(Joi.object()).allow(null),
dht: ModuleSchema.allow(null),
peerDiscovery: Joi.array().items(ModuleSchema).allow(null),
peerRouting: Joi.object(),
peerRouting: Joi.array().items(Joi.object()).allow(null),
streamMuxer: Joi.array().items(ModuleSchema).allow(null),
transport: Joi.array().items(ModuleSchema).min(1).required()
}).required(),
Expand Down
63 changes: 57 additions & 6 deletions src/content-routing.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,71 @@
'use strict'

const tryEach = require('async/tryEach')
const parallel = require('async/parallel')

module.exports = (node) => {
const routers = node._modules.contentRouting || []

// If we have the dht, make it first
if (node._dht) {
routers.unshift(node._dht)
}

return {
/**
* Iterates over all content routers in series to find providers of the given key.
* Once a content router succeeds, iteration will stop.
*
* @param {CID} key The CID key of the content to find
* @param {number} timeout How long the query should run
* @param {function(Error, Result<Array>)} callback
* @returns {void}
*/
findProviders: (key, timeout, callback) => {
if (!node._dht) {
return callback(new Error('DHT is not available'))
if (routers.length === 0) {
return callback(new Error('No content routers available'))
}

node._dht.findProviders(key, timeout, callback)
const tasks = routers.map((router) => {
return (cb) => router.findProviders(key, timeout, (err, results) => {
if (err) {
return cb(err)
}

// If we don't have any results, we need to provide an error to keep trying
if (!results || Object.keys(results).length === 0) {
return cb(true, null)
}

cb(null, results)
})
})

tryEach(tasks, (err, results) => {
if (err && err !== true) {
return callback(err)
}
results = results || []
callback(null, results)
})
},

/**
* Iterates over all content routers in parallel to notify it is
* a provider of the given key.
*
* @param {CID} key The CID key of the content to find
* @param {function(Error)} callback
* @returns {void}
*/
provide: (key, callback) => {
if (!node._dht) {
return callback(new Error('DHT is not available'))
if (routers.length === 0) {
return callback(new Error('No content routers available'))
}

node._dht.provide(key, callback)
parallel(routers.map((router) => {
return (cb) => router.provide(key, cb)
}), callback)
}
}
}
6 changes: 3 additions & 3 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ class Node extends EventEmitter {
}

// Attach remaining APIs
// If peer or content routing modules have been provided, use those, otherwise use the dht
this.peerRouting = this._modules.peerRouting || peerRouting(this)
this.contentRouting = this._modules.contentRouting || contentRouting(this)
// peer and content routing will automatically get modules from _modules and _dht
this.peerRouting = peerRouting(this)
this.contentRouting = contentRouting(this)
this.dht = dht(this)

this._getPeerInfo = getPeerInfo(this)
Expand Down
43 changes: 40 additions & 3 deletions src/peer-routing.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,50 @@
'use strict'

const tryEach = require('async/tryEach')

module.exports = (node) => {
const routers = node._modules.peerRouting || []

// If we have the dht, make it first
if (node._dht) {
routers.unshift(node._dht)
}

return {
/**
* Iterates over all peer routers in series to find the given peer.
*
* @param {String} id The id of the peer to find
* @param {function(Error, Result<Array>)}
* @returns {void}
*/
findPeer: (id, callback) => {
if (!node._dht) {
return callback(new Error('DHT is not available'))
if (routers.length === 0) {
return callback(new Error('No peer routers available'))
}

node._dht.findPeer(id, callback)
const tasks = routers.map((router) => {
return (cb) => router.findPeer(id, (err, result) => {
if (err) {
return cb(err)
}

// If we don't have a result, we need to provide an error to keep trying
if (!result || Object.keys(result).length === 0) {
return cb(true, null)
}

cb(null, result)
})
})

tryEach(tasks, (err, results) => {
if (err && err !== true) {
return callback(err)
}
results = results || null
callback(null, results)
})
}
}
}
8 changes: 4 additions & 4 deletions test/config.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ describe('configuration', () => {
modules: {
transport: [ WS ],
peerDiscovery: [ Bootstrap ],
peerRouting: peerRouter,
contentRouting: contentRouter
peerRouting: [ peerRouter ],
contentRouting: [ contentRouter ]
},
config: {
peerDiscovery: {
Expand All @@ -123,8 +123,8 @@ describe('configuration', () => {
}

expect(validateConfig(options).modules).to.deep.include({
peerRouting: peerRouter,
contentRouting: contentRouter
peerRouting: [ peerRouter ],
contentRouting: [ contentRouter ]
})
})

Expand Down
99 changes: 97 additions & 2 deletions test/content-routing.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ describe('.contentRouting', () => {
nodeA = new Node({
peerInfo,
modules: {
contentRouting: delegate
contentRouting: [ delegate ]
},
config: {
relay: {
Expand All @@ -157,7 +157,8 @@ describe('.contentRouting', () => {
], done)
})

afterEach(() => nock.cleanAll)
after((done) => nodeA.stop(done))
afterEach(() => nock.cleanAll())

describe('provide', () => {
it('should use the delegate router to provide', (done) => {
Expand Down Expand Up @@ -272,4 +273,98 @@ describe('.contentRouting', () => {
})
})
})

describe('via the dht and a delegate', () => {
let nodeA
let delegate

before((done) => {
waterfall([
(cb) => {
createPeerInfo(cb)
},
// Create the node using the delegate
(peerInfo, cb) => {
delegate = new DelegatedContentRouter(peerInfo.id, {
host: '0.0.0.0',
protocol: 'http',
port: 60197
}, [
ma('/ip4/0.0.0.0/tcp/60194')
])
nodeA = new Node({
peerInfo,
modules: {
contentRouting: [ delegate ]
},
config: {
relay: {
enabled: true,
hop: {
enabled: true,
active: false
}
},
EXPERIMENTAL: {
dht: true
}
}
})
nodeA.start(cb)
}
], done)
})

after((done) => nodeA.stop(done))

describe('provide', () => {
it('should use both the dht and delegate router to provide', (done) => {
const dhtStub = sinon.stub(nodeA._dht, 'provide').callsFake(() => {})
const delegateStub = sinon.stub(delegate, 'provide').callsFake(() => {
expect(dhtStub.calledOnce).to.equal(true)
expect(delegateStub.calledOnce).to.equal(true)
delegateStub.restore()
dhtStub.restore()
done()
})
nodeA.contentRouting.provide()
})
})

describe('findProviders', () => {
it('should only use the dht if it finds providers', (done) => {
const results = [true]
const dhtStub = sinon.stub(nodeA._dht, 'findProviders').callsArgWith(2, null, results)
const delegateStub = sinon.stub(delegate, 'findProviders').throws(() => {
return new Error('the delegate should not have been called')
})

nodeA.contentRouting.findProviders('a cid', 5000, (err, results) => {
expect(err).to.not.exist()
expect(results).to.equal(results)
expect(dhtStub.calledOnce).to.equal(true)
expect(delegateStub.notCalled).to.equal(true)
delegateStub.restore()
dhtStub.restore()
done()
})
})

it('should use the delegate if the dht fails to find providers', (done) => {
const results = [true]
const dhtStub = sinon.stub(nodeA._dht, 'findProviders').callsArgWith(2, null, [])
const delegateStub = sinon.stub(delegate, 'findProviders').callsArgWith(2, null, results)

nodeA.contentRouting.findProviders('a cid', 5000, (err, results) => {
expect(err).to.not.exist()
expect(results).to.deep.equal(results)
expect(dhtStub.calledOnce).to.equal(true)
expect(delegateStub.calledOnce).to.equal(true)
delegateStub.restore()
dhtStub.restore()
done()
})
})
})
})
})
75 changes: 73 additions & 2 deletions test/peer-routing.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ describe('.peerRouting', () => {
})
createNode('/ip4/0.0.0.0/tcp/0', {
modules: {
peerRouting: delegate
peerRouting: [ delegate ]
}
}, (err, node) => {
expect(err).to.not.exist()
Expand All @@ -122,7 +122,8 @@ describe('.peerRouting', () => {
], done)
})

afterEach(nock.cleanAll)
after((done) => nodeA.stop(done))
afterEach(() => nock.cleanAll())

it('should use the delegate router to find peers', (done) => {
const stub = sinon.stub(delegate, 'findPeer').callsFake(() => {
Expand Down Expand Up @@ -192,4 +193,74 @@ describe('.peerRouting', () => {
})
})
})

describe('via the dht and a delegate', () => {
let nodeA
let delegate

before((done) => {
parallel([
// Create the node using the delegate
(cb) => {
delegate = new DelegatedPeerRouter({
host: 'ipfs.io',
protocol: 'https',
port: '443'
})
createNode('/ip4/0.0.0.0/tcp/0', {
modules: {
peerRouting: [ delegate ]
},
config: {
EXPERIMENTAL: {
dht: true
}
}
}, (err, node) => {
expect(err).to.not.exist()
nodeA = node
nodeA.start(cb)
})
}
], done)
})

after((done) => nodeA.stop(done))

describe('findPeer', () => {
it('should only use the dht if it find the peer', (done) => {
const results = [true]
const dhtStub = sinon.stub(nodeA._dht, 'findPeer').callsArgWith(1, null, results)
const delegateStub = sinon.stub(delegate, 'findPeer').throws(() => {
return new Error('the delegate should not have been called')
})

nodeA.peerRouting.findPeer('a peer id', (err, results) => {
expect(err).to.not.exist()
expect(results).to.equal(results)
expect(dhtStub.calledOnce).to.equal(true)
expect(delegateStub.notCalled).to.equal(true)
delegateStub.restore()
dhtStub.restore()
done()
})
})

it('should use the delegate if the dht fails to find the peer', (done) => {
const results = [true]
const dhtStub = sinon.stub(nodeA._dht, 'findPeer').callsArgWith(1, null, undefined)
const delegateStub = sinon.stub(delegate, 'findPeer').callsArgWith(1, null, results)

nodeA.peerRouting.findPeer('a peer id', (err, results) => {
expect(err).to.not.exist()
expect(results).to.deep.equal(results)
expect(dhtStub.calledOnce).to.equal(true)
expect(delegateStub.calledOnce).to.equal(true)
delegateStub.restore()
dhtStub.restore()
done()
})
})
})
})
})

0 comments on commit 31df87d

Please sign in to comment.