-
Notifications
You must be signed in to change notification settings - Fork 442
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: not dial all known peers in parallel on startup #698
Changes from 4 commits
695f77c
3fcc2d6
4afb801
6ac3328
9156e29
2985a5f
4ea8838
c92d2ea
43a52a6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,12 @@ | ||
'use strict' | ||
|
||
const debug = require('debug') | ||
const log = debug('libp2p:connection-manager') | ||
log.error = debug('libp2p:connection-manager:error') | ||
|
||
const errcode = require('err-code') | ||
const mergeOptions = require('merge-options') | ||
const LatencyMonitor = require('./latency-monitor') | ||
const debug = require('debug')('libp2p:connection-manager') | ||
const retimer = require('retimer') | ||
|
||
const { EventEmitter } = require('events') | ||
|
@@ -22,6 +25,7 @@ const defaultOptions = { | |
maxReceivedData: Infinity, | ||
maxEventLoopDelay: Infinity, | ||
pollInterval: 2000, | ||
maybeConnectInterval: 10000, | ||
movingAverageInterval: 60000, | ||
defaultPeerValue: 1 | ||
} | ||
|
@@ -45,6 +49,7 @@ class ConnectionManager extends EventEmitter { | |
* @param {Number} options.pollInterval How often, in milliseconds, metrics and latency should be checked. Default=2000 | ||
* @param {Number} options.movingAverageInterval How often, in milliseconds, to compute averages. Default=60000 | ||
* @param {Number} options.defaultPeerValue The value of the peer. Default=1 | ||
* @param {Number} options.maybeConnectInterval How often, in milliseconds, it should preemptively guarantee connections are above the low watermark. Default=10000 | ||
*/ | ||
constructor (libp2p, options) { | ||
super() | ||
|
@@ -57,7 +62,7 @@ class ConnectionManager extends EventEmitter { | |
throw errcode(new Error('Connection Manager maxConnections must be greater than minConnections'), ERR_INVALID_PARAMETERS) | ||
} | ||
|
||
debug('options: %j', this._options) | ||
log('options: %j', this._options) | ||
|
||
this._libp2p = libp2p | ||
|
||
|
@@ -73,8 +78,11 @@ class ConnectionManager extends EventEmitter { | |
*/ | ||
this.connections = new Map() | ||
|
||
this._started = false | ||
this._timer = null | ||
this._maybeConnectTimeout = null | ||
this._checkMetrics = this._checkMetrics.bind(this) | ||
this._maybeConnectN = this._maybeConnectN.bind(this) | ||
} | ||
|
||
/** | ||
|
@@ -101,19 +109,26 @@ class ConnectionManager extends EventEmitter { | |
}) | ||
this._onLatencyMeasure = this._onLatencyMeasure.bind(this) | ||
this._latencyMonitor.on('data', this._onLatencyMeasure) | ||
debug('started') | ||
|
||
this._started = true | ||
log('started') | ||
|
||
this._maybeConnectN() | ||
} | ||
|
||
/** | ||
* Stops the Connection Manager | ||
* @async | ||
*/ | ||
async stop () { | ||
clearTimeout(this._maybeConnectTimeout) | ||
|
||
this._timer && this._timer.clear() | ||
this._latencyMonitor && this._latencyMonitor.removeListener('data', this._onLatencyMeasure) | ||
|
||
this._started = false | ||
await this._close() | ||
debug('stopped') | ||
log('stopped') | ||
} | ||
|
||
/** | ||
|
@@ -157,12 +172,12 @@ class ConnectionManager extends EventEmitter { | |
_checkMetrics () { | ||
const movingAverages = this._libp2p.metrics.global.movingAverages | ||
const received = movingAverages.dataReceived[this._options.movingAverageInterval].movingAverage() | ||
this._checkLimit('maxReceivedData', received) | ||
this._checkMaxLimit('maxReceivedData', received) | ||
const sent = movingAverages.dataSent[this._options.movingAverageInterval].movingAverage() | ||
this._checkLimit('maxSentData', sent) | ||
this._checkMaxLimit('maxSentData', sent) | ||
const total = received + sent | ||
this._checkLimit('maxData', total) | ||
debug('metrics update', total) | ||
this._checkMaxLimit('maxData', total) | ||
log('metrics update', total) | ||
this._timer.reschedule(this._options.pollInterval) | ||
} | ||
|
||
|
@@ -188,7 +203,7 @@ class ConnectionManager extends EventEmitter { | |
this._peerValues.set(peerIdStr, this._options.defaultPeerValue) | ||
} | ||
|
||
this._checkLimit('maxConnections', this.size) | ||
this._checkMaxLimit('maxConnections', this.size) | ||
} | ||
|
||
/** | ||
|
@@ -248,7 +263,7 @@ class ConnectionManager extends EventEmitter { | |
* @param {*} summary The LatencyMonitor summary | ||
*/ | ||
_onLatencyMeasure (summary) { | ||
this._checkLimit('maxEventLoopDelay', summary.avgMs) | ||
this._checkMaxLimit('maxEventLoopDelay', summary.avgMs) | ||
} | ||
|
||
/** | ||
|
@@ -257,15 +272,62 @@ class ConnectionManager extends EventEmitter { | |
* @param {string} name The name of the field to check limits for | ||
* @param {number} value The current value of the field | ||
*/ | ||
_checkLimit (name, value) { | ||
_checkMaxLimit (name, value) { | ||
const limit = this._options[name] | ||
debug('checking limit of %s. current value: %d of %d', name, value, limit) | ||
log('checking limit of %s. current value: %d of %d', name, value, limit) | ||
if (value > limit) { | ||
debug('%s: limit exceeded: %s, %d', this._peerId, name, value) | ||
log('%s: limit exceeded: %s, %d', this._peerId, name, value) | ||
this._maybeDisconnectOne() | ||
} | ||
} | ||
|
||
/** | ||
* Proactively tries to connect to known peers stored in the PeerStore. | ||
* It will keep the number of connections below the upper limit and sort | ||
* the peers to connect based on wether we know their keys and protocols. | ||
* @async | ||
* @private | ||
*/ | ||
async _maybeConnectN () { | ||
this._isTryingToConnect = true | ||
|
||
const minConnections = this._options.minConnections | ||
|
||
// Already has enough connections | ||
if (this.size >= minConnections) { | ||
return | ||
jacobheun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
// Sort peers on wether we know protocols of public keys for them | ||
const peers = Array.from(this._libp2p.peerStore.peers.values()) | ||
.sort((a, b) => { | ||
if (b.protocols && b.protocols.length && (!a.protocols || !a.protocols.length)) { | ||
return 1 | ||
} else if (b.id.pubKey && !a.id.pubKey) { | ||
return 1 | ||
} | ||
return -1 | ||
}) | ||
|
||
for (let i = 0; i < peers.length && this.size < minConnections; i++) { | ||
if (!this.get(peers[i].id)) { | ||
log('connecting to a peerStore stored peer %s', peers[i].id.toB58String()) | ||
try { | ||
await this._libp2p.dialer.connectToPeer(peers[i].id) | ||
|
||
// Connection Manager was stopped | ||
if (!this._started) { | ||
return | ||
} | ||
} catch (err) { | ||
log.error('could not connect to peerStore stored peer', err) | ||
} | ||
} | ||
} | ||
|
||
this._maybeConnectTimeout = setTimeout(this._maybeConnectN, this._options.maybeConnectInterval) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use |
||
} | ||
|
||
/** | ||
* If we have more connections than our maximum, close a connection | ||
* to the lowest valued peer. | ||
|
@@ -274,12 +336,12 @@ class ConnectionManager extends EventEmitter { | |
_maybeDisconnectOne () { | ||
if (this._options.minConnections < this.connections.size) { | ||
const peerValues = Array.from(this._peerValues).sort(byPeerValue) | ||
debug('%s: sorted peer values: %j', this._peerId, peerValues) | ||
log('%s: sorted peer values: %j', this._peerId, peerValues) | ||
const disconnectPeer = peerValues[0] | ||
if (disconnectPeer) { | ||
const peerId = disconnectPeer[0] | ||
debug('%s: lowest value peer is %s', this._peerId, peerId) | ||
debug('%s: closing a connection to %j', this._peerId, peerId) | ||
log('%s: lowest value peer is %s', this._peerId, peerId) | ||
log('%s: closing a connection to %j', this._peerId, peerId) | ||
for (const connections of this.connections.values()) { | ||
if (connections[0].remotePeer.toB58String() === peerId) { | ||
connections[0].close() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,9 @@ chai.use(require('chai-as-promised')) | |
const { expect } = chai | ||
const sinon = require('sinon') | ||
|
||
const delay = require('delay') | ||
const pWaitFor = require('p-wait-for') | ||
|
||
const peerUtils = require('../utils/creators/peer') | ||
const mockConnection = require('../utils/mockConnection') | ||
const baseOptions = require('../utils/base-options.browser') | ||
|
@@ -112,4 +115,148 @@ describe('libp2p.connections', () => { | |
await libp2p.stop() | ||
await remoteLibp2p.stop() | ||
}) | ||
|
||
describe('proactive connections', () => { | ||
let nodes = [] | ||
|
||
beforeEach(async () => { | ||
nodes = await peerUtils.createPeer({ | ||
number: 2, | ||
config: { | ||
addresses: { | ||
listen: ['/ip4/127.0.0.1/tcp/0/ws'] | ||
} | ||
} | ||
}) | ||
}) | ||
|
||
afterEach(async () => { | ||
await Promise.all(nodes.map((node) => node.stop())) | ||
sinon.reset() | ||
}) | ||
|
||
it('should connect to all the peers stored in the PeerStore, if their number is below minConnections', async () => { | ||
const [libp2p] = await peerUtils.createPeer({ | ||
fixture: false, | ||
started: false, | ||
config: { | ||
addresses: { | ||
listen: ['/ip4/127.0.0.1/tcp/0/ws'] | ||
}, | ||
connectionManager: { | ||
minConnections: 3 | ||
} | ||
} | ||
}) | ||
|
||
// Populate PeerStore before starting | ||
libp2p.peerStore.addressBook.set(nodes[0].peerId, nodes[0].multiaddrs) | ||
libp2p.peerStore.addressBook.set(nodes[1].peerId, nodes[1].multiaddrs) | ||
|
||
await libp2p.start() | ||
|
||
// Wait for peers to connect | ||
await pWaitFor(() => libp2p.connectionManager.size === 2) | ||
|
||
await libp2p.stop() | ||
}) | ||
|
||
it('should connect to all the peers stored in the PeerStore until reaching the minConnections', async () => { | ||
const minConnections = 1 | ||
const [libp2p] = await peerUtils.createPeer({ | ||
fixture: false, | ||
started: false, | ||
config: { | ||
addresses: { | ||
listen: ['/ip4/127.0.0.1/tcp/0/ws'] | ||
}, | ||
connectionManager: { | ||
minConnections | ||
} | ||
} | ||
}) | ||
|
||
// Populate PeerStore before starting | ||
libp2p.peerStore.addressBook.set(nodes[0].peerId, nodes[0].multiaddrs) | ||
libp2p.peerStore.addressBook.set(nodes[1].peerId, nodes[1].multiaddrs) | ||
|
||
await libp2p.start() | ||
|
||
// Wait for peer to connect | ||
await pWaitFor(() => libp2p.connectionManager.size === minConnections) | ||
|
||
// Wait more time to guarantee no other connection happened | ||
await delay(200) | ||
expect(libp2p.connectionManager.size).to.eql(minConnections) | ||
|
||
await libp2p.stop() | ||
}) | ||
|
||
it('should connect to all the peers stored in the PeerStore until reaching the minConnections sorted', async () => { | ||
const minConnections = 1 | ||
const [libp2p] = await peerUtils.createPeer({ | ||
fixture: false, | ||
started: false, | ||
config: { | ||
addresses: { | ||
listen: ['/ip4/127.0.0.1/tcp/0/ws'] | ||
}, | ||
connectionManager: { | ||
minConnections | ||
} | ||
} | ||
}) | ||
|
||
// Populate PeerStore before starting | ||
libp2p.peerStore.addressBook.set(nodes[0].peerId, nodes[0].multiaddrs) | ||
libp2p.peerStore.addressBook.set(nodes[1].peerId, nodes[1].multiaddrs) | ||
libp2p.peerStore.protoBook.set(nodes[1].peerId, ['/protocol-min-conns']) | ||
|
||
await libp2p.start() | ||
|
||
// Wait for peer to connect | ||
await pWaitFor(() => libp2p.connectionManager.size === minConnections) | ||
|
||
// Should have connected to the peer with protocols | ||
expect(libp2p.connectionManager.get(nodes[0].peerId)).to.not.exist() | ||
expect(libp2p.connectionManager.get(nodes[1].peerId)).to.exist() | ||
|
||
await libp2p.stop() | ||
}) | ||
|
||
it('should connect to peers in the PeerStore when a peer disconnected', async () => { | ||
const minConnections = 1 | ||
const maybeConnectInterval = 1000 | ||
|
||
const [libp2p] = await peerUtils.createPeer({ | ||
fixture: false, | ||
config: { | ||
addresses: { | ||
listen: ['/ip4/127.0.0.1/tcp/0/ws'] | ||
}, | ||
connectionManager: { | ||
minConnections, | ||
maybeConnectInterval | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I debugged it earlier. It was failing sometimes before I add the interval param. I will fix it now |
||
} | ||
} | ||
}) | ||
|
||
// Populate PeerStore after starting (discovery) | ||
libp2p.peerStore.addressBook.set(nodes[0].peerId, nodes[0].multiaddrs) | ||
libp2p.peerStore.addressBook.set(nodes[1].peerId, nodes[1].multiaddrs) | ||
libp2p.peerStore.protoBook.set(nodes[1].peerId, ['/protocol-min-conns']) | ||
|
||
// Wait for peer to connect | ||
const conn = await libp2p.dial(nodes[0].peerId) | ||
expect(libp2p.connectionManager.get(nodes[0].peerId)).to.exist() | ||
expect(libp2p.connectionManager.get(nodes[1].peerId)).to.not.exist() | ||
|
||
await conn.close() | ||
await pWaitFor(() => libp2p.connectionManager.size === minConnections) | ||
expect(libp2p.connectionManager.get(nodes[0].peerId)).to.not.exist() | ||
expect(libp2p.connectionManager.get(nodes[1].peerId)).to.exist() | ||
|
||
await libp2p.stop() | ||
}) | ||
}) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can go away