From 319ddbabb55cd7a20dabf4ba4bfa1a748ccbbfb4 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Mon, 29 Jul 2024 12:33:48 +0100 Subject: [PATCH 1/4] feat!: remove autodialer The autodialer is a feature from an older time when the DHT was less reliable and we didn't have things like the random walk component. There's not a lot of benefit in opening connections to any old peer, instead protocols now have better ways of targetting the kind of peers they require. Actively dialing peers harms platforms where connections are extremely expensive such as react-native so this feature has been removed. Closes #2621 BREAKING CHANGE: the autodialer has been removed as well as the corresponding config keys --- doc/CONFIGURATION.md | 3 +- doc/LIMITS.md | 6 - interop/test/fixtures/get-libp2p.ts | 3 - packages/integration-tests/.aegir.js | 3 +- .../integration-tests/test/bootstrap.spec.ts | 51 +++ .../test/circuit-relay.node.ts | 3 - packages/integration-tests/test/fetch.spec.ts | 3 - packages/integration-tests/test/interop.ts | 5 +- packages/libp2p/.aegir.js | 3 +- .../src/connection-manager/auto-dial.ts | 285 ----------------- .../connection-manager/constants.browser.ts | 10 - .../connection-manager/constants.defaults.ts | 25 -- .../src/connection-manager/constants.ts | 10 - .../libp2p/src/connection-manager/index.ts | 82 +---- .../test/connection-manager/auto-dial.spec.ts | 293 ------------------ .../test/connection-manager/direct.node.ts | 1 - .../test/connection-manager/direct.spec.ts | 1 - .../test/connection-manager/index.node.ts | 139 --------- .../test/connection-manager/index.spec.ts | 14 +- packages/libp2p/test/registrar/errors.spec.ts | 1 - .../peer-discovery-bootstrap/package.json | 1 + .../peer-discovery-bootstrap/src/index.ts | 18 +- .../test/bootstrap.spec.ts | 42 ++- .../test/compliance.spec.ts | 4 +- packages/transport-webrtc/.aegir.js | 1 - packages/transport-webrtc/test/basics.spec.ts | 3 - .../transport-webtransport/test/browser.ts | 3 - 27 files changed, 117 insertions(+), 896 deletions(-) delete mode 100644 packages/libp2p/src/connection-manager/auto-dial.ts delete mode 100644 packages/libp2p/test/connection-manager/auto-dial.spec.ts diff --git a/doc/CONFIGURATION.md b/doc/CONFIGURATION.md index 2ef97839c1..b5debc6e4d 100644 --- a/doc/CONFIGURATION.md +++ b/doc/CONFIGURATION.md @@ -626,8 +626,7 @@ const node = await createLibp2p({ noise() ], connectionManager: { - maxConnections: Infinity, - minConnections: 0 + maxConnections: Infinity } }) ``` diff --git a/doc/LIMITS.md b/doc/LIMITS.md index 5d5efd523e..2187ffabc9 100644 --- a/doc/LIMITS.md +++ b/doc/LIMITS.md @@ -38,12 +38,6 @@ const node = await createLibp2p({ */ maxConnections: 100, - /** - * If the number of open connections goes below this number, the node - * will try to connect to randomly selected peers from the peer store - */ - minConnections: 50, - /** * How many connections can be open but not yet upgraded */ diff --git a/interop/test/fixtures/get-libp2p.ts b/interop/test/fixtures/get-libp2p.ts index f5196366f2..069fc90326 100644 --- a/interop/test/fixtures/get-libp2p.ts +++ b/interop/test/fixtures/get-libp2p.ts @@ -25,9 +25,6 @@ const IP = process.env.ip ?? '0.0.0.0' export async function getLibp2p (): Promise> { const options: Libp2pOptions<{ ping: PingService, identify: Identify }> = { start: true, - connectionManager: { - minConnections: 0 - }, connectionGater: { denyDialMultiaddr: async () => false }, diff --git a/packages/integration-tests/.aegir.js b/packages/integration-tests/.aegir.js index 078dd47d7d..22a2d0a630 100644 --- a/packages/integration-tests/.aegir.js +++ b/packages/integration-tests/.aegir.js @@ -24,8 +24,7 @@ export default { const peerId = await createEd25519PeerId() const libp2p = await createLibp2p({ connectionManager: { - inboundConnectionThreshold: Infinity, - minConnections: 0 + inboundConnectionThreshold: Infinity }, addresses: { listen: [ diff --git a/packages/integration-tests/test/bootstrap.spec.ts b/packages/integration-tests/test/bootstrap.spec.ts index 3334ccf8ac..e9f80a169a 100644 --- a/packages/integration-tests/test/bootstrap.spec.ts +++ b/packages/integration-tests/test/bootstrap.spec.ts @@ -2,8 +2,11 @@ import { bootstrap } from '@libp2p/bootstrap' import { TypedEventEmitter, peerDiscoverySymbol } from '@libp2p/interface' +import { mplex } from '@libp2p/mplex' import { createEd25519PeerId } from '@libp2p/peer-id-factory' +import { plaintext } from '@libp2p/plaintext' import { webSockets } from '@libp2p/websockets' +import * as Filter from '@libp2p/websockets/filters' import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' import { createLibp2p } from 'libp2p' @@ -103,4 +106,52 @@ describe('bootstrap', () => { return deferred.promise }) + + it('bootstrap should dial all peers in the list', async () => { + const deferred = defer() + + const bootstrappers = [ + `${process.env.RELAY_MULTIADDR}` + ] + + libp2p = await createLibp2p({ + connectionEncryption: [ + plaintext() + ], + transports: [ + webSockets({ + filter: Filter.all + }) + ], + streamMuxers: [ + mplex() + ], + peerDiscovery: [ + bootstrap({ + list: bootstrappers + }) + ], + connectionGater: { + denyDialMultiaddr: () => false + } + }) + + const expectedPeers = new Set( + bootstrappers.map(ma => multiaddr(ma).getPeerId()) + ) + + libp2p.addEventListener('connection:open', (evt) => { + const { remotePeer } = evt.detail + + expectedPeers.delete(remotePeer.toString()) + if (expectedPeers.size === 0) { + libp2p.removeEventListener('connection:open') + deferred.resolve() + } + }) + + await libp2p.start() + + return deferred.promise + }) }) diff --git a/packages/integration-tests/test/circuit-relay.node.ts b/packages/integration-tests/test/circuit-relay.node.ts index bdfcf6fba1..120daaf1f7 100644 --- a/packages/integration-tests/test/circuit-relay.node.ts +++ b/packages/integration-tests/test/circuit-relay.node.ts @@ -42,9 +42,6 @@ async function createClient (options: Libp2pOptions = {}): Promise { connectionEncryption: [ plaintext() ], - connectionManager: { - minConnections: 0 - }, services: { identify: identify() }, diff --git a/packages/integration-tests/test/fetch.spec.ts b/packages/integration-tests/test/fetch.spec.ts index f4fc888223..7d2885e027 100644 --- a/packages/integration-tests/test/fetch.spec.ts +++ b/packages/integration-tests/test/fetch.spec.ts @@ -12,9 +12,6 @@ async function createNode (): Promise> { return createLibp2p(createBaseOptions({ services: { fetch: fetch() - }, - connectionManager: { - minConnections: 0 } })) } diff --git a/packages/integration-tests/test/interop.ts b/packages/integration-tests/test/interop.ts index 01f0a4a4c9..8f2323674e 100644 --- a/packages/integration-tests/test/interop.ts +++ b/packages/integration-tests/test/interop.ts @@ -134,10 +134,7 @@ async function createJsPeer (options: SpawnOptions): Promise { }, transports: [tcp(), circuitRelayTransport()], streamMuxers: [], - connectionEncryption: [noise()], - connectionManager: { - minConnections: 0 - } + connectionEncryption: [noise()] } if (options.noListen !== true) { diff --git a/packages/libp2p/.aegir.js b/packages/libp2p/.aegir.js index a89fba7ad6..a7250fb938 100644 --- a/packages/libp2p/.aegir.js +++ b/packages/libp2p/.aegir.js @@ -20,8 +20,7 @@ export default { const peerId = await createEd25519PeerId() const libp2p = await createLibp2p({ connectionManager: { - inboundConnectionThreshold: Infinity, - minConnections: 0 + inboundConnectionThreshold: Infinity }, addresses: { listen: [ diff --git a/packages/libp2p/src/connection-manager/auto-dial.ts b/packages/libp2p/src/connection-manager/auto-dial.ts deleted file mode 100644 index 16fada9221..0000000000 --- a/packages/libp2p/src/connection-manager/auto-dial.ts +++ /dev/null @@ -1,285 +0,0 @@ -import { PeerMap, PeerSet } from '@libp2p/peer-collections' -import { PeerQueue } from '@libp2p/utils/peer-queue' -import { toString as uint8ArrayToString } from 'uint8arrays/to-string' -import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_DISCOVERED_PEERS_DEBOUNCE, AUTO_DIAL_INTERVAL, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PEER_RETRY_THRESHOLD, AUTO_DIAL_PRIORITY, LAST_DIAL_FAILURE_KEY, MIN_CONNECTIONS } from './constants.js' -import type { Libp2pEvents, Logger, ComponentLogger, TypedEventTarget, PeerStore, Startable, Metrics } from '@libp2p/interface' -import type { ConnectionManager } from '@libp2p/interface-internal' - -interface AutoDialInit { - minConnections?: number - maxQueueLength?: number - autoDialConcurrency?: number - autoDialPriority?: number - autoDialInterval?: number - autoDialPeerRetryThreshold?: number - autoDialDiscoveredPeersDebounce?: number -} - -interface AutoDialComponents { - connectionManager: ConnectionManager - peerStore: PeerStore - events: TypedEventTarget - logger: ComponentLogger - metrics?: Metrics -} - -const defaultOptions = { - minConnections: MIN_CONNECTIONS, - maxQueueLength: AUTO_DIAL_MAX_QUEUE_LENGTH, - autoDialConcurrency: AUTO_DIAL_CONCURRENCY, - autoDialPriority: AUTO_DIAL_PRIORITY, - autoDialInterval: AUTO_DIAL_INTERVAL, - autoDialPeerRetryThreshold: AUTO_DIAL_PEER_RETRY_THRESHOLD, - autoDialDiscoveredPeersDebounce: AUTO_DIAL_DISCOVERED_PEERS_DEBOUNCE -} - -export class AutoDial implements Startable { - private readonly connectionManager: ConnectionManager - private readonly peerStore: PeerStore - private readonly queue: PeerQueue - private readonly minConnections: number - private readonly autoDialPriority: number - private readonly autoDialIntervalMs: number - private readonly autoDialMaxQueueLength: number - private readonly autoDialPeerRetryThresholdMs: number - private readonly autoDialDiscoveredPeersDebounce: number - private autoDialInterval?: ReturnType - private started: boolean - private running: boolean - private readonly log: Logger - - /** - * Proactively tries to connect to known peers stored in the PeerStore. - * It will keep the number of connections below the upper limit and sort - * the peers to connect based on whether we know their keys and protocols. - */ - constructor (components: AutoDialComponents, init: AutoDialInit) { - this.connectionManager = components.connectionManager - this.peerStore = components.peerStore - this.minConnections = init.minConnections ?? defaultOptions.minConnections - this.autoDialPriority = init.autoDialPriority ?? defaultOptions.autoDialPriority - this.autoDialIntervalMs = init.autoDialInterval ?? defaultOptions.autoDialInterval - this.autoDialMaxQueueLength = init.maxQueueLength ?? defaultOptions.maxQueueLength - this.autoDialPeerRetryThresholdMs = init.autoDialPeerRetryThreshold ?? defaultOptions.autoDialPeerRetryThreshold - this.autoDialDiscoveredPeersDebounce = init.autoDialDiscoveredPeersDebounce ?? defaultOptions.autoDialDiscoveredPeersDebounce - this.log = components.logger.forComponent('libp2p:connection-manager:auto-dial') - this.started = false - this.running = false - this.queue = new PeerQueue({ - concurrency: init.autoDialConcurrency ?? defaultOptions.autoDialConcurrency, - metricName: 'libp2p_autodial_queue', - metrics: components.metrics - }) - this.queue.addEventListener('error', (evt) => { - this.log.error('error during auto-dial', evt.detail) - }) - - // check the min connection limit whenever a peer disconnects - components.events.addEventListener('connection:close', () => { - this.autoDial() - .catch(err => { - this.log.error(err) - }) - }) - - // sometimes peers are discovered in quick succession so add a small - // debounce to ensure all eligible peers are autodialed - let debounce: ReturnType - - // when new peers are discovered, dial them if we don't have - // enough connections - components.events.addEventListener('peer:discovery', () => { - clearTimeout(debounce) - debounce = setTimeout(() => { - this.autoDial() - .catch(err => { - this.log.error(err) - }) - }, this.autoDialDiscoveredPeersDebounce) - }) - } - - isStarted (): boolean { - return this.started - } - - start (): void { - this.started = true - } - - afterStart (): void { - this.autoDial() - .catch(err => { - this.log.error('error while autodialing', err) - }) - } - - stop (): void { - // clear the queue - this.queue.clear() - clearTimeout(this.autoDialInterval) - this.started = false - this.running = false - } - - async autoDial (): Promise { - if (!this.started || this.running) { - return - } - - const connections = this.connectionManager.getConnectionsMap() - const numConnections = connections.size - - // already have enough connections - if (numConnections >= this.minConnections) { - if (this.minConnections > 0) { - this.log.trace('have enough connections %d/%d', numConnections, this.minConnections) - } - - // no need to schedule next autodial as it will be run when on - // connection:close event - return - } - - if (this.queue.size > this.autoDialMaxQueueLength) { - this.log('not enough connections %d/%d but auto dial queue is full', numConnections, this.minConnections) - this.sheduleNextAutodial() - return - } - - this.running = true - - this.log('not enough connections %d/%d - will dial peers to increase the number of connections', numConnections, this.minConnections) - - const dialQueue = new PeerSet( - // @ts-expect-error boolean filter removes falsy peer IDs - this.connectionManager.getDialQueue() - .map(queue => queue.peerId) - .filter(Boolean) - ) - - // sort peers on whether we know protocols or public keys for them - const peers = await this.peerStore.all({ - filters: [ - // remove some peers - (peer) => { - // remove peers without addresses - if (peer.addresses.length === 0) { - this.log.trace('not autodialing %p because they have no addresses', peer.id) - return false - } - - // remove peers we are already connected to - if (connections.has(peer.id)) { - this.log.trace('not autodialing %p because they are already connected', peer.id) - return false - } - - // remove peers we are already dialling - if (dialQueue.has(peer.id)) { - this.log.trace('not autodialing %p because they are already being dialed', peer.id) - return false - } - - // remove peers already in the autodial queue - if (this.queue.has(peer.id)) { - this.log.trace('not autodialing %p because they are already being autodialed', peer.id) - return false - } - - return true - } - ] - }) - - // shuffle the peers - this is so peers with the same tag values will be - // dialled in a different order each time - const shuffledPeers = peers.sort(() => Math.random() > 0.5 ? 1 : -1) - - // sort shuffled peers by tag value - const peerValues = new PeerMap() - for (const peer of shuffledPeers) { - if (peerValues.has(peer.id)) { - continue - } - - // sum all tag values - peerValues.set(peer.id, [...peer.tags.values()].reduce((acc, curr) => { - return acc + curr.value - }, 0)) - } - - // sort by value, highest to lowest - const sortedPeers = shuffledPeers.sort((a, b) => { - const peerAValue = peerValues.get(a.id) ?? 0 - const peerBValue = peerValues.get(b.id) ?? 0 - - if (peerAValue > peerBValue) { - return -1 - } - - if (peerAValue < peerBValue) { - return 1 - } - - return 0 - }) - - const peersThatHaveNotFailed = sortedPeers.filter(peer => { - const lastDialFailure = peer.metadata.get(LAST_DIAL_FAILURE_KEY) - - if (lastDialFailure == null) { - return true - } - - const lastDialFailureTimestamp = parseInt(uint8ArrayToString(lastDialFailure)) - - if (isNaN(lastDialFailureTimestamp)) { - return true - } - - // only dial if the time since the last failure is above the retry threshold - return Date.now() - lastDialFailureTimestamp > this.autoDialPeerRetryThresholdMs - }) - - this.log('selected %d/%d peers to dial', peersThatHaveNotFailed.length, peers.length) - - for (const peer of peersThatHaveNotFailed) { - this.queue.add(async () => { - const numConnections = this.connectionManager.getConnectionsMap().size - - // Check to see if we still need to auto dial - if (numConnections >= this.minConnections) { - this.log('got enough connections now %d/%d', numConnections, this.minConnections) - this.queue.clear() - return - } - - this.log('connecting to a peerStore stored peer %p', peer.id) - await this.connectionManager.openConnection(peer.id, { - priority: this.autoDialPriority - }) - }, { - peerId: peer.id - }).catch(err => { - this.log.error('could not connect to peerStore stored peer', err) - }) - } - - this.running = false - this.sheduleNextAutodial() - } - - private sheduleNextAutodial (): void { - if (!this.started) { - return - } - - this.autoDialInterval = setTimeout(() => { - this.autoDial() - .catch(err => { - this.log.error('error while autodialing', err) - }) - }, this.autoDialIntervalMs) - } -} diff --git a/packages/libp2p/src/connection-manager/constants.browser.ts b/packages/libp2p/src/connection-manager/constants.browser.ts index 2c369c1245..cdafdbcdb2 100644 --- a/packages/libp2p/src/connection-manager/constants.browser.ts +++ b/packages/libp2p/src/connection-manager/constants.browser.ts @@ -1,10 +1,5 @@ export * from './constants.defaults.js' -/** - * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#minConnections - */ -export const MIN_CONNECTIONS = 5 - /** * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxConnections */ @@ -14,8 +9,3 @@ export const MAX_CONNECTIONS = 100 * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxParallelDials */ export const MAX_PARALLEL_DIALS = 50 - -/** - * @see https://libp2p.github.io/js-libp2p/interfaces/libp2p.index.unknown.ConnectionManagerInit.html#autoDialPeerRetryThreshold - */ -export const AUTO_DIAL_PEER_RETRY_THRESHOLD = 1000 * 60 * 7 diff --git a/packages/libp2p/src/connection-manager/constants.defaults.ts b/packages/libp2p/src/connection-manager/constants.defaults.ts index d9a6f6e3b6..87b0c7127f 100644 --- a/packages/libp2p/src/connection-manager/constants.defaults.ts +++ b/packages/libp2p/src/connection-manager/constants.defaults.ts @@ -13,31 +13,6 @@ export const INBOUND_UPGRADE_TIMEOUT = 2e3 */ export const MAX_PEER_ADDRS_TO_DIAL = 25 -/** - * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#autoDialInterval - */ -export const AUTO_DIAL_INTERVAL = 5000 - -/** - * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#autoDialConcurrency - */ -export const AUTO_DIAL_CONCURRENCY = 25 - -/** - * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#autoDialPriority - */ -export const AUTO_DIAL_PRIORITY = 0 - -/** - * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#autoDialMaxQueueLength - */ -export const AUTO_DIAL_MAX_QUEUE_LENGTH = 100 - -/** - * @see https://libp2p.github.io/js-libp2p/interfaces/libp2p.index.unknown.ConnectionManagerInit.html#autoDialDiscoveredPeersDebounce - */ -export const AUTO_DIAL_DISCOVERED_PEERS_DEBOUNCE = 10 - /** * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#inboundConnectionThreshold */ diff --git a/packages/libp2p/src/connection-manager/constants.ts b/packages/libp2p/src/connection-manager/constants.ts index a6a6c486f4..422074f57c 100644 --- a/packages/libp2p/src/connection-manager/constants.ts +++ b/packages/libp2p/src/connection-manager/constants.ts @@ -1,10 +1,5 @@ export * from './constants.defaults.js' -/** - * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#minConnections - */ -export const MIN_CONNECTIONS = 50 - /** * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxConnections */ @@ -14,8 +9,3 @@ export const MAX_CONNECTIONS = 300 * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxParallelDials */ export const MAX_PARALLEL_DIALS = 100 - -/** - * @see https://libp2p.github.io/js-libp2p/interfaces/libp2p.index.unknown.ConnectionManagerInit.html#autoDialPeerRetryThreshold - */ -export const AUTO_DIAL_PEER_RETRY_THRESHOLD = 1000 * 60 diff --git a/packages/libp2p/src/connection-manager/index.ts b/packages/libp2p/src/connection-manager/index.ts index d8c441fc9d..3fab4b62b4 100644 --- a/packages/libp2p/src/connection-manager/index.ts +++ b/packages/libp2p/src/connection-manager/index.ts @@ -7,9 +7,8 @@ import { dnsaddrResolver } from '@multiformats/multiaddr/resolvers' import { CustomProgressEvent } from 'progress-events' import { codes } from '../errors.js' import { getPeerAddress } from '../get-peer.js' -import { AutoDial } from './auto-dial.js' import { ConnectionPruner } from './connection-pruner.js' -import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_DISCOVERED_PEERS_DEBOUNCE, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PEER_RETRY_THRESHOLD, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_DIAL_QUEUE_LENGTH, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js' +import { DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_DIAL_QUEUE_LENGTH, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL } from './constants.js' import { DialQueue } from './dial-queue.js' import type { PendingDial, AddressSorter, Libp2pEvents, AbortOptions, ComponentLogger, Logger, Connection, MultiaddrConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, Peer, PeerStore, Startable, PendingDialStatus, PeerRouting, IsDialableOptions } from '@libp2p/interface' import type { ConnectionManager, OpenConnectionOptions, TransportManager } from '@libp2p/interface-internal' @@ -24,52 +23,6 @@ export interface ConnectionManagerInit { */ maxConnections?: number - /** - * The minimum number of connections below which libp2p will start to dial peers - * from the peer book. Setting this to 0 effectively disables this behaviour. - * (default: 50, 5 in browsers) - */ - minConnections?: number - - /** - * How long to wait between attempting to keep our number of concurrent connections - * above minConnections (default: 5000) - */ - autoDialInterval?: number - - /** - * When dialling peers from the peer book to keep the number of open connections - * above `minConnections`, add dials for this many peers to the dial queue - * at once. (default: 25) - */ - autoDialConcurrency?: number - - /** - * To allow user dials to take priority over auto dials, use this value as the - * dial priority. (default: 0) - */ - autoDialPriority?: number - - /** - * Limit the maximum number of peers to dial when trying to keep the number of - * open connections above `minConnections`. (default: 100) - */ - autoDialMaxQueueLength?: number - - /** - * When we've failed to dial a peer, do not autodial them again within this - * number of ms. (default: 1 minute, 7 minutes in browsers) - */ - autoDialPeerRetryThreshold?: number - - /** - * Newly discovered peers may be auto-dialed to increase the number of open - * connections, but they can be discovered in quick succession so add a small - * delay before attempting to dial them in case more peers have been - * discovered. (default: 10ms) - */ - autoDialDiscoveredPeersDebounce?: number - /** * Sort the known addresses of a peer before trying to dial, By default public * addresses will be dialled before private (e.g. loopback or LAN) addresses. @@ -142,15 +95,9 @@ export interface ConnectionManagerInit { } const defaultOptions = { - minConnections: MIN_CONNECTIONS, maxConnections: MAX_CONNECTIONS, inboundConnectionThreshold: INBOUND_CONNECTION_THRESHOLD, - maxIncomingPendingConnections: MAX_INCOMING_PENDING_CONNECTIONS, - autoDialConcurrency: AUTO_DIAL_CONCURRENCY, - autoDialPriority: AUTO_DIAL_PRIORITY, - autoDialMaxQueueLength: AUTO_DIAL_MAX_QUEUE_LENGTH, - autoDialPeerRetryThreshold: AUTO_DIAL_PEER_RETRY_THRESHOLD, - autoDialDiscoveredPeersDebounce: AUTO_DIAL_DISCOVERED_PEERS_DEBOUNCE + maxIncomingPendingConnections: MAX_INCOMING_PENDING_CONNECTIONS } export interface DefaultConnectionManagerComponents { @@ -177,7 +124,6 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { private readonly maxConnections: number public readonly dialQueue: DialQueue - public readonly autoDial: AutoDial public readonly connectionPruner: ConnectionPruner private readonly inboundConnectionRateLimiter: RateLimiter private readonly peerStore: PeerStore @@ -187,10 +133,9 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { constructor (components: DefaultConnectionManagerComponents, init: ConnectionManagerInit = {}) { this.maxConnections = init.maxConnections ?? defaultOptions.maxConnections - const minConnections = init.minConnections ?? defaultOptions.minConnections - if (this.maxConnections < minConnections) { - throw new CodeError('Connection Manager maxConnections must be greater than minConnections', codes.ERR_INVALID_PARAMETERS) + if (this.maxConnections < 0) { + throw new CodeError('Connection Manager maxConnections must be greater than zero', codes.ERR_INVALID_PARAMETERS) } /** @@ -222,21 +167,6 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { duration: 1 }) - // controls what happens when we don't have enough connections - this.autoDial = new AutoDial({ - connectionManager: this, - peerStore: components.peerStore, - events: components.events, - logger: components.logger - }, { - minConnections, - autoDialConcurrency: init.autoDialConcurrency ?? defaultOptions.autoDialConcurrency, - autoDialPriority: init.autoDialPriority ?? defaultOptions.autoDialPriority, - autoDialPeerRetryThreshold: init.autoDialPeerRetryThreshold ?? defaultOptions.autoDialPeerRetryThreshold, - autoDialDiscoveredPeersDebounce: init.autoDialDiscoveredPeersDebounce ?? defaultOptions.autoDialDiscoveredPeersDebounce, - maxQueueLength: init.autoDialMaxQueueLength ?? defaultOptions.autoDialMaxQueueLength - }) - // controls what happens when we have too many connections this.connectionPruner = new ConnectionPruner({ connectionManager: this, @@ -351,7 +281,6 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { }) this.dialQueue.start() - this.autoDial.start() this.started = true this.log('started') @@ -379,8 +308,6 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { .catch(err => { this.log.error(err) }) - - this.autoDial.afterStart() } /** @@ -388,7 +315,6 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { */ async stop (): Promise { this.dialQueue.stop() - this.autoDial.stop() // Close all connections we're tracking const tasks: Array> = [] diff --git a/packages/libp2p/test/connection-manager/auto-dial.spec.ts b/packages/libp2p/test/connection-manager/auto-dial.spec.ts deleted file mode 100644 index 2d7bdcec03..0000000000 --- a/packages/libp2p/test/connection-manager/auto-dial.spec.ts +++ /dev/null @@ -1,293 +0,0 @@ -/* eslint-env mocha */ - -import { TypedEventEmitter, type TypedEventTarget, type Libp2pEvents, type Connection, type PeerId, type PeerStore, type Peer } from '@libp2p/interface' -import { matchPeerId } from '@libp2p/interface-compliance-tests/matchers' -import { defaultLogger } from '@libp2p/logger' -import { PeerMap } from '@libp2p/peer-collections' -import { createEd25519PeerId } from '@libp2p/peer-id-factory' -import { PersistentPeerStore } from '@libp2p/peer-store' -import { multiaddr } from '@multiformats/multiaddr' -import { expect } from 'aegir/chai' -import { MemoryDatastore } from 'datastore-core' -import delay from 'delay' -import pWaitFor from 'p-wait-for' -import Sinon from 'sinon' -import { stubInterface } from 'sinon-ts' -import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' -import { defaultComponents } from '../../src/components.js' -import { AutoDial } from '../../src/connection-manager/auto-dial.js' -import { LAST_DIAL_FAILURE_KEY } from '../../src/connection-manager/constants.js' -import type { ConnectionManager } from '@libp2p/interface-internal' - -describe('auto-dial', () => { - let autoDialer: AutoDial - let events: TypedEventTarget - let peerStore: PeerStore - let peerId: PeerId - - beforeEach(async () => { - peerId = await createEd25519PeerId() - events = new TypedEventEmitter() - peerStore = new PersistentPeerStore({ - datastore: new MemoryDatastore(), - events, - peerId, - logger: defaultLogger() - }) - }) - - afterEach(() => { - if (autoDialer != null) { - autoDialer.stop() - } - }) - - it('should not dial peers without multiaddrs', async () => { - // peers with protocols are dialled before peers without protocols - const peerWithAddress: Peer = { - id: await createEd25519PeerId(), - protocols: [ - '/foo/bar' - ], - addresses: [{ - multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4001'), - isCertified: true - }], - metadata: new Map(), - tags: new Map() - } - const peerWithoutAddress: Peer = { - id: await createEd25519PeerId(), - protocols: [], - addresses: [], - metadata: new Map(), - tags: new Map() - } - - await peerStore.save(peerWithAddress.id, peerWithAddress) - await peerStore.save(peerWithoutAddress.id, peerWithoutAddress) - - const connectionManager = stubInterface({ - getConnectionsMap: Sinon.stub().returns(new PeerMap()), - getDialQueue: Sinon.stub().returns([]) - }) - - autoDialer = new AutoDial(defaultComponents({ - peerStore, - connectionManager, - events - }), { - minConnections: 10, - autoDialInterval: 10000 - }) - autoDialer.start() - void autoDialer.autoDial() - - await pWaitFor(() => { - return connectionManager.openConnection.callCount === 1 - }) - await delay(1000) - - expect(connectionManager.openConnection.callCount).to.equal(1) - expect(connectionManager.openConnection.calledWith(matchPeerId(peerWithAddress.id))).to.be.true() - expect(connectionManager.openConnection.calledWith(matchPeerId(peerWithoutAddress.id))).to.be.false() - }) - - it('should not dial connected peers', async () => { - const connectedPeer: Peer = { - id: await createEd25519PeerId(), - protocols: [], - addresses: [{ - multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4001'), - isCertified: true - }], - metadata: new Map(), - tags: new Map() - } - const unConnectedPeer: Peer = { - id: await createEd25519PeerId(), - protocols: [], - addresses: [{ - multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4002'), - isCertified: true - }], - metadata: new Map(), - tags: new Map() - } - - await peerStore.save(connectedPeer.id, connectedPeer) - await peerStore.save(unConnectedPeer.id, unConnectedPeer) - - const connectionMap = new PeerMap() - connectionMap.set(connectedPeer.id, [stubInterface()]) - - const connectionManager = stubInterface({ - getConnectionsMap: Sinon.stub().returns(connectionMap), - getDialQueue: Sinon.stub().returns([]) - }) - - autoDialer = new AutoDial(defaultComponents({ - peerStore, - connectionManager, - events - }), { - minConnections: 10 - }) - autoDialer.start() - await autoDialer.autoDial() - - await pWaitFor(() => connectionManager.openConnection.callCount === 1) - await delay(1000) - - expect(connectionManager.openConnection.callCount).to.equal(1) - expect(connectionManager.openConnection.calledWith(matchPeerId(unConnectedPeer.id))).to.be.true() - expect(connectionManager.openConnection.calledWith(matchPeerId(connectedPeer.id))).to.be.false() - }) - - it('should not dial peers already in the dial queue', async () => { - // peers with protocols are dialled before peers without protocols - const peerInDialQueue: Peer = { - id: await createEd25519PeerId(), - protocols: [], - addresses: [{ - multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4001'), - isCertified: true - }], - metadata: new Map(), - tags: new Map() - } - const peerNotInDialQueue: Peer = { - id: await createEd25519PeerId(), - protocols: [], - addresses: [{ - multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4002'), - isCertified: true - }], - metadata: new Map(), - tags: new Map() - } - - await peerStore.save(peerInDialQueue.id, peerInDialQueue) - await peerStore.save(peerNotInDialQueue.id, peerNotInDialQueue) - - const connectionManager = stubInterface({ - getConnectionsMap: Sinon.stub().returns(new PeerMap()), - getDialQueue: Sinon.stub().returns([{ - id: 'foo', - peerId: peerInDialQueue.id, - multiaddrs: [], - status: 'queued' - }]) - }) - - autoDialer = new AutoDial(defaultComponents({ - peerStore, - connectionManager, - events - }), { - minConnections: 10 - }) - autoDialer.start() - await autoDialer.autoDial() - - await pWaitFor(() => connectionManager.openConnection.callCount === 1) - await delay(1000) - - expect(connectionManager.openConnection.callCount).to.equal(1) - expect(connectionManager.openConnection.calledWith(matchPeerId(peerNotInDialQueue.id))).to.be.true() - expect(connectionManager.openConnection.calledWith(matchPeerId(peerInDialQueue.id))).to.be.false() - }) - - it('should not start parallel autodials', async () => { - const peerStoreAllSpy = Sinon.spy(peerStore, 'all') - - const connectionManager = stubInterface({ - getConnectionsMap: Sinon.stub().returns(new PeerMap()), - getDialQueue: Sinon.stub().returns([]) - }) - - autoDialer = new AutoDial(defaultComponents({ - peerStore, - connectionManager, - events - }), { - minConnections: 10, - autoDialInterval: 10000 - }) - autoDialer.start() - - // call autodial twice - await Promise.all([ - autoDialer.autoDial(), - autoDialer.autoDial() - ]) - - // should only have queried peer store once - expect(peerStoreAllSpy.callCount).to.equal(1) - }) - - it('should not re-dial peers we have recently failed to dial', async () => { - const peerWithAddress: Peer = { - id: await createEd25519PeerId(), - protocols: [], - addresses: [{ - multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4001'), - isCertified: true - }], - metadata: new Map(), - tags: new Map() - } - const undialablePeer: Peer = { - id: await createEd25519PeerId(), - protocols: [], - addresses: [{ - multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4002'), - isCertified: true - }], - // we failed to dial them recently - metadata: new Map([[LAST_DIAL_FAILURE_KEY, uint8ArrayFromString(`${Date.now() - 10}`)]]), - tags: new Map() - } - - await peerStore.save(peerWithAddress.id, peerWithAddress) - await peerStore.save(undialablePeer.id, undialablePeer) - - const connectionManager = stubInterface({ - getConnectionsMap: Sinon.stub().returns(new PeerMap()), - getDialQueue: Sinon.stub().returns([]) - }) - - autoDialer = new AutoDial(defaultComponents({ - peerStore, - connectionManager, - events - }), { - minConnections: 10, - autoDialPeerRetryThreshold: 2000 - }) - autoDialer.start() - - void autoDialer.autoDial() - - await pWaitFor(() => { - return connectionManager.openConnection.callCount === 1 - }) - - expect(connectionManager.openConnection.callCount).to.equal(1) - expect(connectionManager.openConnection.calledWith(matchPeerId(peerWithAddress.id))).to.be.true() - expect(connectionManager.openConnection.calledWith(matchPeerId(undialablePeer.id))).to.be.false() - - // pass the retry threshold - await delay(2000) - - // autodial again - void autoDialer.autoDial() - - await pWaitFor(() => { - return connectionManager.openConnection.callCount === 3 - }) - - // should have retried the unreachable peer - expect(connectionManager.openConnection.calledWith(matchPeerId(undialablePeer.id))).to.be.true() - }) -}) diff --git a/packages/libp2p/test/connection-manager/direct.node.ts b/packages/libp2p/test/connection-manager/direct.node.ts index 271f85a594..234a71b445 100644 --- a/packages/libp2p/test/connection-manager/direct.node.ts +++ b/packages/libp2p/test/connection-manager/direct.node.ts @@ -91,7 +91,6 @@ describe('dialing (direct, TCP)', () => { localComponents.peerStore = new PersistentPeerStore(localComponents) localComponents.connectionManager = new DefaultConnectionManager(localComponents, { maxConnections: 100, - minConnections: 50, inboundUpgradeTimeout: 1000 }) localComponents.addressManager = new DefaultAddressManager(localComponents) diff --git a/packages/libp2p/test/connection-manager/direct.spec.ts b/packages/libp2p/test/connection-manager/direct.spec.ts index ef2f3941f4..f5c8d8071f 100644 --- a/packages/libp2p/test/connection-manager/direct.spec.ts +++ b/packages/libp2p/test/connection-manager/direct.spec.ts @@ -56,7 +56,6 @@ describe('dialing (direct, WebSockets)', () => { }) localComponents.connectionManager = new DefaultConnectionManager(localComponents, { maxConnections: 100, - minConnections: 50, inboundUpgradeTimeout: 1000 }) diff --git a/packages/libp2p/test/connection-manager/index.node.ts b/packages/libp2p/test/connection-manager/index.node.ts index cebad403d3..35cf5ba180 100644 --- a/packages/libp2p/test/connection-manager/index.node.ts +++ b/packages/libp2p/test/connection-manager/index.node.ts @@ -6,10 +6,8 @@ import { createEd25519PeerId } from '@libp2p/peer-id-factory' import { dns } from '@multiformats/dns' import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' -import delay from 'delay' import all from 'it-all' import { pipe } from 'it-pipe' -import pWaitFor from 'p-wait-for' import sinon from 'sinon' import { stubInterface } from 'sinon-ts' import { defaultComponents } from '../../src/components.js' @@ -60,7 +58,6 @@ describe('Connection Manager', () => { }) const connectionManager = new DefaultConnectionManager(components, { maxConnections: 1000, - minConnections: 50, inboundUpgradeTimeout: 1000 }) @@ -98,7 +95,6 @@ describe('Connection Manager', () => { }) const connectionManager = new DefaultConnectionManager(components, { maxConnections: 1000, - minConnections: 50, inboundUpgradeTimeout: 1000 }) @@ -199,141 +195,6 @@ describe('libp2p.connections', () => { sinon.reset() }) - it('should connect to all the peers stored in the PeerStore, if their number is below minConnections', async () => { - libp2p = await createNode({ - started: false, - config: { - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0/ws'] - }, - connectionManager: { - minConnections: 3 - } - } - }) - - // Populate PeerStore before starting - await libp2p.peerStore.patch(nodes[0].peerId, { - multiaddrs: nodes[0].getMultiaddrs() - }) - await libp2p.peerStore.patch(nodes[1].peerId, { - multiaddrs: nodes[1].getMultiaddrs() - }) - - await libp2p.start() - - // Wait for peers to connect - await pWaitFor(() => libp2p.getConnections().length === 2) - - await libp2p.stop() - }) - - it('should connect to all the peers stored in the PeerStore until reaching the minConnections', async () => { - const minConnections = 1 - libp2p = await createNode({ - started: false, - config: { - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0/ws'] - }, - connectionManager: { - minConnections, - maxConnections: 1 - } - } - }) - - // Populate PeerStore before starting - await libp2p.peerStore.patch(nodes[0].peerId, { - multiaddrs: nodes[0].getMultiaddrs() - }) - await libp2p.peerStore.patch(nodes[1].peerId, { - multiaddrs: nodes[1].getMultiaddrs() - }) - - await libp2p.start() - - // Wait for peer to connect - await pWaitFor(() => libp2p.components.connectionManager.getConnections().length === minConnections) - - // Wait more time to guarantee no other connection happened - await delay(200) - expect(libp2p.components.connectionManager.getConnections().length).to.eql(minConnections) - - await libp2p.stop() - }) - - // flaky - it.skip('should connect to all the peers stored in the PeerStore until reaching the minConnections sorted', async () => { - const minConnections = 1 - libp2p = await createNode({ - started: false, - config: { - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0/ws'] - }, - connectionManager: { - minConnections, - maxConnections: 1 - } - } - }) - - // Populate PeerStore before starting - await libp2p.peerStore.patch(nodes[0].peerId, { - multiaddrs: nodes[0].getMultiaddrs() - }) - await libp2p.peerStore.patch(nodes[1].peerId, { - multiaddrs: nodes[1].getMultiaddrs(), - protocols: ['/protocol-min-conns'] - }) - - await libp2p.start() - - // Wait for peer to connect - await pWaitFor(() => libp2p.components.connectionManager.getConnections().length === minConnections) - - // Should have connected to the peer with protocols - expect(libp2p.components.connectionManager.getConnections(nodes[0].peerId)).to.be.empty() - expect(libp2p.components.connectionManager.getConnections(nodes[1].peerId)).to.not.be.empty() - - await libp2p.stop() - }) - - it('should connect to peers in the PeerStore when a peer disconnected', async () => { - const minConnections = 1 - - libp2p = await createNode({ - config: { - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0/ws'] - }, - connectionManager: { - minConnections - } - } - }) - - // Populate PeerStore after starting (discovery) - await libp2p.peerStore.patch(nodes[0].peerId, { - multiaddrs: nodes[0].getMultiaddrs() - }) - - // Wait for peer to connect - const conn = await libp2p.dial(nodes[0].peerId) - expect(libp2p.components.connectionManager.getConnections(nodes[0].peerId)).to.not.be.empty() - - await conn.close() - // Closed - await pWaitFor(() => libp2p.components.connectionManager.getConnections().length === 0) - // Connected - await pWaitFor(() => libp2p.components.connectionManager.getConnections().length === 1) - - expect(libp2p.components.connectionManager.getConnections(nodes[0].peerId)).to.not.be.empty() - - await libp2p.stop() - }) - it('should be closed status once immediately stopping', async () => { libp2p = await createNode({ config: createBaseOptions({ diff --git a/packages/libp2p/test/connection-manager/index.spec.ts b/packages/libp2p/test/connection-manager/index.spec.ts index 5dce1b63e3..145bee3bd8 100644 --- a/packages/libp2p/test/connection-manager/index.spec.ts +++ b/packages/libp2p/test/connection-manager/index.spec.ts @@ -19,7 +19,6 @@ import type { TransportManager } from '@libp2p/interface-internal' const defaultOptions = { maxConnections: 10, - minConnections: 1, autoDialInterval: Infinity, inboundUpgradeTimeout: 10000 } @@ -85,8 +84,7 @@ describe('Connection Manager', () => { libp2p = await createNode({ config: createBaseOptions({ connectionManager: { - maxConnections: max, - minConnections: 2 + maxConnections: max } }), started: false @@ -144,8 +142,7 @@ describe('Connection Manager', () => { libp2p = await createNode({ config: createBaseOptions({ connectionManager: { - maxConnections: max, - minConnections: 2 + maxConnections: max } }), started: false @@ -210,7 +207,6 @@ describe('Connection Manager', () => { config: createBaseOptions({ connectionManager: { maxConnections: max, - minConnections: 0, allow: [ '/ip4/83.13.55.32' ] @@ -294,8 +290,7 @@ describe('Connection Manager', () => { libp2p = await createNode({ config: createBaseOptions({ connectionManager: { - maxConnections: max, - minConnections: 0 + maxConnections: max } }), started: false @@ -327,8 +322,7 @@ describe('Connection Manager', () => { await expect(createNode({ config: createBaseOptions({ connectionManager: { - maxConnections: 5, - minConnections: 6 + maxConnections: -1 } }), started: false diff --git a/packages/libp2p/test/registrar/errors.spec.ts b/packages/libp2p/test/registrar/errors.spec.ts index ff78d2097d..4ed601ab81 100644 --- a/packages/libp2p/test/registrar/errors.spec.ts +++ b/packages/libp2p/test/registrar/errors.spec.ts @@ -31,7 +31,6 @@ describe('registrar errors', () => { }) components.peerStore = new PersistentPeerStore(components) components.connectionManager = new DefaultConnectionManager(components, { - minConnections: 50, maxConnections: 1000, inboundUpgradeTimeout: 1000 }) diff --git a/packages/peer-discovery-bootstrap/package.json b/packages/peer-discovery-bootstrap/package.json index e8fd393f99..474f0f4a7d 100644 --- a/packages/peer-discovery-bootstrap/package.json +++ b/packages/peer-discovery-bootstrap/package.json @@ -55,6 +55,7 @@ }, "dependencies": { "@libp2p/interface": "^1.6.2", + "@libp2p/interface-internal": "^1.3.1", "@libp2p/peer-id": "^4.2.2", "@multiformats/mafmt": "^12.1.6", "@multiformats/multiaddr": "^12.2.3" diff --git a/packages/peer-discovery-bootstrap/src/index.ts b/packages/peer-discovery-bootstrap/src/index.ts index eb09310505..140a2078cd 100644 --- a/packages/peer-discovery-bootstrap/src/index.ts +++ b/packages/peer-discovery-bootstrap/src/index.ts @@ -37,10 +37,10 @@ import { peerIdFromString } from '@libp2p/peer-id' import { P2P } from '@multiformats/mafmt' import { multiaddr } from '@multiformats/multiaddr' import type { ComponentLogger, Logger, PeerDiscovery, PeerDiscoveryEvents, PeerInfo, PeerStore, Startable } from '@libp2p/interface' +import type { ConnectionManager } from '@libp2p/interface-internal' const DEFAULT_BOOTSTRAP_TAG_NAME = 'bootstrap' const DEFAULT_BOOTSTRAP_TAG_VALUE = 50 -const DEFAULT_BOOTSTRAP_TAG_TTL = 120000 const DEFAULT_BOOTSTRAP_DISCOVERY_TIMEOUT = 1000 export interface BootstrapInit { @@ -55,7 +55,9 @@ export interface BootstrapInit { timeout?: number /** - * Tag a bootstrap peer with this name before "discovering" it (default: 'bootstrap') + * Tag a bootstrap peer with this name before "discovering" it + * + * @default 'bootstrap' */ tagName?: string @@ -65,7 +67,7 @@ export interface BootstrapInit { tagValue?: number /** - * Cause the bootstrap peer tag to be removed after this number of ms (default: 2 minutes) + * Cause the bootstrap peer tag to be removed after this number of ms */ tagTTL?: number } @@ -73,6 +75,7 @@ export interface BootstrapInit { export interface BootstrapComponents { peerStore: PeerStore logger: ComponentLogger + connectionManager: ConnectionManager } /** @@ -166,9 +169,10 @@ class Bootstrap extends TypedEventEmitter implements PeerDi tags: { [this._init.tagName ?? DEFAULT_BOOTSTRAP_TAG_NAME]: { value: this._init.tagValue ?? DEFAULT_BOOTSTRAP_TAG_VALUE, - ttl: this._init.tagTTL ?? DEFAULT_BOOTSTRAP_TAG_TTL + ttl: this._init.tagTTL } - } + }, + multiaddrs: peerData.multiaddrs }) // check we are still running @@ -177,6 +181,10 @@ class Bootstrap extends TypedEventEmitter implements PeerDi } this.safeDispatchEvent('peer', { detail: peerData }) + this.components.connectionManager.openConnection(peerData.id) + .catch(err => { + this.log.error('could not dial bootstrap peer %p', peerData.id, err) + }) } } diff --git a/packages/peer-discovery-bootstrap/test/bootstrap.spec.ts b/packages/peer-discovery-bootstrap/test/bootstrap.spec.ts index 8549f71012..7429053fb4 100644 --- a/packages/peer-discovery-bootstrap/test/bootstrap.spec.ts +++ b/packages/peer-discovery-bootstrap/test/bootstrap.spec.ts @@ -7,13 +7,20 @@ import { IPFS } from '@multiformats/mafmt' import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' import { type StubbedInstance, stubInterface } from 'sinon-ts' -import { bootstrap, type BootstrapComponents } from '../src/index.js' +import { bootstrap } from '../src/index.js' import peerList from './fixtures/default-peers.js' import partialValidPeerList from './fixtures/some-invalid-peers.js' -import type { PeerStore } from '@libp2p/interface' +import type { ComponentLogger, PeerStore } from '@libp2p/interface' +import type { ConnectionManager } from '@libp2p/interface-internal' + +export interface StubbedBootstrapComponents { + peerStore: PeerStore + logger: ComponentLogger + connectionManager: StubbedInstance +} describe('bootstrap', () => { - let components: BootstrapComponents + let components: StubbedBootstrapComponents let peerStore: StubbedInstance beforeEach(async () => { @@ -21,7 +28,8 @@ describe('bootstrap', () => { components = { peerStore, - logger: defaultLogger() + logger: defaultLogger(), + connectionManager: stubInterface() } }) @@ -49,6 +57,27 @@ describe('bootstrap', () => { await stop(r) }) + it('should dial bootstrap peers', async function () { + this.timeout(5 * 1000) + const r = bootstrap({ + list: peerList, + timeout: 100 + })(components) + + await start(r) + + await new Promise(resolve => { + const interval = setInterval(() => { + if (components.connectionManager.openConnection.callCount === 1) { + clearInterval(interval) + resolve() + } + }, 100) + }) + + await stop(r) + }) + it('should tag bootstrap peers', async function () { this.timeout(5 * 1000) @@ -92,7 +121,10 @@ describe('bootstrap', () => { value: tagValue, ttl: tagTTL } - } + }, + multiaddrs: [ + bootstrapper0ma + ] }) await stop(r) diff --git a/packages/peer-discovery-bootstrap/test/compliance.spec.ts b/packages/peer-discovery-bootstrap/test/compliance.spec.ts index 9a5a46cb95..78c80419df 100644 --- a/packages/peer-discovery-bootstrap/test/compliance.spec.ts +++ b/packages/peer-discovery-bootstrap/test/compliance.spec.ts @@ -6,13 +6,15 @@ import { stubInterface } from 'sinon-ts' import { bootstrap } from '../src/index.js' import peerList from './fixtures/default-peers.js' import type { PeerStore } from '@libp2p/interface' +import type { ConnectionManager } from '@libp2p/interface-internal' describe('compliance tests', () => { tests({ async setup () { const components = { peerStore: stubInterface(), - logger: defaultLogger() + logger: defaultLogger(), + connectionManager: stubInterface() } return bootstrap({ diff --git a/packages/transport-webrtc/.aegir.js b/packages/transport-webrtc/.aegir.js index e95fcc9e5e..db2ccf46cb 100644 --- a/packages/transport-webrtc/.aegir.js +++ b/packages/transport-webrtc/.aegir.js @@ -35,7 +35,6 @@ export default { }) }, connectionManager: { - minConnections: 0, inboundConnectionThreshold: Infinity } }) diff --git a/packages/transport-webrtc/test/basics.spec.ts b/packages/transport-webrtc/test/basics.spec.ts index c4d01470f5..43e3873fa9 100644 --- a/packages/transport-webrtc/test/basics.spec.ts +++ b/packages/transport-webrtc/test/basics.spec.ts @@ -44,9 +44,6 @@ async function createNode (): Promise { connectionGater: { denyDialMultiaddr: () => false }, - connectionManager: { - minConnections: 0 - }, services: { identify: identify() } diff --git a/packages/transport-webtransport/test/browser.ts b/packages/transport-webtransport/test/browser.ts index 7da82aee05..0a5167fee2 100644 --- a/packages/transport-webtransport/test/browser.ts +++ b/packages/transport-webtransport/test/browser.ts @@ -21,9 +21,6 @@ describe('libp2p-webtransport', () => { connectionGater: { denyDialMultiaddr: async () => false }, - connectionManager: { - minConnections: 0 - }, services: { ping: ping() } From eae36cb1fae29c2f58affa15c1c29998bd4198de Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Wed, 14 Aug 2024 17:00:08 +0100 Subject: [PATCH 2/4] chore: apply suggestions from code review Co-authored-by: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> --- packages/libp2p/test/connection-manager/index.spec.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/libp2p/test/connection-manager/index.spec.ts b/packages/libp2p/test/connection-manager/index.spec.ts index c09d0b8a9e..9b52ee8a74 100644 --- a/packages/libp2p/test/connection-manager/index.spec.ts +++ b/packages/libp2p/test/connection-manager/index.spec.ts @@ -19,7 +19,6 @@ import type { TransportManager } from '@libp2p/interface-internal' const defaultOptions = { maxConnections: 10, - autoDialInterval: Infinity, inboundUpgradeTimeout: 10000 } From e5b7f7859deb5cc2f969d8dffa74f7939ce319ac Mon Sep 17 00:00:00 2001 From: achingbrain Date: Wed, 14 Aug 2024 18:17:57 +0100 Subject: [PATCH 3/4] fix: reconnect to peers tagged with KEEP_ALIVE --- packages/integration-tests/test/peers.spec.ts | 59 ++++++++ packages/libp2p/package.json | 1 + .../connection-manager/constants.defaults.ts | 11 +- .../src/connection-manager/dial-queue.ts | 19 ++- .../libp2p/src/connection-manager/index.ts | 76 +++++++---- .../src/connection-manager/reconnect-queue.ts | 126 ++++++++++++++++++ .../reconnect-queue.spec.ts | 117 ++++++++++++++++ 7 files changed, 377 insertions(+), 32 deletions(-) create mode 100644 packages/integration-tests/test/peers.spec.ts create mode 100644 packages/libp2p/src/connection-manager/reconnect-queue.ts create mode 100644 packages/libp2p/test/connection-manager/reconnect-queue.spec.ts diff --git a/packages/integration-tests/test/peers.spec.ts b/packages/integration-tests/test/peers.spec.ts new file mode 100644 index 0000000000..2992de118d --- /dev/null +++ b/packages/integration-tests/test/peers.spec.ts @@ -0,0 +1,59 @@ +/* eslint-env mocha */ + +import { KEEP_ALIVE, type Libp2p } from '@libp2p/interface' +import { expect } from 'aegir/chai' +import { createLibp2p } from 'libp2p' +import pWaitFor from 'p-wait-for' +import { createBaseOptions } from './fixtures/base-options.js' + +describe('peers', () => { + let nodes: Libp2p[] + + beforeEach(async () => { + nodes = await Promise.all([ + createLibp2p(createBaseOptions()), + createLibp2p(createBaseOptions()), + createLibp2p(createBaseOptions()) + ]) + }) + + afterEach(async () => Promise.all(nodes.map(async n => { await n.stop() }))) + + it('should redial a peer tagged with KEEP_ALIVE', async () => { + await nodes[0].dial(nodes[1].getMultiaddrs()) + + expect(nodes[0].getConnections(nodes[1].peerId)).to.not.be.empty() + + await nodes[0].peerStore.merge(nodes[1].peerId, { + tags: { + [KEEP_ALIVE]: { + value: 1 + } + } + }) + + await Promise.all( + nodes[0].getConnections(nodes[1].peerId).map(async conn => conn.close()) + ) + + await pWaitFor(async () => { + return nodes[0].getConnections(nodes[1].peerId).length > 0 + }, { + interval: 100, + timeout: { + milliseconds: 5000, + message: 'Did not reconnect to peer tagged with KEEP_ALIVE' + } + }) + }) + + it('should store the multiaddr for a peer after a successful dial', async () => { + await nodes[0].dial(nodes[1].getMultiaddrs()) + + expect(nodes[0].getConnections(nodes[1].peerId)).to.not.be.empty() + + const peer = await nodes[0].peerStore.get(nodes[1].peerId) + expect(peer.addresses).to.not.be.empty() + expect(peer.metadata.get('last-dial-success')).to.be.ok() + }) +}) diff --git a/packages/libp2p/package.json b/packages/libp2p/package.json index a288037a34..708c906a0a 100644 --- a/packages/libp2p/package.json +++ b/packages/libp2p/package.json @@ -107,6 +107,7 @@ "merge-options": "^3.0.4", "multiformats": "^13.1.0", "p-defer": "^4.0.1", + "p-retry": "^6.2.0", "progress-events": "^1.0.0", "race-event": "^1.3.0", "race-signal": "^1.0.2", diff --git a/packages/libp2p/src/connection-manager/constants.defaults.ts b/packages/libp2p/src/connection-manager/constants.defaults.ts index 87b0c7127f..632fff7253 100644 --- a/packages/libp2p/src/connection-manager/constants.defaults.ts +++ b/packages/libp2p/src/connection-manager/constants.defaults.ts @@ -25,14 +25,21 @@ export const MAX_INCOMING_PENDING_CONNECTIONS = 10 /** * Store as part of the peer store metadata for a given peer, the value for this - * key is a timestamp of the last time a dial attempted failed with the relevant - * peer stored as a string. + * key is a timestamp of the last time a dial attempt failed with the timestamp + * stored as a string. * * Used to insure we do not endlessly try to auto dial peers we have recently * failed to dial. */ export const LAST_DIAL_FAILURE_KEY = 'last-dial-failure' +/** + * Store as part of the peer store metadata for a given peer, the value for this + * key is a timestamp of the last time a dial attempt succeeded with the + * timestamp stored as a string. + */ +export const LAST_DIAL_SUCCESS_KEY = 'last-dial-success' + /** * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxDialQueueLength */ diff --git a/packages/libp2p/src/connection-manager/dial-queue.ts b/packages/libp2p/src/connection-manager/dial-queue.ts index 9d927e4c51..d88b04d254 100644 --- a/packages/libp2p/src/connection-manager/dial-queue.ts +++ b/packages/libp2p/src/connection-manager/dial-queue.ts @@ -16,7 +16,8 @@ import { MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL, LAST_DIAL_FAILURE_KEY, - MAX_DIAL_QUEUE_LENGTH + MAX_DIAL_QUEUE_LENGTH, + LAST_DIAL_SUCCESS_KEY } from './constants.js' import { resolveMultiaddrs } from './utils.js' import { DEFAULT_DIAL_PRIORITY } from './index.js' @@ -244,6 +245,20 @@ export class DialQueue { this.log('dial to %a succeeded', address.multiaddr) + // record the successful dial and the address + try { + await this.components.peerStore.merge(conn.remotePeer, { + multiaddrs: [ + conn.remoteAddr + ], + metadata: { + [LAST_DIAL_SUCCESS_KEY]: uint8ArrayFromString(Date.now().toString()) + } + }) + } catch (err: any) { + this.log.error('could not update last dial failure key for %p', peerId, err) + } + return conn } catch (err: any) { this.log.error('dial failed to %a', address.multiaddr, err) @@ -251,7 +266,7 @@ export class DialQueue { if (peerId != null) { // record the failed dial try { - await this.components.peerStore.patch(peerId, { + await this.components.peerStore.merge(peerId, { metadata: { [LAST_DIAL_FAILURE_KEY]: uint8ArrayFromString(Date.now().toString()) } diff --git a/packages/libp2p/src/connection-manager/index.ts b/packages/libp2p/src/connection-manager/index.ts index 86401b3cd7..ac1eaa9b60 100644 --- a/packages/libp2p/src/connection-manager/index.ts +++ b/packages/libp2p/src/connection-manager/index.ts @@ -1,4 +1,4 @@ -import { InvalidParametersError, KEEP_ALIVE, NotStartedError } from '@libp2p/interface' +import { InvalidParametersError, NotStartedError, start, stop } from '@libp2p/interface' import { PeerMap } from '@libp2p/peer-collections' import { defaultAddressSort } from '@libp2p/utils/address-sort' import { RateLimiter } from '@libp2p/utils/rate-limiter' @@ -9,7 +9,8 @@ import { getPeerAddress } from '../get-peer.js' import { ConnectionPruner } from './connection-pruner.js' import { DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_DIAL_QUEUE_LENGTH, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL } from './constants.js' import { DialQueue } from './dial-queue.js' -import type { PendingDial, AddressSorter, Libp2pEvents, AbortOptions, ComponentLogger, Logger, Connection, MultiaddrConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, Peer, PeerStore, Startable, PendingDialStatus, PeerRouting, IsDialableOptions } from '@libp2p/interface' +import { ReconnectQueue } from './reconnect-queue.js' +import type { PendingDial, AddressSorter, Libp2pEvents, AbortOptions, ComponentLogger, Logger, Connection, MultiaddrConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, PeerStore, Startable, PendingDialStatus, PeerRouting, IsDialableOptions } from '@libp2p/interface' import type { ConnectionManager, OpenConnectionOptions, TransportManager } from '@libp2p/interface-internal' import type { JobStatus } from '@libp2p/utils/queue' @@ -91,6 +92,31 @@ export interface ConnectionManagerInit { * (default: 10) */ maxIncomingPendingConnections?: number + + /** + * When a peer tagged with `KEEP_ALIVE` disconnects, attempt to redial them + * this many times. + * + * @default 5 + */ + reconnectRetries?: number + + /** + * When a peer tagged with `KEEP_ALIVE` disconnects, wait this long between + * each retry. Note this will be multiplied by `reconnectFactor` to create an + * increasing retry backoff. + * + * @default 1000 + */ + reconnectInterval?: number + + /** + * When a peer tagged with `KEEP_ALIVE` disconnects, apply this multiplication + * factor to the time interval between each retry. + * + * @default 2 + */ + reconnectFactor?: number } const defaultOptions = { @@ -123,6 +149,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { private readonly maxConnections: number public readonly dialQueue: DialQueue + public readonly reconnectQueue: ReconnectQueue public readonly connectionPruner: ConnectionPruner private readonly inboundConnectionRateLimiter: RateLimiter private readonly peerStore: PeerStore @@ -188,6 +215,17 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { }, connections: this.connections }) + + this.reconnectQueue = new ReconnectQueue({ + events: components.events, + peerStore: components.peerStore, + logger: components.logger, + connectionManager: this + }, { + retries: init.reconnectRetries, + interval: init.reconnectInterval, + factor: init.reconnectFactor + }) } readonly [Symbol.toStringTag] = '@libp2p/connection-manager' @@ -279,41 +317,23 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { } }) - this.dialQueue.start() + await start( + this.dialQueue, + this.reconnectQueue + ) this.started = true this.log('started') } - async afterStart (): Promise { - // re-connect to any peers with the KEEP_ALIVE tag - void Promise.resolve() - .then(async () => { - const keepAlivePeers: Peer[] = await this.peerStore.all({ - filters: [(peer) => { - return peer.tags.has(KEEP_ALIVE) - }] - }) - - await Promise.all( - keepAlivePeers.map(async peer => { - await this.openConnection(peer.id) - .catch(err => { - this.log.error(err) - }) - }) - ) - }) - .catch(err => { - this.log.error(err) - }) - } - /** * Stops the Connection Manager */ async stop (): Promise { - this.dialQueue.stop() + await stop( + this.reconnectQueue, + this.dialQueue + ) // Close all connections we're tracking const tasks: Array> = [] diff --git a/packages/libp2p/src/connection-manager/reconnect-queue.ts b/packages/libp2p/src/connection-manager/reconnect-queue.ts new file mode 100644 index 0000000000..2b56d2d8cc --- /dev/null +++ b/packages/libp2p/src/connection-manager/reconnect-queue.ts @@ -0,0 +1,126 @@ +import { KEEP_ALIVE } from '@libp2p/interface' +import { PeerQueue } from '@libp2p/utils/peer-queue' +import pRetry from 'p-retry' +import type { ComponentLogger, Libp2pEvents, Logger, Peer, PeerId, PeerStore, Startable, TypedEventTarget } from '@libp2p/interface' +import type { ConnectionManager } from '@libp2p/interface-internal' + +export interface ReconnectQueueComponents { + connectionManager: ConnectionManager + events: TypedEventTarget + peerStore: PeerStore + logger: ComponentLogger +} + +export interface ReconnectQueueInit { + retries?: number + interval?: number + factor?: number +} + +/** + * When peers tagged with `KEEP_ALIVE` disconnect, this component attempts to + * redial them + */ +export class ReconnectQueue implements Startable { + private readonly log: Logger + private readonly queue: PeerQueue + private started: boolean + private readonly peerStore: PeerStore + private readonly retries: number + private readonly interval?: number + private readonly factor?: number + private readonly connectionManager: ConnectionManager + + constructor (components: ReconnectQueueComponents, init: ReconnectQueueInit = {}) { + this.log = components.logger.forComponent('libp2p:reconnect-queue') + this.peerStore = components.peerStore + this.connectionManager = components.connectionManager + this.queue = new PeerQueue() + this.started = false + this.retries = init.retries ?? 5 + this.factor = init.factor + + components.events.addEventListener('peer:disconnect', (evt) => { + this.maybeReconnect(evt.detail) + .catch(err => { + this.log.error('failed to maybe reconnect to %p', evt.detail, err) + }) + }) + } + + private async maybeReconnect (peerId: PeerId): Promise { + if (!this.started) { + return + } + + const peer = await this.peerStore.get(peerId) + + if (!peer.tags.has(KEEP_ALIVE)) { + return + } + + if (this.queue.has(peerId)) { + return + } + + this.queue.add(async (options) => { + await pRetry(async (attempt) => { + if (!this.started) { + return + } + + try { + await this.connectionManager.openConnection(peerId, { + signal: options?.signal + }) + } catch (err) { + this.log('reconnecting to %p attempt %d of %d failed', peerId, attempt, this.retries, err) + throw err + } + }, { + signal: options?.signal, + retries: this.retries, + factor: this.factor, + minTimeout: this.interval + }) + }, { + peerId + }) + .catch(err => { + this.log.error('failed to reconnect to %p', peerId, err) + }) + } + + start (): void { + this.started = true + } + + async afterStart (): Promise { + // re-connect to any peers with the KEEP_ALIVE tag + void Promise.resolve() + .then(async () => { + const keepAlivePeers: Peer[] = await this.peerStore.all({ + filters: [(peer) => { + return peer.tags.has(KEEP_ALIVE) + }] + }) + + await Promise.all( + keepAlivePeers.map(async peer => { + await this.connectionManager.openConnection(peer.id) + .catch(err => { + this.log.error(err) + }) + }) + ) + }) + .catch(err => { + this.log.error(err) + }) + } + + stop (): void { + this.started = false + this.queue.abort() + } +} diff --git a/packages/libp2p/test/connection-manager/reconnect-queue.spec.ts b/packages/libp2p/test/connection-manager/reconnect-queue.spec.ts new file mode 100644 index 0000000000..d952587466 --- /dev/null +++ b/packages/libp2p/test/connection-manager/reconnect-queue.spec.ts @@ -0,0 +1,117 @@ +/* eslint-env mocha */ + +import { KEEP_ALIVE, TypedEventEmitter, start, stop } from '@libp2p/interface' +import { peerLogger } from '@libp2p/logger' +import { createEd25519PeerId } from '@libp2p/peer-id-factory' +import { expect } from 'aegir/chai' +import delay from 'delay' +import pRetry from 'p-retry' +import sinon from 'sinon' +import { type StubbedInstance, stubInterface } from 'sinon-ts' +import { ReconnectQueue } from '../../src/connection-manager/reconnect-queue.js' +import type { ComponentLogger, Libp2pEvents, PeerStore, TypedEventTarget, Peer } from '@libp2p/interface' +import type { ConnectionManager } from '@libp2p/interface-internal' + +describe('reconnect queue', () => { + let components: { + connectionManager: StubbedInstance + events: TypedEventTarget + peerStore: StubbedInstance + logger: ComponentLogger + } + let queue: ReconnectQueue + + beforeEach(async () => { + const peerId = await createEd25519PeerId() + + components = { + connectionManager: stubInterface(), + events: new TypedEventEmitter(), + peerStore: stubInterface(), + logger: peerLogger(peerId) + } + }) + + afterEach(async () => { + await stop(queue) + + sinon.reset() + }) + + it('should reconnect to KEEP_ALIVE peers on startup', async () => { + queue = new ReconnectQueue(components) + + const keepAlivePeer = await createEd25519PeerId() + + components.peerStore.all.resolves([ + stubInterface({ + id: keepAlivePeer, + tags: new Map([[KEEP_ALIVE, { + value: 1 + }]]) + }) + ]) + + await start(queue) + + await pRetry(() => { + expect(components.connectionManager.openConnection.calledWith(keepAlivePeer)).to.be.true() + }, { + retries: 5, + factor: 1 + }) + }) + + it('should reconnect to KEEP_ALIVE peers on disconnect', async () => { + queue = new ReconnectQueue(components) + + const keepAlivePeer = await createEd25519PeerId() + + components.peerStore.all.resolves([]) + components.peerStore.get.withArgs(keepAlivePeer).resolves( + stubInterface({ + id: keepAlivePeer, + tags: new Map([[KEEP_ALIVE, { + value: 1 + }]]) + }) + ) + + await start(queue) + + components.events.safeDispatchEvent('peer:disconnect', new CustomEvent('peer:disconnect', { + detail: keepAlivePeer + })) + + await pRetry(() => { + expect(components.connectionManager.openConnection.calledWith(keepAlivePeer)).to.be.true() + }, { + retries: 5, + factor: 1 + }) + }) + + it('should not reconnect to non-KEEP_ALIVE peers on disconnect', async () => { + queue = new ReconnectQueue(components) + + const nonKeepAlivePeer = await createEd25519PeerId() + + components.peerStore.all.resolves([]) + components.peerStore.get.withArgs(nonKeepAlivePeer).resolves( + stubInterface({ + id: nonKeepAlivePeer, + tags: new Map() + }) + ) + + await start(queue) + + components.events.safeDispatchEvent('peer:disconnect', new CustomEvent('peer:disconnect', { + detail: nonKeepAlivePeer + })) + + await delay(1000) + + expect(components.connectionManager.openConnection.calledWith(nonKeepAlivePeer)).to.be.false() + }) +}) From 3824fb2a43ce6740cbe08bb4ba9957b901d2c576 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 15 Aug 2024 07:35:42 +0100 Subject: [PATCH 4/4] chore: pass interval --- .../connection-manager/constants.defaults.ts | 5 +++ .../libp2p/src/connection-manager/index.ts | 41 +++++++++++++------ .../src/connection-manager/reconnect-queue.ts | 26 ++++++++---- 3 files changed, 51 insertions(+), 21 deletions(-) diff --git a/packages/libp2p/src/connection-manager/constants.defaults.ts b/packages/libp2p/src/connection-manager/constants.defaults.ts index 632fff7253..628088e8ed 100644 --- a/packages/libp2p/src/connection-manager/constants.defaults.ts +++ b/packages/libp2p/src/connection-manager/constants.defaults.ts @@ -23,6 +23,11 @@ export const INBOUND_CONNECTION_THRESHOLD = 5 */ export const MAX_INCOMING_PENDING_CONNECTIONS = 10 +/** + * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxParallelReconnects + */ +export const MAX_PARALLEL_RECONNECTS = 5 + /** * Store as part of the peer store metadata for a given peer, the value for this * key is a timestamp of the last time a dial attempt failed with the timestamp diff --git a/packages/libp2p/src/connection-manager/index.ts b/packages/libp2p/src/connection-manager/index.ts index ac1eaa9b60..5b2e59e778 100644 --- a/packages/libp2p/src/connection-manager/index.ts +++ b/packages/libp2p/src/connection-manager/index.ts @@ -18,8 +18,10 @@ export const DEFAULT_DIAL_PRIORITY = 50 export interface ConnectionManagerInit { /** - * The maximum number of connections libp2p is willing to have before it starts - * pruning connections to reduce resource usage. (default: 300, 100 in browsers) + * The maximum number of connections libp2p is willing to have before it + * starts pruning connections to reduce resource usage. + * + * @default 300/100 */ maxConnections?: number @@ -31,7 +33,8 @@ export interface ConnectionManagerInit { /** * The maximum number of dials across all peers to execute in parallel. - * (default: 100, 50 in browsers) + * + * @default 100/50 */ maxParallelDials?: number @@ -59,7 +62,9 @@ export interface ConnectionManagerInit { /** * When a new inbound connection is opened, the upgrade process (e.g. protect, - * encrypt, multiplex etc) must complete within this number of ms. (default: 30s) + * encrypt, multiplex etc) must complete within this number of ms. + * + * @default 30000 */ inboundUpgradeTimeout?: number @@ -70,7 +75,8 @@ export interface ConnectionManagerInit { /** * A list of multiaddrs that will always be allowed (except if they are in the - * deny list) to open connections to this node even if we've reached maxConnections + * deny list) to open connections to this node even if we've reached + * maxConnections */ allow?: string[] @@ -87,9 +93,11 @@ export interface ConnectionManagerInit { inboundConnectionThreshold?: number /** - * The maximum number of parallel incoming connections allowed that have yet to - * complete the connection upgrade - e.g. choosing connection encryption, muxer, etc. - * (default: 10) + * The maximum number of parallel incoming connections allowed that have yet + * to complete the connection upgrade - e.g. choosing connection encryption, + * muxer, etc. + * + * @default 10 */ maxIncomingPendingConnections?: number @@ -108,7 +116,7 @@ export interface ConnectionManagerInit { * * @default 1000 */ - reconnectInterval?: number + reconnectRetryInterval?: number /** * When a peer tagged with `KEEP_ALIVE` disconnects, apply this multiplication @@ -116,7 +124,15 @@ export interface ConnectionManagerInit { * * @default 2 */ - reconnectFactor?: number + reconnectBackoffFactor?: number + + /** + * When a peers tagged with `KEEP_ALIVE` disconnect, reconnect to this many at + * once. + * + * @default 5 + */ + maxParallelReconnects?: number } const defaultOptions = { @@ -223,8 +239,9 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { connectionManager: this }, { retries: init.reconnectRetries, - interval: init.reconnectInterval, - factor: init.reconnectFactor + retryInterval: init.reconnectRetryInterval, + backoffFactor: init.reconnectBackoffFactor, + maxParallelReconnects: init.maxParallelReconnects }) } diff --git a/packages/libp2p/src/connection-manager/reconnect-queue.ts b/packages/libp2p/src/connection-manager/reconnect-queue.ts index 2b56d2d8cc..75929fe6db 100644 --- a/packages/libp2p/src/connection-manager/reconnect-queue.ts +++ b/packages/libp2p/src/connection-manager/reconnect-queue.ts @@ -1,7 +1,8 @@ import { KEEP_ALIVE } from '@libp2p/interface' import { PeerQueue } from '@libp2p/utils/peer-queue' import pRetry from 'p-retry' -import type { ComponentLogger, Libp2pEvents, Logger, Peer, PeerId, PeerStore, Startable, TypedEventTarget } from '@libp2p/interface' +import { MAX_PARALLEL_RECONNECTS } from './constants.js' +import type { ComponentLogger, Libp2pEvents, Logger, Metrics, Peer, PeerId, PeerStore, Startable, TypedEventTarget } from '@libp2p/interface' import type { ConnectionManager } from '@libp2p/interface-internal' export interface ReconnectQueueComponents { @@ -9,12 +10,14 @@ export interface ReconnectQueueComponents { events: TypedEventTarget peerStore: PeerStore logger: ComponentLogger + metrics?: Metrics } export interface ReconnectQueueInit { retries?: number - interval?: number - factor?: number + retryInterval?: number + backoffFactor?: number + maxParallelReconnects?: number } /** @@ -27,18 +30,23 @@ export class ReconnectQueue implements Startable { private started: boolean private readonly peerStore: PeerStore private readonly retries: number - private readonly interval?: number - private readonly factor?: number + private readonly retryInterval?: number + private readonly backoffFactor?: number private readonly connectionManager: ConnectionManager constructor (components: ReconnectQueueComponents, init: ReconnectQueueInit = {}) { this.log = components.logger.forComponent('libp2p:reconnect-queue') this.peerStore = components.peerStore this.connectionManager = components.connectionManager - this.queue = new PeerQueue() + this.queue = new PeerQueue({ + concurrency: init.maxParallelReconnects ?? MAX_PARALLEL_RECONNECTS, + metricName: 'libp2p_reconnect_queue', + metrics: components.metrics + }) this.started = false this.retries = init.retries ?? 5 - this.factor = init.factor + this.backoffFactor = init.backoffFactor + this.retryInterval = init.retryInterval components.events.addEventListener('peer:disconnect', (evt) => { this.maybeReconnect(evt.detail) @@ -80,8 +88,8 @@ export class ReconnectQueue implements Startable { }, { signal: options?.signal, retries: this.retries, - factor: this.factor, - minTimeout: this.interval + factor: this.backoffFactor, + minTimeout: this.retryInterval }) }, { peerId