Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: not dial all known peers in parallel on startup #698

Merged
merged 9 commits into from
Jul 14, 2020
2 changes: 1 addition & 1 deletion .aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const after = async () => {
}

module.exports = {
bundlesize: { maxSize: '200kB' },
bundlesize: { maxSize: '202kB' },
hooks: {
pre: before,
post: after
Expand Down
75 changes: 59 additions & 16 deletions src/connection-manager/index.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
'use strict'

const debug = require('debug')
const log = debug('libp2p:connection-manager')
log.error = debug('libp2p:connection-manager:error')

const errcode = require('err-code')
const mergeOptions = require('merge-options')
const LatencyMonitor = require('./latency-monitor')
const debug = require('debug')('libp2p:connection-manager')
const retimer = require('retimer')

const { EventEmitter } = require('events')
Expand Down Expand Up @@ -57,7 +60,7 @@ class ConnectionManager extends EventEmitter {
throw errcode(new Error('Connection Manager maxConnections must be greater than minConnections'), ERR_INVALID_PARAMETERS)
}

debug('options: %j', this._options)
log('options: %j', this._options)

this._libp2p = libp2p

Expand Down Expand Up @@ -101,7 +104,8 @@ class ConnectionManager extends EventEmitter {
})
this._onLatencyMeasure = this._onLatencyMeasure.bind(this)
this._latencyMonitor.on('data', this._onLatencyMeasure)
debug('started')
this._maybeConnectN()
log('started')
}

/**
Expand All @@ -113,7 +117,7 @@ class ConnectionManager extends EventEmitter {
this._latencyMonitor && this._latencyMonitor.removeListener('data', this._onLatencyMeasure)

await this._close()
debug('stopped')
log('stopped')
}

/**
Expand Down Expand Up @@ -157,12 +161,12 @@ class ConnectionManager extends EventEmitter {
_checkMetrics () {
const movingAverages = this._libp2p.metrics.global.movingAverages
const received = movingAverages.dataReceived[this._options.movingAverageInterval].movingAverage()
this._checkLimit('maxReceivedData', received)
this._checkMaxLimit('maxReceivedData', received)
const sent = movingAverages.dataSent[this._options.movingAverageInterval].movingAverage()
this._checkLimit('maxSentData', sent)
this._checkMaxLimit('maxSentData', sent)
const total = received + sent
this._checkLimit('maxData', total)
debug('metrics update', total)
this._checkMaxLimit('maxData', total)
log('metrics update', total)
this._timer.reschedule(this._options.pollInterval)
}

Expand All @@ -188,7 +192,7 @@ class ConnectionManager extends EventEmitter {
this._peerValues.set(peerIdStr, this._options.defaultPeerValue)
}

this._checkLimit('maxConnections', this.size)
this._checkMaxLimit('maxConnections', this.size)
}

/**
Expand All @@ -206,6 +210,7 @@ class ConnectionManager extends EventEmitter {
this.connections.delete(peerId)
this._peerValues.delete(connection.remotePeer.toB58String())
this.emit('peer:disconnect', connection)
this._maybeConnectN()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a disconnect occurs and the previous call to _maybeConnectN happens we're going to run two calls in parallel, we should avoid this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean the _maybeConnectN in start?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can potentially have it running from the start call while trying to dial a bunch of peers and one of them meanwhile disconnected. I can add a verification for wether we are currently running it

}
}

Expand Down Expand Up @@ -248,7 +253,7 @@ class ConnectionManager extends EventEmitter {
* @param {*} summary The LatencyMonitor summary
*/
_onLatencyMeasure (summary) {
this._checkLimit('maxEventLoopDelay', summary.avgMs)
this._checkMaxLimit('maxEventLoopDelay', summary.avgMs)
}

