Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: not dial all known peers in parallel on startup #698

Merged
merged 9 commits into from
Jul 14, 2020
2 changes: 1 addition & 1 deletion .aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const after = async () => {
}

module.exports = {
bundlesize: { maxSize: '200kB' },
bundlesize: { maxSize: '202kB' },
hooks: {
pre: before,
post: after
Expand Down
101 changes: 85 additions & 16 deletions src/connection-manager/index.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
'use strict'

const debug = require('debug')
const log = debug('libp2p:connection-manager')
log.error = debug('libp2p:connection-manager:error')

const errcode = require('err-code')
const mergeOptions = require('merge-options')
const LatencyMonitor = require('./latency-monitor')
const debug = require('debug')('libp2p:connection-manager')
const retimer = require('retimer')

const { EventEmitter } = require('events')
Expand All @@ -22,6 +25,7 @@ const defaultOptions = {
maxReceivedData: Infinity,
maxEventLoopDelay: Infinity,
pollInterval: 2000,
autoDialInterval: 10000,
movingAverageInterval: 60000,
defaultPeerValue: 1
}
Expand All @@ -45,6 +49,8 @@ class ConnectionManager extends EventEmitter {
* @param {Number} options.pollInterval How often, in milliseconds, metrics and latency should be checked. Default=2000
* @param {Number} options.movingAverageInterval How often, in milliseconds, to compute averages. Default=60000
* @param {Number} options.defaultPeerValue The value of the peer. Default=1
* @param {boolean} options.autoDial Should preemptively guarantee connections are above the low watermark. Default=true
* @param {Number} options.autoDialInterval How often, in milliseconds, it should preemptively guarantee connections are above the low watermark. Default=10000
*/
constructor (libp2p, options) {
super()
Expand All @@ -57,7 +63,7 @@ class ConnectionManager extends EventEmitter {
throw errcode(new Error('Connection Manager maxConnections must be greater than minConnections'), ERR_INVALID_PARAMETERS)
}

debug('options: %j', this._options)
log('options: %j', this._options)

this._libp2p = libp2p

Expand All @@ -73,8 +79,11 @@ class ConnectionManager extends EventEmitter {
*/
this.connections = new Map()

this._started = false
this._timer = null
this._autoDialTimeout = null
this._checkMetrics = this._checkMetrics.bind(this)
this._autoDial = this._autoDial.bind(this)
}

/**
Expand All @@ -101,19 +110,25 @@ class ConnectionManager extends EventEmitter {
})
this._onLatencyMeasure = this._onLatencyMeasure.bind(this)
this._latencyMonitor.on('data', this._onLatencyMeasure)
debug('started')

this._started = true
log('started')

this._options.autoDial && this._autoDial()
}

/**
* Stops the Connection Manager
* @async
*/
async stop () {
this._autoDialTimeout && this._autoDialTimeout.clear()
this._timer && this._timer.clear()
this._latencyMonitor && this._latencyMonitor.removeListener('data', this._onLatencyMeasure)

this._started = false
await this._close()
debug('stopped')
log('stopped')
}

/**
Expand Down Expand Up @@ -157,12 +172,12 @@ class ConnectionManager extends EventEmitter {
_checkMetrics () {
const movingAverages = this._libp2p.metrics.global.movingAverages
const received = movingAverages.dataReceived[this._options.movingAverageInterval].movingAverage()
this._checkLimit('maxReceivedData', received)
this._checkMaxLimit('maxReceivedData', received)
const sent = movingAverages.dataSent[this._options.movingAverageInterval].movingAverage()
this._checkLimit('maxSentData', sent)
this._checkMaxLimit('maxSentData', sent)
const total = received + sent
this._checkLimit('maxData', total)
debug('metrics update', total)
this._checkMaxLimit('maxData', total)
log('metrics update', total)
this._timer.reschedule(this._options.pollInterval)
}

Expand All @@ -188,7 +203,7 @@ class ConnectionManager extends EventEmitter {
this._peerValues.set(peerIdStr, this._options.defaultPeerValue)
}

this._checkLimit('maxConnections', this.size)
this._checkMaxLimit('maxConnections', this.size)
}

/**
Expand Down Expand Up @@ -248,7 +263,7 @@ class ConnectionManager extends EventEmitter {
* @param {*} summary The LatencyMonitor summary
*/
_onLatencyMeasure (summary) {
this._checkLimit('maxEventLoopDelay', summary.avgMs)
this._checkMaxLimit('maxEventLoopDelay', summary.avgMs)
}

/**
Expand All @@ -257,15 +272,69 @@ class ConnectionManager extends EventEmitter {
* @param {string} name The name of the field to check limits for
* @param {number} value The current value of the field
*/
_checkLimit (name, value) {
_checkMaxLimit (name, value) {
const limit = this._options[name]
debug('checking limit of %s. current value: %d of %d', name, value, limit)
log('checking limit of %s. current value: %d of %d', name, value, limit)
if (value > limit) {
debug('%s: limit exceeded: %s, %d', this._peerId, name, value)
log('%s: limit exceeded: %s, %d', this._peerId, name, value)
this._maybeDisconnectOne()
}
}

/**
* Proactively tries to connect to known peers stored in the PeerStore.
* It will keep the number of connections below the upper limit and sort
* the peers to connect based on wether we know their keys and protocols.
* @async
* @private
*/
async _autoDial () {
const minConnections = this._options.minConnections

const recursiveTimeoutTrigger = () => {
if (this._autoDialTimeout) {
this._autoDialTimeout.reschedule(this._options.autoDialInterval)
} else {
this._autoDialTimeout = retimer(this._autoDial, this._options.autoDialInterval)
}
}

// Already has enough connections
if (this.size >= minConnections) {
recursiveTimeoutTrigger()
return
jacobheun marked this conversation as resolved.
Show resolved Hide resolved
}

// Sort peers on wether we know protocols of public keys for them
const peers = Array.from(this._libp2p.peerStore.peers.values())
.sort((a, b) => {
if (b.protocols && b.protocols.length && (!a.protocols || !a.protocols.length)) {
return 1
} else if (b.id.pubKey && !a.id.pubKey) {
return 1
}
return -1
})

for (let i = 0; i < peers.length && this.size < minConnections; i++) {
if (!this.get(peers[i].id)) {
log('connecting to a peerStore stored peer %s', peers[i].id.toB58String())
try {
await this._libp2p.dialer.connectToPeer(peers[i].id)

// Connection Manager was stopped
if (!this._started) {
return
}
} catch (err) {
log.error('could not connect to peerStore stored peer', err)
}
}
}

recursiveTimeoutTrigger()
}

/**
* If we have more connections than our maximum, close a connection
* to the lowest valued peer.
Expand All @@ -274,12 +343,12 @@ class ConnectionManager extends EventEmitter {
_maybeDisconnectOne () {
if (this._options.minConnections < this.connections.size) {
const peerValues = Array.from(this._peerValues).sort(byPeerValue)
debug('%s: sorted peer values: %j', this._peerId, peerValues)
log('%s: sorted peer values: %j', this._peerId, peerValues)
const disconnectPeer = peerValues[0]
if (disconnectPeer) {
const peerId = disconnectPeer[0]
debug('%s: lowest value peer is %s', this._peerId, peerId)
debug('%s: closing a connection to %j', this._peerId, peerId)
log('%s: lowest value peer is %s', this._peerId, peerId)
log('%s: closing a connection to %j', this._peerId, peerId)
for (const connections of this.connections.values()) {
if (connections[0].remotePeer.toB58String() === peerId) {
connections[0].close()
Expand Down
13 changes: 8 additions & 5 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ class Libp2p extends EventEmitter {
this._discovery = new Map() // Discovery service instances/references

// Create the Connection Manager
this.connectionManager = new ConnectionManager(this, this._options.connectionManager)
this.connectionManager = new ConnectionManager(this, {
autoDial: this._config.peerDiscovery.autoDial,
...this._options.connectionManager
})

// Create Metrics
if (this._options.metrics.enabled) {
Expand Down Expand Up @@ -459,19 +462,19 @@ class Libp2p extends EventEmitter {
async _onDidStart () {
this._isStarted = true

this.connectionManager.start()

this.peerStore.on('peer', peerId => {
this.emit('peer:discovery', peerId)
this._maybeConnect(peerId)
})

// Once we start, emit and dial any peers we may have already discovered
// Once we start, emit any peers we may have already discovered
// TODO: this should be removed, as we already discovered these peers in the past
for (const peer of this.peerStore.peers.values()) {
this.emit('peer:discovery', peer.id)
this._maybeConnect(peer.id)
}

this.connectionManager.start()

// Peer discovery
await this._setupPeerDiscovery()
}
Expand Down
Loading