Skip to content

Commit

Permalink
fix: not dial all known peers in parallel on startup (#698)
Browse files Browse the repository at this point in the history
* fix: not dial all known peers on startup

* feat: connection manager should proactively connect to peers from peerStore

* chore: increase bundle size

* fix: do connMgr proactive dial on an interval

* chore: address review

* chore: use retimer reschedule

* chore: address review

* fix: use minConnections in default config

* chore: minPeers to minConnections everywhere
  • Loading branch information
vasco-santos authored Jul 14, 2020
1 parent 619e5dd commit 9ccab40
Show file tree
Hide file tree
Showing 9 changed files with 259 additions and 32 deletions.
2 changes: 1 addition & 1 deletion .aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const after = async () => {
}

module.exports = {
bundlesize: { maxSize: '200kB' },
bundlesize: { maxSize: '202kB' },
hooks: {
pre: before,
post: after
Expand Down
2 changes: 1 addition & 1 deletion doc/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ const node = await Libp2p.create({
},
config: {
peerDiscovery: {
autoDial: true, // Auto connect to discovered peers (limited by ConnectionManager minPeers)
autoDial: true, // Auto connect to discovered peers (limited by ConnectionManager minConnections)
// The `tag` property will be searched when creating the instance of your Peer Discovery service.
// The associated object, will be passed to the service when it is instantiated.
[MulticastDNS.tag]: {
Expand Down
2 changes: 1 addition & 1 deletion doc/GETTING_STARTED.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ const node = await Libp2p.create({
},
config: {
peerDiscovery: {
autoDial: true, // Auto connect to discovered peers (limited by ConnectionManager minPeers)
autoDial: true, // Auto connect to discovered peers (limited by ConnectionManager minConnections)
// The `tag` property will be searched when creating the instance of your Peer Discovery service.
// The associated object, will be passed to the service when it is instantiated.
[Bootstrap.tag]: {
Expand Down
2 changes: 1 addition & 1 deletion src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const DefaultConfig = {
noAnnounce: []
},
connectionManager: {
minPeers: 25
minConnections: 25
},
transportManager: {
faultTolerance: FaultTolerance.FATAL_ALL
Expand Down
101 changes: 85 additions & 16 deletions src/connection-manager/index.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
'use strict'

const debug = require('debug')
const log = debug('libp2p:connection-manager')
log.error = debug('libp2p:connection-manager:error')

const errcode = require('err-code')
const mergeOptions = require('merge-options')
const LatencyMonitor = require('./latency-monitor')
const debug = require('debug')('libp2p:connection-manager')
const retimer = require('retimer')

const { EventEmitter } = require('events')
Expand All @@ -22,6 +25,7 @@ const defaultOptions = {
maxReceivedData: Infinity,
maxEventLoopDelay: Infinity,
pollInterval: 2000,
autoDialInterval: 10000,
movingAverageInterval: 60000,
defaultPeerValue: 1
}
Expand All @@ -45,6 +49,8 @@ class ConnectionManager extends EventEmitter {
* @param {Number} options.pollInterval How often, in milliseconds, metrics and latency should be checked. Default=2000
* @param {Number} options.movingAverageInterval How often, in milliseconds, to compute averages. Default=60000
* @param {Number} options.defaultPeerValue The value of the peer. Default=1
* @param {boolean} options.autoDial Should preemptively guarantee connections are above the low watermark. Default=true
* @param {Number} options.autoDialInterval How often, in milliseconds, it should preemptively guarantee connections are above the low watermark. Default=10000
*/
constructor (libp2p, options) {
super()
Expand All @@ -57,7 +63,7 @@ class ConnectionManager extends EventEmitter {
throw errcode(new Error('Connection Manager maxConnections must be greater than minConnections'), ERR_INVALID_PARAMETERS)
}

debug('options: %j', this._options)
log('options: %j', this._options)

this._libp2p = libp2p

Expand All @@ -73,8 +79,11 @@ class ConnectionManager extends EventEmitter {
*/
this.connections = new Map()

this._started = false
this._timer = null
this._autoDialTimeout = null
this._checkMetrics = this._checkMetrics.bind(this)
this._autoDial = this._autoDial.bind(this)
}

/**
Expand All @@ -101,19 +110,25 @@ class ConnectionManager extends EventEmitter {
})
this._onLatencyMeasure = this._onLatencyMeasure.bind(this)
this._latencyMonitor.on('data', this._onLatencyMeasure)
debug('started')

this._started = true
log('started')

this._options.autoDial && this._autoDial()
}

/**
* Stops the Connection Manager
* @async
*/
async stop () {
this._autoDialTimeout && this._autoDialTimeout.clear()
this._timer && this._timer.clear()
this._latencyMonitor && this._latencyMonitor.removeListener('data', this._onLatencyMeasure)

this._started = false
await this._close()
debug('stopped')
log('stopped')
}

/**
Expand Down Expand Up @@ -157,12 +172,12 @@ class ConnectionManager extends EventEmitter {
_checkMetrics () {
const movingAverages = this._libp2p.metrics.global.movingAverages
const received = movingAverages.dataReceived[this._options.movingAverageInterval].movingAverage()
this._checkLimit('maxReceivedData', received)
this._checkMaxLimit('maxReceivedData', received)
const sent = movingAverages.dataSent[this._options.movingAverageInterval].movingAverage()
this._checkLimit('maxSentData', sent)
this._checkMaxLimit('maxSentData', sent)
const total = received + sent
this._checkLimit('maxData', total)
debug('metrics update', total)
this._checkMaxLimit('maxData', total)
log('metrics update', total)
this._timer.reschedule(this._options.pollInterval)
}

Expand All @@ -188,7 +203,7 @@ class ConnectionManager extends EventEmitter {
this._peerValues.set(peerIdStr, this._options.defaultPeerValue)
}

this._checkLimit('maxConnections', this.size)
this._checkMaxLimit('maxConnections', this.size)
}

/**
Expand Down Expand Up @@ -248,7 +263,7 @@ class ConnectionManager extends EventEmitter {
* @param {*} summary The LatencyMonitor summary
*/
_onLatencyMeasure (summary) {
this._checkLimit('maxEventLoopDelay', summary.avgMs)
this._checkMaxLimit('maxEventLoopDelay', summary.avgMs)
}

/**
Expand All @@ -257,15 +272,69 @@ class ConnectionManager extends EventEmitter {
* @param {string} name The name of the field to check limits for
* @param {number} value The current value of the field
*/
_checkLimit (name, value) {
_checkMaxLimit (name, value) {
const limit = this._options[name]
debug('checking limit of %s. current value: %d of %d', name, value, limit)
log('checking limit of %s. current value: %d of %d', name, value, limit)
if (value > limit) {
debug('%s: limit exceeded: %s, %d', this._peerId, name, value)
log('%s: limit exceeded: %s, %d', this._peerId, name, value)
this._maybeDisconnectOne()
}
}

/**
* Proactively tries to connect to known peers stored in the PeerStore.
* It will keep the number of connections below the upper limit and sort
* the peers to connect based on wether we know their keys and protocols.
* @async
* @private
*/
async _autoDial () {
const minConnections = this._options.minConnections

const recursiveTimeoutTrigger = () => {
if (this._autoDialTimeout) {
this._autoDialTimeout.reschedule(this._options.autoDialInterval)
} else {
this._autoDialTimeout = retimer(this._autoDial, this._options.autoDialInterval)
}
}

// Already has enough connections
if (this.size >= minConnections) {
recursiveTimeoutTrigger()
return
}

// Sort peers on wether we know protocols of public keys for them
const peers = Array.from(this._libp2p.peerStore.peers.values())
.sort((a, b) => {
if (b.protocols && b.protocols.length && (!a.protocols || !a.protocols.length)) {
return 1
} else if (b.id.pubKey && !a.id.pubKey) {
return 1
}
return -1
})

for (let i = 0; i < peers.length && this.size < minConnections; i++) {
if (!this.get(peers[i].id)) {
log('connecting to a peerStore stored peer %s', peers[i].id.toB58String())
try {
await this._libp2p.dialer.connectToPeer(peers[i].id)

// Connection Manager was stopped
if (!this._started) {
return
}
} catch (err) {
log.error('could not connect to peerStore stored peer', err)
}
}
}

recursiveTimeoutTrigger()
}

/**
* If we have more connections than our maximum, close a connection
* to the lowest valued peer.
Expand All @@ -274,12 +343,12 @@ class ConnectionManager extends EventEmitter {
_maybeDisconnectOne () {
if (this._options.minConnections < this.connections.size) {
const peerValues = Array.from(this._peerValues).sort(byPeerValue)
debug('%s: sorted peer values: %j', this._peerId, peerValues)
log('%s: sorted peer values: %j', this._peerId, peerValues)
const disconnectPeer = peerValues[0]
if (disconnectPeer) {
const peerId = disconnectPeer[0]
debug('%s: lowest value peer is %s', this._peerId, peerId)
debug('%s: closing a connection to %j', this._peerId, peerId)
log('%s: lowest value peer is %s', this._peerId, peerId)
log('%s: closing a connection to %j', this._peerId, peerId)
for (const connections of this.connections.values()) {
if (connections[0].remotePeer.toB58String() === peerId) {
connections[0].close()
Expand Down
22 changes: 14 additions & 8 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,13 @@ class Libp2p extends EventEmitter {
this._discovery = new Map() // Discovery service instances/references

// Create the Connection Manager
this.connectionManager = new ConnectionManager(this, this._options.connectionManager)
if (this._options.connectionManager.minPeers) { // Remove in 0.29
this._options.connectionManager.minConnections = this._options.connectionManager.minPeers
}
this.connectionManager = new ConnectionManager(this, {
autoDial: this._config.peerDiscovery.autoDial,
...this._options.connectionManager
})

// Create Metrics
if (this._options.metrics.enabled) {
Expand Down Expand Up @@ -460,19 +466,19 @@ class Libp2p extends EventEmitter {
async _onDidStart () {
this._isStarted = true

this.connectionManager.start()

this.peerStore.on('peer', peerId => {
this.emit('peer:discovery', peerId)
this._maybeConnect(peerId)
})

// Once we start, emit and dial any peers we may have already discovered
// Once we start, emit any peers we may have already discovered
// TODO: this should be removed, as we already discovered these peers in the past
for (const peer of this.peerStore.peers.values()) {
this.emit('peer:discovery', peer.id)
this._maybeConnect(peer.id)
}

this.connectionManager.start()

// Peer discovery
await this._setupPeerDiscovery()
}
Expand All @@ -496,15 +502,15 @@ class Libp2p extends EventEmitter {
/**
* Will dial to the given `peerId` if the current number of
* connected peers is less than the configured `ConnectionManager`
* minPeers.
* minConnections.
* @private
* @param {PeerId} peerId
*/
async _maybeConnect (peerId) {
// If auto dialing is on and we have no connection to the peer, check if we should dial
if (this._config.peerDiscovery.autoDial === true && !this.connectionManager.get(peerId)) {
const minPeers = this._options.connectionManager.minPeers || 0
if (minPeers > this.connectionManager.size) {
const minConnections = this._options.connectionManager.minConnections || 0
if (minConnections > this.connectionManager.size) {
log('connecting to discovered peer %s', peerId.toB58String())
try {
await this.dialer.connectToPeer(peerId)
Expand Down
Loading

0 comments on commit 9ccab40

Please sign in to comment.