/**
Expand All @@ -257,15 +262,53 @@ class ConnectionManager extends EventEmitter {
* @param {string} name The name of the field to check limits for
* @param {number} value The current value of the field
*/
_checkLimit (name, value) {
_checkMaxLimit (name, value) {
const limit = this._options[name]
debug('checking limit of %s. current value: %d of %d', name, value, limit)
log('checking limit of %s. current value: %d of %d', name, value, limit)
if (value > limit) {
debug('%s: limit exceeded: %s, %d', this._peerId, name, value)
log('%s: limit exceeded: %s, %d', this._peerId, name, value)
this._maybeDisconnectOne()
}
}

/**
* Proactively tries to connect to known peers stored in the PeerStore.
* It will keep the number of connections below the upper limit and sort
* the peers to connect based on wether we know their keys and protocols.
* @async
* @private
*/
async _maybeConnectN () {
const minConnections = this._options.minConnections

// Already has enough connections
if (this.size >= minConnections) {
return
jacobheun marked this conversation as resolved.
Show resolved Hide resolved
}

// Sort peers on wether we know protocols of public keys for them
const peers = Array.from(this._libp2p.peerStore.peers.values())
.sort((a, b) => {
if (b.protocols && b.protocols.length && (!a.protocols || !a.protocols.length)) {
return 1
} else if (b.id.pubKey && !a.id.pubKey) {
return 1
}
return -1
})

for (let i = 0; i < peers.length && this.size < minConnections; i++) {
if (!this.get(peers[i].id)) {
log('connecting to a peerStore stored peer %s', peers[i].id.toB58String())
try {
await this._libp2p.dialer.connectToPeer(peers[i].id)
} catch (err) {
log.error('could not connect to peerStore stored peer', err)
}
}
}
}

/**
* If we have more connections than our maximum, close a connection
* to the lowest valued peer.
Expand All @@ -274,12 +317,12 @@ class ConnectionManager extends EventEmitter {
_maybeDisconnectOne () {
if (this._options.minConnections < this.connections.size) {
const peerValues = Array.from(this._peerValues).sort(byPeerValue)
debug('%s: sorted peer values: %j', this._peerId, peerValues)
log('%s: sorted peer values: %j', this._peerId, peerValues)
const disconnectPeer = peerValues[0]
if (disconnectPeer) {
const peerId = disconnectPeer[0]
debug('%s: lowest value peer is %s', this._peerId, peerId)
debug('%s: closing a connection to %j', this._peerId, peerId)
log('%s: lowest value peer is %s', this._peerId, peerId)
log('%s: closing a connection to %j', this._peerId, peerId)
for (const connections of this.connections.values()) {
if (connections[0].remotePeer.toB58String() === peerId) {
connections[0].close()
Expand Down
8 changes: 4 additions & 4 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -459,19 +459,19 @@ class Libp2p extends EventEmitter {
async _onDidStart () {
this._isStarted = true

this.connectionManager.start()

this.peerStore.on('peer', peerId => {
this.emit('peer:discovery', peerId)
this._maybeConnect(peerId)
})

// Once we start, emit and dial any peers we may have already discovered
// Once we start, emit any peers we may have already discovered
// TODO: this should be removed, as we already discovered these peers in the past
for (const peer of this.peerStore.peers.values()) {
this.emit('peer:discovery', peer.id)
this._maybeConnect(peer.id)
}

this.connectionManager.start()

// Peer discovery
await this._setupPeerDiscovery()
}
Expand Down
144 changes: 144 additions & 0 deletions test/connection-manager/index.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ chai.use(require('chai-as-promised'))
const { expect } = chai
const sinon = require('sinon')

const delay = require('delay')
const pWaitFor = require('p-wait-for')

const peerUtils = require('../utils/creators/peer')
const mockConnection = require('../utils/mockConnection')
const baseOptions = require('../utils/base-options.browser')
Expand Down Expand Up @@ -112,4 +115,145 @@ describe('libp2p.connections', () => {
await libp2p.stop()
await remoteLibp2p.stop()
})

describe('proactive connections', () => {
let nodes = []

beforeEach(async () => {
nodes = await peerUtils.createPeer({
number: 2,
config: {
addresses: {
listen: ['/ip4/127.0.0.1/tcp/0/ws']
}
}
})
})

afterEach(async () => {
await Promise.all(nodes.map((node) => node.stop()))
sinon.reset()
})

it('should connect to all the peers stored in the PeerStore, if their number is below minConnections', async () => {
const [libp2p] = await peerUtils.createPeer({
fixture: false,
started: false,
config: {
addresses: {
listen: ['/ip4/127.0.0.1/tcp/0/ws']
},
connectionManager: {
minConnections: 3
}
}
})

// Populate PeerStore before starting
libp2p.peerStore.addressBook.set(nodes[0].peerId, nodes[0].multiaddrs)
libp2p.peerStore.addressBook.set(nodes[1].peerId, nodes[1].multiaddrs)

await libp2p.start()

// Wait for peers to connect
await pWaitFor(() => libp2p.connectionManager.size === 2)

await libp2p.stop()
})

it('should connect to all the peers stored in the PeerStore until reaching the minConnections', async () => {
const minConnections = 1
const [libp2p] = await peerUtils.createPeer({
fixture: false,
started: false,
config: {
addresses: {
listen: ['/ip4/127.0.0.1/tcp/0/ws']
},
connectionManager: {
minConnections
}
}
})

// Populate PeerStore before starting
libp2p.peerStore.addressBook.set(nodes[0].peerId, nodes[0].multiaddrs)
libp2p.peerStore.addressBook.set(nodes[1].peerId, nodes[1].multiaddrs)

await libp2p.start()

// Wait for peer to connect
await pWaitFor(() => libp2p.connectionManager.size === minConnections)

// Wait more time to guarantee no other connection happened
await delay(200)
expect(libp2p.connectionManager.size).to.eql(minConnections)

await libp2p.stop()
})

it('should connect to all the peers stored in the PeerStore until reaching the minConnections sorted', async () => {
const minConnections = 1
const [libp2p] = await peerUtils.createPeer({
fixture: false,
started: false,
config: {
addresses: {
listen: ['/ip4/127.0.0.1/tcp/0/ws']
},
connectionManager: {
minConnections
}
}
})

// Populate PeerStore before starting
libp2p.peerStore.addressBook.set(nodes[0].peerId, nodes[0].multiaddrs)
libp2p.peerStore.addressBook.set(nodes[1].peerId, nodes[1].multiaddrs)
libp2p.peerStore.protoBook.set(nodes[1].peerId, ['/protocol-min-conns'])

await libp2p.start()

// Wait for peer to connect
await pWaitFor(() => libp2p.connectionManager.size === minConnections)

// Should have connected to the peer with protocols
expect(libp2p.connectionManager.get(nodes[0].peerId)).to.not.exist()
expect(libp2p.connectionManager.get(nodes[1].peerId)).to.exist()

await libp2p.stop()
})

it('should connect to peers in the PeerStore when a peer disconnected', async () => {
const minConnections = 1
const [libp2p] = await peerUtils.createPeer({
fixture: false,
config: {
addresses: {
listen: ['/ip4/127.0.0.1/tcp/0/ws']
},
connectionManager: {
minConnections
}
}
})

// Populate PeerStore after starting (discovery)
libp2p.peerStore.addressBook.set(nodes[0].peerId, nodes[0].multiaddrs)
libp2p.peerStore.addressBook.set(nodes[1].peerId, nodes[1].multiaddrs)
libp2p.peerStore.protoBook.set(nodes[1].peerId, ['/protocol-min-conns'])

// Wait for peer to connect
const conn = await libp2p.dial(nodes[0].peerId)
expect(libp2p.connectionManager.get(nodes[0].peerId)).to.exist()
expect(libp2p.connectionManager.get(nodes[1].peerId)).to.not.exist()

await conn.close()
await pWaitFor(() => libp2p.connectionManager.size === minConnections)
expect(libp2p.connectionManager.get(nodes[0].peerId)).to.not.exist()
expect(libp2p.connectionManager.get(nodes[1].peerId)).to.exist()

await libp2p.stop()
})
})
})
7 changes: 5 additions & 2 deletions test/peer-discovery/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ describe('peer discovery', () => {
sinon.reset()
})

it('should dial know peers on startup', async () => {
it('should dial know peers on startup below the minConnections watermark', async () => {
libp2p = new Libp2p({
...baseOptions,
peerId
peerId,
connectionManager: {
minConnections: 2
}
})

libp2p.peerStore.addressBook.set(remotePeerId, [multiaddr('/ip4/165.1.1.1/tcp/80')])
Expand Down