Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: discover and connect to closest peers #798

Merged
merged 3 commits into from
Nov 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions doc/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* [`contentRouting.get`](#contentroutingget)
* [`contentRouting.getMany`](#contentroutinggetmany)
* [`peerRouting.findPeer`](#peerroutingfindpeer)
* [`peerRouting.getClosestPeers`](#peerroutinggetclosestpeers)
* [`peerStore.addressBook.add`](#peerstoreaddressbookadd)
* [`peerStore.addressBook.delete`](#peerstoreaddressbookdelete)
* [`peerStore.addressBook.get`](#peerstoreaddressbookget)
Expand Down Expand Up @@ -99,6 +100,7 @@ Creates an instance of Libp2p.
| [options.keychain] | [`object`](./CONFIGURATION.md#setup-with-keychain) | keychain [configuration](./CONFIGURATION.md#setup-with-keychain) |
| [options.metrics] | [`object`](./CONFIGURATION.md#configuring-metrics) | libp2p Metrics [configuration](./CONFIGURATION.md#configuring-metrics) |
| [options.peerId] | [`PeerId`][peer-id] | peerId instance (it will be created if not provided) |
| [options.peerRouting] | [`object`](./CONFIGURATION.md#setup-with-content-and-peer-routing) | libp2p Peer routing service [configuration](./CONFIGURATION.md#setup-with-content-and-peer-routing) |
| [options.peerStore] | [`object`](./CONFIGURATION.md#configuring-peerstore) | libp2p PeerStore [configuration](./CONFIGURATION.md#configuring-peerstore) |

For Libp2p configurations and modules details read the [Configuration Document](./CONFIGURATION.md).
Expand Down Expand Up @@ -675,6 +677,36 @@ Iterates over all peer routers in series to find the given peer. If the DHT is e
const peer = await libp2p.peerRouting.findPeer(peerId, options)
```

### peerRouting.getClosestPeers

Iterates over all content routers in series to get the closest peers of the given key.
Once a content router succeeds, the iteration will stop. If the DHT is enabled, it will be queried first.

`libp2p.peerRouting.getClosestPeers(cid, options)`

#### Parameters

| Name | Type | Description |
|------|------|-------------|
| key | `Uint8Array` | A CID like key |
| options | `object` | operation options |
| options.timeout | `number` | How long the query can take (ms). |

#### Returns

| Type | Description |
|------|-------------|
| `AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }` | Async iterator for peer data |

#### Example

```js
// Iterate over the closest peers found for the given key
for await (const peer of libp2p.peerRouting.getClosestPeers(key)) {
console.log(peer.id, peer.multiaddrs)
}
```

### peerStore.addressBook.add

Adds known `multiaddrs` of a given peer. If the peer is not known, it will be set with the provided multiaddrs.
Expand Down
9 changes: 8 additions & 1 deletion doc/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,14 @@ const node = await Libp2p.create({
new DelegatedPeerRouter()
],
},
peerId
peerId,
peerRouting: { // Peer routing configuration
refreshManager: { // Refresh known and connected closest peers
enabled: true, // Should find the closest peers.
interval: 6e5, // Interval for getting the new for closest peers of 10min
bootDelay: 10e3 // Delay for the initial query for closest peers
}
}
})
```

Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
"protons": "^2.0.0",
"retimer": "^2.0.0",
"sanitize-filename": "^1.6.3",
"set-delayed-interval": "^1.0.0",
"streaming-iterables": "^5.0.2",
"timeout-abort-controller": "^1.1.1",
"varint": "^5.0.0",
Expand All @@ -91,6 +92,7 @@
"chai-string": "^1.5.0",
"delay": "^4.3.0",
"interop-libp2p": "^0.3.0",
"into-stream": "^6.0.0",
"ipfs-http-client": "^47.0.1",
"it-concat": "^1.0.0",
"it-pair": "^1.0.0",
Expand Down
7 changes: 7 additions & 0 deletions src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ const DefaultConfig = {
persistence: false,
threshold: 5
},
peerRouting: {
refreshManager: {
enabled: true,
interval: 6e5,
bootDelay: 10e3
}
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit confusing as it's not quite peer routing. This is really the refresh manager. In go-libp2p since they just have the DHT right now they run the table refreshes when they search for their closest peers. This happens when DHT bootstrap is run at startup and then on an interval. Ideally we would not use a boot delay timer as this is finicky, and instead run once we've finished a bootstrap phase. Since we don't really have this concept atm, I think a delay is fine for now.

Go runs this every 10 minutes by default right now, we should just match that for consistency.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am going to change this into 10 minutes yes. Regarding the namings, thinking on changing this to:

peerRouting: {
  refreshManager: { ... }
}

The peerRouting is the scope where we make these queries, so it seems that this configs should live bound to it. What do you think?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added as suggested for now

config: {
dht: {
enabled: false,
Expand Down
8 changes: 5 additions & 3 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ log.error = debug('libp2p:error')
const errCode = require('err-code')
const PeerId = require('peer-id')

const peerRouting = require('./peer-routing')
const PeerRouting = require('./peer-routing')
const contentRouting = require('./content-routing')
const getPeer = require('./get-peer')
const { validate: validateConfig } = require('./config')
Expand Down Expand Up @@ -192,7 +192,7 @@ class Libp2p extends EventEmitter {

// Attach remaining APIs
// peer and content routing will automatically get modules from _modules and _dht
this.peerRouting = peerRouting(this)
this.peerRouting = new PeerRouting(this)
this.contentRouting = contentRouting(this)

// Mount default protocols
Expand Down Expand Up @@ -249,8 +249,8 @@ class Libp2p extends EventEmitter {
try {
this._isStarted = false

// Relay
this.relay && this.relay.stop()
this.peerRouting.stop()

for (const service of this._discovery.values()) {
service.removeListener('peer', this._onDiscoveryPeer)
Expand Down Expand Up @@ -495,6 +495,8 @@ class Libp2p extends EventEmitter {

// Relay
this.relay && this.relay.start()

this.peerRouting.start()
}

/**
Expand Down
132 changes: 109 additions & 23 deletions src/peer-routing.js
Original file line number Diff line number Diff line change
@@ -1,40 +1,126 @@
'use strict'

const errCode = require('err-code')
const debug = require('debug')
const log = debug('libp2p:peer-routing')
log.error = debug('libp2p:peer-routing:error')

const all = require('it-all')
const pAny = require('p-any')
const {
setDelayedInterval,
clearDelayedInterval
} = require('set-delayed-interval')
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created this module for us to use here, but also in:

It is always tricky to implement a "setInterval" when the recurrent task is a promise. Other than that, if we need a delay beforehand, things get even more complex as we need to guarantee that the timer can be stopped for any of the pauses.

When looking at the relay logic I found a bug. If we clear the timeout when await this._libp2p.contentRouting.provide(cid) is running, then the timeout will be assigned again afterward. We should be checking if we had timeout before creating the new one. This module aims to avoid introducing these issues when dealing with this use cases.

It would be great to have your 👀 on the module: vasco-santos/set-delayed-interval#1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'd really like to get rid of this whole boot delay logic. In reality we should probably have some async boot function and when that finishes these other systems initiate, but I think we can revisit this when we update the config logic. I'll review the module

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great to orchestrate subsystems like that. I will add this comment to the configuration issue for us to remember


/**
* Responsible for managing the usage of the available Peer Routing modules.
*/
class PeerRouting {
/**
* @class
* @param {Libp2p} libp2p
*/
constructor (libp2p) {
this._peerId = libp2p.peerId
this._peerStore = libp2p.peerStore
this._routers = libp2p._modules.peerRouting || []

// If we have the dht, make it first
if (libp2p._dht) {
this._routers.unshift(libp2p._dht)
}

this._refreshManagerOptions = libp2p._options.peerRouting.refreshManager

this._findClosestPeersTask = this._findClosestPeersTask.bind(this)
}

/**
* Start peer routing service.
*/
start () {
if (!this._routers.length || this._timeoutId || !this._refreshManagerOptions.enabled) {
return
}

this._timeoutId = setDelayedInterval(
this._findClosestPeersTask, this._refreshManagerOptions.interval, this._refreshManagerOptions.bootDelay
)
}

module.exports = (node) => {
const routers = node._modules.peerRouting || []
/**
* Recurrent task to find closest peers and add their addresses to the Address Book.
*/
async _findClosestPeersTask () {
try {
for await (const { id, multiaddrs } of this.getClosestPeers(this._peerId.id)) {
this._peerStore.addressBook.add(id, multiaddrs)
}
} catch (err) {
log.error(err)
}
}

// If we have the dht, make it first
if (node._dht) {
routers.unshift(node._dht)
/**
* Stop peer routing service.
*/
stop () {
clearDelayedInterval(this._timeoutId)
}

return {
/**
* Iterates over all peer routers in series to find the given peer.
*
* @param {string} id - The id of the peer to find
* @param {object} [options]
* @param {number} [options.timeout] - How long the query should run
* @returns {Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
findPeer: async (id, options) => { // eslint-disable-line require-await
if (!routers.length) {
throw errCode(new Error('No peer routers available'), 'NO_ROUTERS_AVAILABLE')
/**
* Iterates over all peer routers in series to find the given peer.
*
* @param {string} id - The id of the peer to find
* @param {object} [options]
* @param {number} [options.timeout] - How long the query should run
* @returns {Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
async findPeer (id, options) { // eslint-disable-line require-await
if (!this._routers.length) {
throw errCode(new Error('No peer routers available'), 'NO_ROUTERS_AVAILABLE')
}

return pAny(this._routers.map(async (router) => {
const result = await router.findPeer(id, options)

// If we don't have a result, we need to provide an error to keep trying
if (!result || Object.keys(result).length === 0) {
throw errCode(new Error('not found'), 'NOT_FOUND')
}

return pAny(routers.map(async (router) => {
const result = await router.findPeer(id, options)
return result
}))
}

/**
* Attempt to find the closest peers on the network to the given key.
*
* @param {Uint8Array} key - A CID like key
* @param {Object} [options]
* @param {number} [options.timeout=30e3] - How long the query can take.
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
async * getClosestPeers (key, options = { timeout: 30e3 }) {
if (!this._routers.length) {
throw errCode(new Error('No peer routers available'), 'NO_ROUTERS_AVAILABLE')
}

// If we don't have a result, we need to provide an error to keep trying
if (!result || Object.keys(result).length === 0) {
const result = await pAny(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not going to block this PR on this (as we're doing it in other content/peer routing calls), but we should think about the parallel actions this is doing. Most people shouldn't be using two, but in the change they are, the parallel calls could cause some odd behavior. This is probably worth discussing in more depth when we look at the discovery api / configuration updates.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, I will cross post it here for us to discuss

this._routers.map(async (router) => {
const peers = await all(router.getClosestPeers(key, options))

if (!peers || !peers.length) {
throw errCode(new Error('not found'), 'NOT_FOUND')
}
return peers
})
)

return result
}))
for (const peer of result) {
yield peer
}
}
}

module.exports = PeerRouting
Loading