diff --git a/doc/CONFIGURATION.md b/doc/CONFIGURATION.md index 2ef97839c1..b5debc6e4d 100644 --- a/doc/CONFIGURATION.md +++ b/doc/CONFIGURATION.md @@ -626,8 +626,7 @@ const node = await createLibp2p({ noise() ], connectionManager: { - maxConnections: Infinity, - minConnections: 0 + maxConnections: Infinity } }) ``` diff --git a/doc/LIMITS.md b/doc/LIMITS.md index 5d5efd523e..2187ffabc9 100644 --- a/doc/LIMITS.md +++ b/doc/LIMITS.md @@ -38,12 +38,6 @@ const node = await createLibp2p({ */ maxConnections: 100, - /** - * If the number of open connections goes below this number, the node - * will try to connect to randomly selected peers from the peer store - */ - minConnections: 50, - /** * How many connections can be open but not yet upgraded */ diff --git a/interop/test/fixtures/get-libp2p.ts b/interop/test/fixtures/get-libp2p.ts index f5196366f2..069fc90326 100644 --- a/interop/test/fixtures/get-libp2p.ts +++ b/interop/test/fixtures/get-libp2p.ts @@ -25,9 +25,6 @@ const IP = process.env.ip ?? '0.0.0.0' export async function getLibp2p (): Promise> { const options: Libp2pOptions<{ ping: PingService, identify: Identify }> = { start: true, - connectionManager: { - minConnections: 0 - }, connectionGater: { denyDialMultiaddr: async () => false }, diff --git a/packages/integration-tests/.aegir.js b/packages/integration-tests/.aegir.js index 078dd47d7d..22a2d0a630 100644 --- a/packages/integration-tests/.aegir.js +++ b/packages/integration-tests/.aegir.js @@ -24,8 +24,7 @@ export default { const peerId = await createEd25519PeerId() const libp2p = await createLibp2p({ connectionManager: { - inboundConnectionThreshold: Infinity, - minConnections: 0 + inboundConnectionThreshold: Infinity }, addresses: { listen: [ diff --git a/packages/integration-tests/test/bootstrap.spec.ts b/packages/integration-tests/test/bootstrap.spec.ts index 3334ccf8ac..e9f80a169a 100644 --- a/packages/integration-tests/test/bootstrap.spec.ts +++ b/packages/integration-tests/test/bootstrap.spec.ts @@ -2,8 +2,11 @@ import { bootstrap } from '@libp2p/bootstrap' import { TypedEventEmitter, peerDiscoverySymbol } from '@libp2p/interface' +import { mplex } from '@libp2p/mplex' import { createEd25519PeerId } from '@libp2p/peer-id-factory' +import { plaintext } from '@libp2p/plaintext' import { webSockets } from '@libp2p/websockets' +import * as Filter from '@libp2p/websockets/filters' import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' import { createLibp2p } from 'libp2p' @@ -103,4 +106,52 @@ describe('bootstrap', () => { return deferred.promise }) + + it('bootstrap should dial all peers in the list', async () => { + const deferred = defer() + + const bootstrappers = [ + `${process.env.RELAY_MULTIADDR}` + ] + + libp2p = await createLibp2p({ + connectionEncryption: [ + plaintext() + ], + transports: [ + webSockets({ + filter: Filter.all + }) + ], + streamMuxers: [ + mplex() + ], + peerDiscovery: [ + bootstrap({ + list: bootstrappers + }) + ], + connectionGater: { + denyDialMultiaddr: () => false + } + }) + + const expectedPeers = new Set( + bootstrappers.map(ma => multiaddr(ma).getPeerId()) + ) + + libp2p.addEventListener('connection:open', (evt) => { + const { remotePeer } = evt.detail + + expectedPeers.delete(remotePeer.toString()) + if (expectedPeers.size === 0) { + libp2p.removeEventListener('connection:open') + deferred.resolve() + } + }) + + await libp2p.start() + + return deferred.promise + }) }) diff --git a/packages/integration-tests/test/circuit-relay.node.ts b/packages/integration-tests/test/circuit-relay.node.ts index 162fec3e41..e0647b9ce4 100644 --- a/packages/integration-tests/test/circuit-relay.node.ts +++ b/packages/integration-tests/test/circuit-relay.node.ts @@ -42,9 +42,6 @@ async function createClient (options: Libp2pOptions = {}): Promise { connectionEncryption: [ plaintext() ], - connectionManager: { - minConnections: 0 - }, services: { identify: identify() }, diff --git a/packages/integration-tests/test/fetch.spec.ts b/packages/integration-tests/test/fetch.spec.ts index c8ca47ade2..ad9c6569ac 100644 --- a/packages/integration-tests/test/fetch.spec.ts +++ b/packages/integration-tests/test/fetch.spec.ts @@ -11,9 +11,6 @@ async function createNode (): Promise> { return createLibp2p(createBaseOptions({ services: { fetch: fetch() - }, - connectionManager: { - minConnections: 0 } })) } diff --git a/packages/integration-tests/test/interop.ts b/packages/integration-tests/test/interop.ts index 01f0a4a4c9..8f2323674e 100644 --- a/packages/integration-tests/test/interop.ts +++ b/packages/integration-tests/test/interop.ts @@ -134,10 +134,7 @@ async function createJsPeer (options: SpawnOptions): Promise { }, transports: [tcp(), circuitRelayTransport()], streamMuxers: [], - connectionEncryption: [noise()], - connectionManager: { - minConnections: 0 - } + connectionEncryption: [noise()] } if (options.noListen !== true) { diff --git a/packages/integration-tests/test/peers.spec.ts b/packages/integration-tests/test/peers.spec.ts new file mode 100644 index 0000000000..2992de118d --- /dev/null +++ b/packages/integration-tests/test/peers.spec.ts @@ -0,0 +1,59 @@ +/* eslint-env mocha */ + +import { KEEP_ALIVE, type Libp2p } from '@libp2p/interface' +import { expect } from 'aegir/chai' +import { createLibp2p } from 'libp2p' +import pWaitFor from 'p-wait-for' +import { createBaseOptions } from './fixtures/base-options.js' + +describe('peers', () => { + let nodes: Libp2p[] + + beforeEach(async () => { + nodes = await Promise.all([ + createLibp2p(createBaseOptions()), + createLibp2p(createBaseOptions()), + createLibp2p(createBaseOptions()) + ]) + }) + + afterEach(async () => Promise.all(nodes.map(async n => { await n.stop() }))) + + it('should redial a peer tagged with KEEP_ALIVE', async () => { + await nodes[0].dial(nodes[1].getMultiaddrs()) + + expect(nodes[0].getConnections(nodes[1].peerId)).to.not.be.empty() + + await nodes[0].peerStore.merge(nodes[1].peerId, { + tags: { + [KEEP_ALIVE]: { + value: 1 + } + } + }) + + await Promise.all( + nodes[0].getConnections(nodes[1].peerId).map(async conn => conn.close()) + ) + + await pWaitFor(async () => { + return nodes[0].getConnections(nodes[1].peerId).length > 0 + }, { + interval: 100, + timeout: { + milliseconds: 5000, + message: 'Did not reconnect to peer tagged with KEEP_ALIVE' + } + }) + }) + + it('should store the multiaddr for a peer after a successful dial', async () => { + await nodes[0].dial(nodes[1].getMultiaddrs()) + + expect(nodes[0].getConnections(nodes[1].peerId)).to.not.be.empty() + + const peer = await nodes[0].peerStore.get(nodes[1].peerId) + expect(peer.addresses).to.not.be.empty() + expect(peer.metadata.get('last-dial-success')).to.be.ok() + }) +}) diff --git a/packages/libp2p/.aegir.js b/packages/libp2p/.aegir.js index a89fba7ad6..a7250fb938 100644 --- a/packages/libp2p/.aegir.js +++ b/packages/libp2p/.aegir.js @@ -20,8 +20,7 @@ export default { const peerId = await createEd25519PeerId() const libp2p = await createLibp2p({ connectionManager: { - inboundConnectionThreshold: Infinity, - minConnections: 0 + inboundConnectionThreshold: Infinity }, addresses: { listen: [ diff --git a/packages/libp2p/package.json b/packages/libp2p/package.json index a288037a34..708c906a0a 100644 --- a/packages/libp2p/package.json +++ b/packages/libp2p/package.json @@ -107,6 +107,7 @@ "merge-options": "^3.0.4", "multiformats": "^13.1.0", "p-defer": "^4.0.1", + "p-retry": "^6.2.0", "progress-events": "^1.0.0", "race-event": "^1.3.0", "race-signal": "^1.0.2", diff --git a/packages/libp2p/src/connection-manager/auto-dial.ts b/packages/libp2p/src/connection-manager/auto-dial.ts deleted file mode 100644 index 16fada9221..0000000000 --- a/packages/libp2p/src/connection-manager/auto-dial.ts +++ /dev/null @@ -1,285 +0,0 @@ -import { PeerMap, PeerSet } from '@libp2p/peer-collections' -import { PeerQueue } from '@libp2p/utils/peer-queue' -import { toString as uint8ArrayToString } from 'uint8arrays/to-string' -import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_DISCOVERED_PEERS_DEBOUNCE, AUTO_DIAL_INTERVAL, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PEER_RETRY_THRESHOLD, AUTO_DIAL_PRIORITY, LAST_DIAL_FAILURE_KEY, MIN_CONNECTIONS } from './constants.js' -import type { Libp2pEvents, Logger, ComponentLogger, TypedEventTarget, PeerStore, Startable, Metrics } from '@libp2p/interface' -import type { ConnectionManager } from '@libp2p/interface-internal' - -interface AutoDialInit { - minConnections?: number - maxQueueLength?: number - autoDialConcurrency?: number - autoDialPriority?: number - autoDialInterval?: number - autoDialPeerRetryThreshold?: number - autoDialDiscoveredPeersDebounce?: number -} - -interface AutoDialComponents { - connectionManager: ConnectionManager - peerStore: PeerStore - events: TypedEventTarget - logger: ComponentLogger - metrics?: Metrics -} - -const defaultOptions = { - minConnections: MIN_CONNECTIONS, - maxQueueLength: AUTO_DIAL_MAX_QUEUE_LENGTH, - autoDialConcurrency: AUTO_DIAL_CONCURRENCY, - autoDialPriority: AUTO_DIAL_PRIORITY, - autoDialInterval: AUTO_DIAL_INTERVAL, - autoDialPeerRetryThreshold: AUTO_DIAL_PEER_RETRY_THRESHOLD, - autoDialDiscoveredPeersDebounce: AUTO_DIAL_DISCOVERED_PEERS_DEBOUNCE -} - -export class AutoDial implements Startable { - private readonly connectionManager: ConnectionManager - private readonly peerStore: PeerStore - private readonly queue: PeerQueue - private readonly minConnections: number - private readonly autoDialPriority: number - private readonly autoDialIntervalMs: number - private readonly autoDialMaxQueueLength: number - private readonly autoDialPeerRetryThresholdMs: number - private readonly autoDialDiscoveredPeersDebounce: number - private autoDialInterval?: ReturnType - private started: boolean - private running: boolean - private readonly log: Logger - - /** - * Proactively tries to connect to known peers stored in the PeerStore. - * It will keep the number of connections below the upper limit and sort - * the peers to connect based on whether we know their keys and protocols. - */ - constructor (components: AutoDialComponents, init: AutoDialInit) { - this.connectionManager = components.connectionManager - this.peerStore = components.peerStore - this.minConnections = init.minConnections ?? defaultOptions.minConnections - this.autoDialPriority = init.autoDialPriority ?? defaultOptions.autoDialPriority - this.autoDialIntervalMs = init.autoDialInterval ?? defaultOptions.autoDialInterval - this.autoDialMaxQueueLength = init.maxQueueLength ?? defaultOptions.maxQueueLength - this.autoDialPeerRetryThresholdMs = init.autoDialPeerRetryThreshold ?? defaultOptions.autoDialPeerRetryThreshold - this.autoDialDiscoveredPeersDebounce = init.autoDialDiscoveredPeersDebounce ?? defaultOptions.autoDialDiscoveredPeersDebounce - this.log = components.logger.forComponent('libp2p:connection-manager:auto-dial') - this.started = false - this.running = false - this.queue = new PeerQueue({ - concurrency: init.autoDialConcurrency ?? defaultOptions.autoDialConcurrency, - metricName: 'libp2p_autodial_queue', - metrics: components.metrics - }) - this.queue.addEventListener('error', (evt) => { - this.log.error('error during auto-dial', evt.detail) - }) - - // check the min connection limit whenever a peer disconnects - components.events.addEventListener('connection:close', () => { - this.autoDial() - .catch(err => { - this.log.error(err) - }) - }) - - // sometimes peers are discovered in quick succession so add a small - // debounce to ensure all eligible peers are autodialed - let debounce: ReturnType - - // when new peers are discovered, dial them if we don't have - // enough connections - components.events.addEventListener('peer:discovery', () => { - clearTimeout(debounce) - debounce = setTimeout(() => { - this.autoDial() - .catch(err => { - this.log.error(err) - }) - }, this.autoDialDiscoveredPeersDebounce) - }) - } - - isStarted (): boolean { - return this.started - } - - start (): void { - this.started = true - } - - afterStart (): void { - this.autoDial() - .catch(err => { - this.log.error('error while autodialing', err) - }) - } - - stop (): void { - // clear the queue - this.queue.clear() - clearTimeout(this.autoDialInterval) - this.started = false - this.running = false - } - - async autoDial (): Promise { - if (!this.started || this.running) { - return - } - - const connections = this.connectionManager.getConnectionsMap() - const numConnections = connections.size - - // already have enough connections - if (numConnections >= this.minConnections) { - if (this.minConnections > 0) { - this.log.trace('have enough connections %d/%d', numConnections, this.minConnections) - } - - // no need to schedule next autodial as it will be run when on - // connection:close event - return - } - - if (this.queue.size > this.autoDialMaxQueueLength) { - this.log('not enough connections %d/%d but auto dial queue is full', numConnections, this.minConnections) - this.sheduleNextAutodial() - return - } - - this.running = true - - this.log('not enough connections %d/%d - will dial peers to increase the number of connections', numConnections, this.minConnections) - - const dialQueue = new PeerSet( - // @ts-expect-error boolean filter removes falsy peer IDs - this.connectionManager.getDialQueue() - .map(queue => queue.peerId) - .filter(Boolean) - ) - - // sort peers on whether we know protocols or public keys for them - const peers = await this.peerStore.all({ - filters: [ - // remove some peers - (peer) => { - // remove peers without addresses - if (peer.addresses.length === 0) { - this.log.trace('not autodialing %p because they have no addresses', peer.id) - return false - } - - // remove peers we are already connected to - if (connections.has(peer.id)) { - this.log.trace('not autodialing %p because they are already connected', peer.id) - return false - } - - // remove peers we are already dialling - if (dialQueue.has(peer.id)) { - this.log.trace('not autodialing %p because they are already being dialed', peer.id) - return false - } - - // remove peers already in the autodial queue - if (this.queue.has(peer.id)) { - this.log.trace('not autodialing %p because they are already being autodialed', peer.id) - return false - } - - return true - } - ] - }) - - // shuffle the peers - this is so peers with the same tag values will be - // dialled in a different order each time - const shuffledPeers = peers.sort(() => Math.random() > 0.5 ? 1 : -1) - - // sort shuffled peers by tag value - const peerValues = new PeerMap() - for (const peer of shuffledPeers) { - if (peerValues.has(peer.id)) { - continue - } - - // sum all tag values - peerValues.set(peer.id, [...peer.tags.values()].reduce((acc, curr) => { - return acc + curr.value - }, 0)) - } - - // sort by value, highest to lowest - const sortedPeers = shuffledPeers.sort((a, b) => { - const peerAValue = peerValues.get(a.id) ?? 0 - const peerBValue = peerValues.get(b.id) ?? 0 - - if (peerAValue > peerBValue) { - return -1 - } - - if (peerAValue < peerBValue) { - return 1 - } - - return 0 - }) - - const peersThatHaveNotFailed = sortedPeers.filter(peer => { - const lastDialFailure = peer.metadata.get(LAST_DIAL_FAILURE_KEY) - - if (lastDialFailure == null) { - return true - } - - const lastDialFailureTimestamp = parseInt(uint8ArrayToString(lastDialFailure)) - - if (isNaN(lastDialFailureTimestamp)) { - return true - } - - // only dial if the time since the last failure is above the retry threshold - return Date.now() - lastDialFailureTimestamp > this.autoDialPeerRetryThresholdMs - }) - - this.log('selected %d/%d peers to dial', peersThatHaveNotFailed.length, peers.length) - - for (const peer of peersThatHaveNotFailed) { - this.queue.add(async () => { - const numConnections = this.connectionManager.getConnectionsMap().size - - // Check to see if we still need to auto dial - if (numConnections >= this.minConnections) { - this.log('got enough connections now %d/%d', numConnections, this.minConnections) - this.queue.clear() - return - } - - this.log('connecting to a peerStore stored peer %p', peer.id) - await this.connectionManager.openConnection(peer.id, { - priority: this.autoDialPriority - }) - }, { - peerId: peer.id - }).catch(err => { - this.log.error('could not connect to peerStore stored peer', err) - }) - } - - this.running = false - this.sheduleNextAutodial() - } - - private sheduleNextAutodial (): void { - if (!this.started) { - return - } - - this.autoDialInterval = setTimeout(() => { - this.autoDial() - .catch(err => { - this.log.error('error while autodialing', err) - }) - }, this.autoDialIntervalMs) - } -} diff --git a/packages/libp2p/src/connection-manager/constants.browser.ts b/packages/libp2p/src/connection-manager/constants.browser.ts index 2c369c1245..cdafdbcdb2 100644 --- a/packages/libp2p/src/connection-manager/constants.browser.ts +++ b/packages/libp2p/src/connection-manager/constants.browser.ts @@ -1,10 +1,5 @@ export * from './constants.defaults.js' -/** - * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#minConnections - */ -export const MIN_CONNECTIONS = 5 - /** * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxConnections */ @@ -14,8 +9,3 @@ export const MAX_CONNECTIONS = 100 * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxParallelDials */ export const MAX_PARALLEL_DIALS = 50 - -/** - * @see https://libp2p.github.io/js-libp2p/interfaces/libp2p.index.unknown.ConnectionManagerInit.html#autoDialPeerRetryThreshold - */ -export const AUTO_DIAL_PEER_RETRY_THRESHOLD = 1000 * 60 * 7 diff --git a/packages/libp2p/src/connection-manager/constants.defaults.ts b/packages/libp2p/src/connection-manager/constants.defaults.ts index d9a6f6e3b6..628088e8ed 100644 --- a/packages/libp2p/src/connection-manager/constants.defaults.ts +++ b/packages/libp2p/src/connection-manager/constants.defaults.ts @@ -13,31 +13,6 @@ export const INBOUND_UPGRADE_TIMEOUT = 2e3 */ export const MAX_PEER_ADDRS_TO_DIAL = 25 -/** - * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#autoDialInterval - */ -export const AUTO_DIAL_INTERVAL = 5000 - -/** - * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#autoDialConcurrency - */ -export const AUTO_DIAL_CONCURRENCY = 25 - -/** - * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#autoDialPriority - */ -export const AUTO_DIAL_PRIORITY = 0 - -/** - * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#autoDialMaxQueueLength - */ -export const AUTO_DIAL_MAX_QUEUE_LENGTH = 100 - -/** - * @see https://libp2p.github.io/js-libp2p/interfaces/libp2p.index.unknown.ConnectionManagerInit.html#autoDialDiscoveredPeersDebounce - */ -export const AUTO_DIAL_DISCOVERED_PEERS_DEBOUNCE = 10 - /** * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#inboundConnectionThreshold */ @@ -48,16 +23,28 @@ export const INBOUND_CONNECTION_THRESHOLD = 5 */ export const MAX_INCOMING_PENDING_CONNECTIONS = 10 +/** + * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxParallelReconnects + */ +export const MAX_PARALLEL_RECONNECTS = 5 + /** * Store as part of the peer store metadata for a given peer, the value for this - * key is a timestamp of the last time a dial attempted failed with the relevant - * peer stored as a string. + * key is a timestamp of the last time a dial attempt failed with the timestamp + * stored as a string. * * Used to insure we do not endlessly try to auto dial peers we have recently * failed to dial. */ export const LAST_DIAL_FAILURE_KEY = 'last-dial-failure' +/** + * Store as part of the peer store metadata for a given peer, the value for this + * key is a timestamp of the last time a dial attempt succeeded with the + * timestamp stored as a string. + */ +export const LAST_DIAL_SUCCESS_KEY = 'last-dial-success' + /** * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxDialQueueLength */ diff --git a/packages/libp2p/src/connection-manager/constants.ts b/packages/libp2p/src/connection-manager/constants.ts index a6a6c486f4..422074f57c 100644 --- a/packages/libp2p/src/connection-manager/constants.ts +++ b/packages/libp2p/src/connection-manager/constants.ts @@ -1,10 +1,5 @@ export * from './constants.defaults.js' -/** - * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#minConnections - */ -export const MIN_CONNECTIONS = 50 - /** * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxConnections */ @@ -14,8 +9,3 @@ export const MAX_CONNECTIONS = 300 * @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxParallelDials */ export const MAX_PARALLEL_DIALS = 100 - -/** - * @see https://libp2p.github.io/js-libp2p/interfaces/libp2p.index.unknown.ConnectionManagerInit.html#autoDialPeerRetryThreshold - */ -export const AUTO_DIAL_PEER_RETRY_THRESHOLD = 1000 * 60 diff --git a/packages/libp2p/src/connection-manager/dial-queue.ts b/packages/libp2p/src/connection-manager/dial-queue.ts index 9d927e4c51..d88b04d254 100644 --- a/packages/libp2p/src/connection-manager/dial-queue.ts +++ b/packages/libp2p/src/connection-manager/dial-queue.ts @@ -16,7 +16,8 @@ import { MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL, LAST_DIAL_FAILURE_KEY, - MAX_DIAL_QUEUE_LENGTH + MAX_DIAL_QUEUE_LENGTH, + LAST_DIAL_SUCCESS_KEY } from './constants.js' import { resolveMultiaddrs } from './utils.js' import { DEFAULT_DIAL_PRIORITY } from './index.js' @@ -244,6 +245,20 @@ export class DialQueue { this.log('dial to %a succeeded', address.multiaddr) + // record the successful dial and the address + try { + await this.components.peerStore.merge(conn.remotePeer, { + multiaddrs: [ + conn.remoteAddr + ], + metadata: { + [LAST_DIAL_SUCCESS_KEY]: uint8ArrayFromString(Date.now().toString()) + } + }) + } catch (err: any) { + this.log.error('could not update last dial failure key for %p', peerId, err) + } + return conn } catch (err: any) { this.log.error('dial failed to %a', address.multiaddr, err) @@ -251,7 +266,7 @@ export class DialQueue { if (peerId != null) { // record the failed dial try { - await this.components.peerStore.patch(peerId, { + await this.components.peerStore.merge(peerId, { metadata: { [LAST_DIAL_FAILURE_KEY]: uint8ArrayFromString(Date.now().toString()) } diff --git a/packages/libp2p/src/connection-manager/index.ts b/packages/libp2p/src/connection-manager/index.ts index 72972f060e..5b2e59e778 100644 --- a/packages/libp2p/src/connection-manager/index.ts +++ b/packages/libp2p/src/connection-manager/index.ts @@ -1,4 +1,4 @@ -import { InvalidParametersError, KEEP_ALIVE, NotStartedError } from '@libp2p/interface' +import { InvalidParametersError, NotStartedError, start, stop } from '@libp2p/interface' import { PeerMap } from '@libp2p/peer-collections' import { defaultAddressSort } from '@libp2p/utils/address-sort' import { RateLimiter } from '@libp2p/utils/rate-limiter' @@ -6,11 +6,11 @@ import { type Multiaddr, type Resolver, multiaddr } from '@multiformats/multiadd import { dnsaddrResolver } from '@multiformats/multiaddr/resolvers' import { CustomProgressEvent } from 'progress-events' import { getPeerAddress } from '../get-peer.js' -import { AutoDial } from './auto-dial.js' import { ConnectionPruner } from './connection-pruner.js' -import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_DISCOVERED_PEERS_DEBOUNCE, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PEER_RETRY_THRESHOLD, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_DIAL_QUEUE_LENGTH, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js' +import { DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_DIAL_QUEUE_LENGTH, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL } from './constants.js' import { DialQueue } from './dial-queue.js' -import type { PendingDial, AddressSorter, Libp2pEvents, AbortOptions, ComponentLogger, Logger, Connection, MultiaddrConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, Peer, PeerStore, Startable, PendingDialStatus, PeerRouting, IsDialableOptions } from '@libp2p/interface' +import { ReconnectQueue } from './reconnect-queue.js' +import type { PendingDial, AddressSorter, Libp2pEvents, AbortOptions, ComponentLogger, Logger, Connection, MultiaddrConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, PeerStore, Startable, PendingDialStatus, PeerRouting, IsDialableOptions } from '@libp2p/interface' import type { ConnectionManager, OpenConnectionOptions, TransportManager } from '@libp2p/interface-internal' import type { JobStatus } from '@libp2p/utils/queue' @@ -18,57 +18,13 @@ export const DEFAULT_DIAL_PRIORITY = 50 export interface ConnectionManagerInit { /** - * The maximum number of connections libp2p is willing to have before it starts - * pruning connections to reduce resource usage. (default: 300, 100 in browsers) + * The maximum number of connections libp2p is willing to have before it + * starts pruning connections to reduce resource usage. + * + * @default 300/100 */ maxConnections?: number - /** - * The minimum number of connections below which libp2p will start to dial peers - * from the peer book. Setting this to 0 effectively disables this behaviour. - * (default: 50, 5 in browsers) - */ - minConnections?: number - - /** - * How long to wait between attempting to keep our number of concurrent connections - * above minConnections (default: 5000) - */ - autoDialInterval?: number - - /** - * When dialling peers from the peer book to keep the number of open connections - * above `minConnections`, add dials for this many peers to the dial queue - * at once. (default: 25) - */ - autoDialConcurrency?: number - - /** - * To allow user dials to take priority over auto dials, use this value as the - * dial priority. (default: 0) - */ - autoDialPriority?: number - - /** - * Limit the maximum number of peers to dial when trying to keep the number of - * open connections above `minConnections`. (default: 100) - */ - autoDialMaxQueueLength?: number - - /** - * When we've failed to dial a peer, do not autodial them again within this - * number of ms. (default: 1 minute, 7 minutes in browsers) - */ - autoDialPeerRetryThreshold?: number - - /** - * Newly discovered peers may be auto-dialed to increase the number of open - * connections, but they can be discovered in quick succession so add a small - * delay before attempting to dial them in case more peers have been - * discovered. (default: 10ms) - */ - autoDialDiscoveredPeersDebounce?: number - /** * Sort the known addresses of a peer before trying to dial, By default public * addresses will be dialled before private (e.g. loopback or LAN) addresses. @@ -77,7 +33,8 @@ export interface ConnectionManagerInit { /** * The maximum number of dials across all peers to execute in parallel. - * (default: 100, 50 in browsers) + * + * @default 100/50 */ maxParallelDials?: number @@ -105,7 +62,9 @@ export interface ConnectionManagerInit { /** * When a new inbound connection is opened, the upgrade process (e.g. protect, - * encrypt, multiplex etc) must complete within this number of ms. (default: 30s) + * encrypt, multiplex etc) must complete within this number of ms. + * + * @default 30000 */ inboundUpgradeTimeout?: number @@ -116,7 +75,8 @@ export interface ConnectionManagerInit { /** * A list of multiaddrs that will always be allowed (except if they are in the - * deny list) to open connections to this node even if we've reached maxConnections + * deny list) to open connections to this node even if we've reached + * maxConnections */ allow?: string[] @@ -133,23 +93,52 @@ export interface ConnectionManagerInit { inboundConnectionThreshold?: number /** - * The maximum number of parallel incoming connections allowed that have yet to - * complete the connection upgrade - e.g. choosing connection encryption, muxer, etc. - * (default: 10) + * The maximum number of parallel incoming connections allowed that have yet + * to complete the connection upgrade - e.g. choosing connection encryption, + * muxer, etc. + * + * @default 10 */ maxIncomingPendingConnections?: number + + /** + * When a peer tagged with `KEEP_ALIVE` disconnects, attempt to redial them + * this many times. + * + * @default 5 + */ + reconnectRetries?: number + + /** + * When a peer tagged with `KEEP_ALIVE` disconnects, wait this long between + * each retry. Note this will be multiplied by `reconnectFactor` to create an + * increasing retry backoff. + * + * @default 1000 + */ + reconnectRetryInterval?: number + + /** + * When a peer tagged with `KEEP_ALIVE` disconnects, apply this multiplication + * factor to the time interval between each retry. + * + * @default 2 + */ + reconnectBackoffFactor?: number + + /** + * When a peers tagged with `KEEP_ALIVE` disconnect, reconnect to this many at + * once. + * + * @default 5 + */ + maxParallelReconnects?: number } const defaultOptions = { - minConnections: MIN_CONNECTIONS, maxConnections: MAX_CONNECTIONS, inboundConnectionThreshold: INBOUND_CONNECTION_THRESHOLD, - maxIncomingPendingConnections: MAX_INCOMING_PENDING_CONNECTIONS, - autoDialConcurrency: AUTO_DIAL_CONCURRENCY, - autoDialPriority: AUTO_DIAL_PRIORITY, - autoDialMaxQueueLength: AUTO_DIAL_MAX_QUEUE_LENGTH, - autoDialPeerRetryThreshold: AUTO_DIAL_PEER_RETRY_THRESHOLD, - autoDialDiscoveredPeersDebounce: AUTO_DIAL_DISCOVERED_PEERS_DEBOUNCE + maxIncomingPendingConnections: MAX_INCOMING_PENDING_CONNECTIONS } export interface DefaultConnectionManagerComponents { @@ -176,7 +165,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { private readonly maxConnections: number public readonly dialQueue: DialQueue - public readonly autoDial: AutoDial + public readonly reconnectQueue: ReconnectQueue public readonly connectionPruner: ConnectionPruner private readonly inboundConnectionRateLimiter: RateLimiter private readonly peerStore: PeerStore @@ -186,10 +175,9 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { constructor (components: DefaultConnectionManagerComponents, init: ConnectionManagerInit = {}) { this.maxConnections = init.maxConnections ?? defaultOptions.maxConnections - const minConnections = init.minConnections ?? defaultOptions.minConnections - if (this.maxConnections < minConnections) { - throw new InvalidParametersError('Connection Manager maxConnections must be greater than minConnections') + if (this.maxConnections < 1) { + throw new InvalidParametersError('Connection Manager maxConnections must be greater than 0') } /** @@ -221,21 +209,6 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { duration: 1 }) - // controls what happens when we don't have enough connections - this.autoDial = new AutoDial({ - connectionManager: this, - peerStore: components.peerStore, - events: components.events, - logger: components.logger - }, { - minConnections, - autoDialConcurrency: init.autoDialConcurrency ?? defaultOptions.autoDialConcurrency, - autoDialPriority: init.autoDialPriority ?? defaultOptions.autoDialPriority, - autoDialPeerRetryThreshold: init.autoDialPeerRetryThreshold ?? defaultOptions.autoDialPeerRetryThreshold, - autoDialDiscoveredPeersDebounce: init.autoDialDiscoveredPeersDebounce ?? defaultOptions.autoDialDiscoveredPeersDebounce, - maxQueueLength: init.autoDialMaxQueueLength ?? defaultOptions.autoDialMaxQueueLength - }) - // controls what happens when we have too many connections this.connectionPruner = new ConnectionPruner({ connectionManager: this, @@ -258,6 +231,18 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { }, connections: this.connections }) + + this.reconnectQueue = new ReconnectQueue({ + events: components.events, + peerStore: components.peerStore, + logger: components.logger, + connectionManager: this + }, { + retries: init.reconnectRetries, + retryInterval: init.reconnectRetryInterval, + backoffFactor: init.reconnectBackoffFactor, + maxParallelReconnects: init.maxParallelReconnects + }) } readonly [Symbol.toStringTag] = '@libp2p/connection-manager' @@ -349,45 +334,23 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { } }) - this.dialQueue.start() - this.autoDial.start() + await start( + this.dialQueue, + this.reconnectQueue + ) this.started = true this.log('started') } - async afterStart (): Promise { - // re-connect to any peers with the KEEP_ALIVE tag - void Promise.resolve() - .then(async () => { - const keepAlivePeers: Peer[] = await this.peerStore.all({ - filters: [(peer) => { - return peer.tags.has(KEEP_ALIVE) - }] - }) - - await Promise.all( - keepAlivePeers.map(async peer => { - await this.openConnection(peer.id) - .catch(err => { - this.log.error(err) - }) - }) - ) - }) - .catch(err => { - this.log.error(err) - }) - - this.autoDial.afterStart() - } - /** * Stops the Connection Manager */ async stop (): Promise { - this.dialQueue.stop() - this.autoDial.stop() + await stop( + this.reconnectQueue, + this.dialQueue + ) // Close all connections we're tracking const tasks: Array> = [] diff --git a/packages/libp2p/src/connection-manager/reconnect-queue.ts b/packages/libp2p/src/connection-manager/reconnect-queue.ts new file mode 100644 index 0000000000..75929fe6db --- /dev/null +++ b/packages/libp2p/src/connection-manager/reconnect-queue.ts @@ -0,0 +1,134 @@ +import { KEEP_ALIVE } from '@libp2p/interface' +import { PeerQueue } from '@libp2p/utils/peer-queue' +import pRetry from 'p-retry' +import { MAX_PARALLEL_RECONNECTS } from './constants.js' +import type { ComponentLogger, Libp2pEvents, Logger, Metrics, Peer, PeerId, PeerStore, Startable, TypedEventTarget } from '@libp2p/interface' +import type { ConnectionManager } from '@libp2p/interface-internal' + +export interface ReconnectQueueComponents { + connectionManager: ConnectionManager + events: TypedEventTarget + peerStore: PeerStore + logger: ComponentLogger + metrics?: Metrics +} + +export interface ReconnectQueueInit { + retries?: number + retryInterval?: number + backoffFactor?: number + maxParallelReconnects?: number +} + +/** + * When peers tagged with `KEEP_ALIVE` disconnect, this component attempts to + * redial them + */ +export class ReconnectQueue implements Startable { + private readonly log: Logger + private readonly queue: PeerQueue + private started: boolean + private readonly peerStore: PeerStore + private readonly retries: number + private readonly retryInterval?: number + private readonly backoffFactor?: number + private readonly connectionManager: ConnectionManager + + constructor (components: ReconnectQueueComponents, init: ReconnectQueueInit = {}) { + this.log = components.logger.forComponent('libp2p:reconnect-queue') + this.peerStore = components.peerStore + this.connectionManager = components.connectionManager + this.queue = new PeerQueue({ + concurrency: init.maxParallelReconnects ?? MAX_PARALLEL_RECONNECTS, + metricName: 'libp2p_reconnect_queue', + metrics: components.metrics + }) + this.started = false + this.retries = init.retries ?? 5 + this.backoffFactor = init.backoffFactor + this.retryInterval = init.retryInterval + + components.events.addEventListener('peer:disconnect', (evt) => { + this.maybeReconnect(evt.detail) + .catch(err => { + this.log.error('failed to maybe reconnect to %p', evt.detail, err) + }) + }) + } + + private async maybeReconnect (peerId: PeerId): Promise { + if (!this.started) { + return + } + + const peer = await this.peerStore.get(peerId) + + if (!peer.tags.has(KEEP_ALIVE)) { + return + } + + if (this.queue.has(peerId)) { + return + } + + this.queue.add(async (options) => { + await pRetry(async (attempt) => { + if (!this.started) { + return + } + + try { + await this.connectionManager.openConnection(peerId, { + signal: options?.signal + }) + } catch (err) { + this.log('reconnecting to %p attempt %d of %d failed', peerId, attempt, this.retries, err) + throw err + } + }, { + signal: options?.signal, + retries: this.retries, + factor: this.backoffFactor, + minTimeout: this.retryInterval + }) + }, { + peerId + }) + .catch(err => { + this.log.error('failed to reconnect to %p', peerId, err) + }) + } + + start (): void { + this.started = true + } + + async afterStart (): Promise { + // re-connect to any peers with the KEEP_ALIVE tag + void Promise.resolve() + .then(async () => { + const keepAlivePeers: Peer[] = await this.peerStore.all({ + filters: [(peer) => { + return peer.tags.has(KEEP_ALIVE) + }] + }) + + await Promise.all( + keepAlivePeers.map(async peer => { + await this.connectionManager.openConnection(peer.id) + .catch(err => { + this.log.error(err) + }) + }) + ) + }) + .catch(err => { + this.log.error(err) + }) + } + + stop (): void { + this.started = false + this.queue.abort() + } +} diff --git a/packages/libp2p/test/connection-manager/auto-dial.spec.ts b/packages/libp2p/test/connection-manager/auto-dial.spec.ts deleted file mode 100644 index 2d7bdcec03..0000000000 --- a/packages/libp2p/test/connection-manager/auto-dial.spec.ts +++ /dev/null @@ -1,293 +0,0 @@ -/* eslint-env mocha */ - -import { TypedEventEmitter, type TypedEventTarget, type Libp2pEvents, type Connection, type PeerId, type PeerStore, type Peer } from '@libp2p/interface' -import { matchPeerId } from '@libp2p/interface-compliance-tests/matchers' -import { defaultLogger } from '@libp2p/logger' -import { PeerMap } from '@libp2p/peer-collections' -import { createEd25519PeerId } from '@libp2p/peer-id-factory' -import { PersistentPeerStore } from '@libp2p/peer-store' -import { multiaddr } from '@multiformats/multiaddr' -import { expect } from 'aegir/chai' -import { MemoryDatastore } from 'datastore-core' -import delay from 'delay' -import pWaitFor from 'p-wait-for' -import Sinon from 'sinon' -import { stubInterface } from 'sinon-ts' -import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' -import { defaultComponents } from '../../src/components.js' -import { AutoDial } from '../../src/connection-manager/auto-dial.js' -import { LAST_DIAL_FAILURE_KEY } from '../../src/connection-manager/constants.js' -import type { ConnectionManager } from '@libp2p/interface-internal' - -describe('auto-dial', () => { - let autoDialer: AutoDial - let events: TypedEventTarget - let peerStore: PeerStore - let peerId: PeerId - - beforeEach(async () => { - peerId = await createEd25519PeerId() - events = new TypedEventEmitter() - peerStore = new PersistentPeerStore({ - datastore: new MemoryDatastore(), - events, - peerId, - logger: defaultLogger() - }) - }) - - afterEach(() => { - if (autoDialer != null) { - autoDialer.stop() - } - }) - - it('should not dial peers without multiaddrs', async () => { - // peers with protocols are dialled before peers without protocols - const peerWithAddress: Peer = { - id: await createEd25519PeerId(), - protocols: [ - '/foo/bar' - ], - addresses: [{ - multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4001'), - isCertified: true - }], - metadata: new Map(), - tags: new Map() - } - const peerWithoutAddress: Peer = { - id: await createEd25519PeerId(), - protocols: [], - addresses: [], - metadata: new Map(), - tags: new Map() - } - - await peerStore.save(peerWithAddress.id, peerWithAddress) - await peerStore.save(peerWithoutAddress.id, peerWithoutAddress) - - const connectionManager = stubInterface({ - getConnectionsMap: Sinon.stub().returns(new PeerMap()), - getDialQueue: Sinon.stub().returns([]) - }) - - autoDialer = new AutoDial(defaultComponents({ - peerStore, - connectionManager, - events - }), { - minConnections: 10, - autoDialInterval: 10000 - }) - autoDialer.start() - void autoDialer.autoDial() - - await pWaitFor(() => { - return connectionManager.openConnection.callCount === 1 - }) - await delay(1000) - - expect(connectionManager.openConnection.callCount).to.equal(1) - expect(connectionManager.openConnection.calledWith(matchPeerId(peerWithAddress.id))).to.be.true() - expect(connectionManager.openConnection.calledWith(matchPeerId(peerWithoutAddress.id))).to.be.false() - }) - - it('should not dial connected peers', async () => { - const connectedPeer: Peer = { - id: await createEd25519PeerId(), - protocols: [], - addresses: [{ - multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4001'), - isCertified: true - }], - metadata: new Map(), - tags: new Map() - } - const unConnectedPeer: Peer = { - id: await createEd25519PeerId(), - protocols: [], - addresses: [{ - multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4002'), - isCertified: true - }], - metadata: new Map(), - tags: new Map() - } - - await peerStore.save(connectedPeer.id, connectedPeer) - await peerStore.save(unConnectedPeer.id, unConnectedPeer) - - const connectionMap = new PeerMap() - connectionMap.set(connectedPeer.id, [stubInterface()]) - - const connectionManager = stubInterface({ - getConnectionsMap: Sinon.stub().returns(connectionMap), - getDialQueue: Sinon.stub().returns([]) - }) - - autoDialer = new AutoDial(defaultComponents({ - peerStore, - connectionManager, - events - }), { - minConnections: 10 - }) - autoDialer.start() - await autoDialer.autoDial() - - await pWaitFor(() => connectionManager.openConnection.callCount === 1) - await delay(1000) - - expect(connectionManager.openConnection.callCount).to.equal(1) - expect(connectionManager.openConnection.calledWith(matchPeerId(unConnectedPeer.id))).to.be.true() - expect(connectionManager.openConnection.calledWith(matchPeerId(connectedPeer.id))).to.be.false() - }) - - it('should not dial peers already in the dial queue', async () => { - // peers with protocols are dialled before peers without protocols - const peerInDialQueue: Peer = { - id: await createEd25519PeerId(), - protocols: [], - addresses: [{ - multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4001'), - isCertified: true - }], - metadata: new Map(), - tags: new Map() - } - const peerNotInDialQueue: Peer = { - id: await createEd25519PeerId(), - protocols: [], - addresses: [{ - multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4002'), - isCertified: true - }], - metadata: new Map(), - tags: new Map() - } - - await peerStore.save(peerInDialQueue.id, peerInDialQueue) - await peerStore.save(peerNotInDialQueue.id, peerNotInDialQueue) - - const connectionManager = stubInterface({ - getConnectionsMap: Sinon.stub().returns(new PeerMap()), - getDialQueue: Sinon.stub().returns([{ - id: 'foo', - peerId: peerInDialQueue.id, - multiaddrs: [], - status: 'queued' - }]) - }) - - autoDialer = new AutoDial(defaultComponents({ - peerStore, - connectionManager, - events - }), { - minConnections: 10 - }) - autoDialer.start() - await autoDialer.autoDial() - - await pWaitFor(() => connectionManager.openConnection.callCount === 1) - await delay(1000) - - expect(connectionManager.openConnection.callCount).to.equal(1) - expect(connectionManager.openConnection.calledWith(matchPeerId(peerNotInDialQueue.id))).to.be.true() - expect(connectionManager.openConnection.calledWith(matchPeerId(peerInDialQueue.id))).to.be.false() - }) - - it('should not start parallel autodials', async () => { - const peerStoreAllSpy = Sinon.spy(peerStore, 'all') - - const connectionManager = stubInterface({ - getConnectionsMap: Sinon.stub().returns(new PeerMap()), - getDialQueue: Sinon.stub().returns([]) - }) - - autoDialer = new AutoDial(defaultComponents({ - peerStore, - connectionManager, - events - }), { - minConnections: 10, - autoDialInterval: 10000 - }) - autoDialer.start() - - // call autodial twice - await Promise.all([ - autoDialer.autoDial(), - autoDialer.autoDial() - ]) - - // should only have queried peer store once - expect(peerStoreAllSpy.callCount).to.equal(1) - }) - - it('should not re-dial peers we have recently failed to dial', async () => { - const peerWithAddress: Peer = { - id: await createEd25519PeerId(), - protocols: [], - addresses: [{ - multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4001'), - isCertified: true - }], - metadata: new Map(), - tags: new Map() - } - const undialablePeer: Peer = { - id: await createEd25519PeerId(), - protocols: [], - addresses: [{ - multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4002'), - isCertified: true - }], - // we failed to dial them recently - metadata: new Map([[LAST_DIAL_FAILURE_KEY, uint8ArrayFromString(`${Date.now() - 10}`)]]), - tags: new Map() - } - - await peerStore.save(peerWithAddress.id, peerWithAddress) - await peerStore.save(undialablePeer.id, undialablePeer) - - const connectionManager = stubInterface({ - getConnectionsMap: Sinon.stub().returns(new PeerMap()), - getDialQueue: Sinon.stub().returns([]) - }) - - autoDialer = new AutoDial(defaultComponents({ - peerStore, - connectionManager, - events - }), { - minConnections: 10, - autoDialPeerRetryThreshold: 2000 - }) - autoDialer.start() - - void autoDialer.autoDial() - - await pWaitFor(() => { - return connectionManager.openConnection.callCount === 1 - }) - - expect(connectionManager.openConnection.callCount).to.equal(1) - expect(connectionManager.openConnection.calledWith(matchPeerId(peerWithAddress.id))).to.be.true() - expect(connectionManager.openConnection.calledWith(matchPeerId(undialablePeer.id))).to.be.false() - - // pass the retry threshold - await delay(2000) - - // autodial again - void autoDialer.autoDial() - - await pWaitFor(() => { - return connectionManager.openConnection.callCount === 3 - }) - - // should have retried the unreachable peer - expect(connectionManager.openConnection.calledWith(matchPeerId(undialablePeer.id))).to.be.true() - }) -}) diff --git a/packages/libp2p/test/connection-manager/direct.node.ts b/packages/libp2p/test/connection-manager/direct.node.ts index 653d6ba174..a71fe105d2 100644 --- a/packages/libp2p/test/connection-manager/direct.node.ts +++ b/packages/libp2p/test/connection-manager/direct.node.ts @@ -90,7 +90,6 @@ describe('dialing (direct, TCP)', () => { localComponents.peerStore = new PersistentPeerStore(localComponents) localComponents.connectionManager = new DefaultConnectionManager(localComponents, { maxConnections: 100, - minConnections: 50, inboundUpgradeTimeout: 1000 }) localComponents.addressManager = new DefaultAddressManager(localComponents) diff --git a/packages/libp2p/test/connection-manager/direct.spec.ts b/packages/libp2p/test/connection-manager/direct.spec.ts index f1a86da40a..a02ece928d 100644 --- a/packages/libp2p/test/connection-manager/direct.spec.ts +++ b/packages/libp2p/test/connection-manager/direct.spec.ts @@ -55,7 +55,6 @@ describe('dialing (direct, WebSockets)', () => { }) localComponents.connectionManager = new DefaultConnectionManager(localComponents, { maxConnections: 100, - minConnections: 50, inboundUpgradeTimeout: 1000 }) diff --git a/packages/libp2p/test/connection-manager/index.node.ts b/packages/libp2p/test/connection-manager/index.node.ts index 473279bfab..e4cdc1746b 100644 --- a/packages/libp2p/test/connection-manager/index.node.ts +++ b/packages/libp2p/test/connection-manager/index.node.ts @@ -6,10 +6,8 @@ import { createEd25519PeerId } from '@libp2p/peer-id-factory' import { dns } from '@multiformats/dns' import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' -import delay from 'delay' import all from 'it-all' import { pipe } from 'it-pipe' -import pWaitFor from 'p-wait-for' import sinon from 'sinon' import { stubInterface } from 'sinon-ts' import { defaultComponents } from '../../src/components.js' @@ -59,7 +57,6 @@ describe('Connection Manager', () => { }) const connectionManager = new DefaultConnectionManager(components, { maxConnections: 1000, - minConnections: 50, inboundUpgradeTimeout: 1000 }) @@ -97,7 +94,6 @@ describe('Connection Manager', () => { }) const connectionManager = new DefaultConnectionManager(components, { maxConnections: 1000, - minConnections: 50, inboundUpgradeTimeout: 1000 }) @@ -198,141 +194,6 @@ describe('libp2p.connections', () => { sinon.reset() }) - it('should connect to all the peers stored in the PeerStore, if their number is below minConnections', async () => { - libp2p = await createNode({ - started: false, - config: { - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0/ws'] - }, - connectionManager: { - minConnections: 3 - } - } - }) - - // Populate PeerStore before starting - await libp2p.peerStore.patch(nodes[0].peerId, { - multiaddrs: nodes[0].getMultiaddrs() - }) - await libp2p.peerStore.patch(nodes[1].peerId, { - multiaddrs: nodes[1].getMultiaddrs() - }) - - await libp2p.start() - - // Wait for peers to connect - await pWaitFor(() => libp2p.getConnections().length === 2) - - await libp2p.stop() - }) - - it('should connect to all the peers stored in the PeerStore until reaching the minConnections', async () => { - const minConnections = 1 - libp2p = await createNode({ - started: false, - config: { - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0/ws'] - }, - connectionManager: { - minConnections, - maxConnections: 1 - } - } - }) - - // Populate PeerStore before starting - await libp2p.peerStore.patch(nodes[0].peerId, { - multiaddrs: nodes[0].getMultiaddrs() - }) - await libp2p.peerStore.patch(nodes[1].peerId, { - multiaddrs: nodes[1].getMultiaddrs() - }) - - await libp2p.start() - - // Wait for peer to connect - await pWaitFor(() => libp2p.components.connectionManager.getConnections().length === minConnections) - - // Wait more time to guarantee no other connection happened - await delay(200) - expect(libp2p.components.connectionManager.getConnections().length).to.eql(minConnections) - - await libp2p.stop() - }) - - // flaky - it.skip('should connect to all the peers stored in the PeerStore until reaching the minConnections sorted', async () => { - const minConnections = 1 - libp2p = await createNode({ - started: false, - config: { - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0/ws'] - }, - connectionManager: { - minConnections, - maxConnections: 1 - } - } - }) - - // Populate PeerStore before starting - await libp2p.peerStore.patch(nodes[0].peerId, { - multiaddrs: nodes[0].getMultiaddrs() - }) - await libp2p.peerStore.patch(nodes[1].peerId, { - multiaddrs: nodes[1].getMultiaddrs(), - protocols: ['/protocol-min-conns'] - }) - - await libp2p.start() - - // Wait for peer to connect - await pWaitFor(() => libp2p.components.connectionManager.getConnections().length === minConnections) - - // Should have connected to the peer with protocols - expect(libp2p.components.connectionManager.getConnections(nodes[0].peerId)).to.be.empty() - expect(libp2p.components.connectionManager.getConnections(nodes[1].peerId)).to.not.be.empty() - - await libp2p.stop() - }) - - it('should connect to peers in the PeerStore when a peer disconnected', async () => { - const minConnections = 1 - - libp2p = await createNode({ - config: { - addresses: { - listen: ['/ip4/127.0.0.1/tcp/0/ws'] - }, - connectionManager: { - minConnections - } - } - }) - - // Populate PeerStore after starting (discovery) - await libp2p.peerStore.patch(nodes[0].peerId, { - multiaddrs: nodes[0].getMultiaddrs() - }) - - // Wait for peer to connect - const conn = await libp2p.dial(nodes[0].peerId) - expect(libp2p.components.connectionManager.getConnections(nodes[0].peerId)).to.not.be.empty() - - await conn.close() - // Closed - await pWaitFor(() => libp2p.components.connectionManager.getConnections().length === 0) - // Connected - await pWaitFor(() => libp2p.components.connectionManager.getConnections().length === 1) - - expect(libp2p.components.connectionManager.getConnections(nodes[0].peerId)).to.not.be.empty() - - await libp2p.stop() - }) - it('should be closed status once immediately stopping', async () => { libp2p = await createNode({ config: createBaseOptions({ diff --git a/packages/libp2p/test/connection-manager/index.spec.ts b/packages/libp2p/test/connection-manager/index.spec.ts index eb0380fae8..9b52ee8a74 100644 --- a/packages/libp2p/test/connection-manager/index.spec.ts +++ b/packages/libp2p/test/connection-manager/index.spec.ts @@ -19,8 +19,6 @@ import type { TransportManager } from '@libp2p/interface-internal' const defaultOptions = { maxConnections: 10, - minConnections: 1, - autoDialInterval: Infinity, inboundUpgradeTimeout: 10000 } @@ -85,8 +83,7 @@ describe('Connection Manager', () => { libp2p = await createNode({ config: createBaseOptions({ connectionManager: { - maxConnections: max, - minConnections: 2 + maxConnections: max } }), started: false @@ -144,8 +141,7 @@ describe('Connection Manager', () => { libp2p = await createNode({ config: createBaseOptions({ connectionManager: { - maxConnections: max, - minConnections: 2 + maxConnections: max } }), started: false @@ -210,7 +206,6 @@ describe('Connection Manager', () => { config: createBaseOptions({ connectionManager: { maxConnections: max, - minConnections: 0, allow: [ '/ip4/83.13.55.32' ] @@ -294,8 +289,7 @@ describe('Connection Manager', () => { libp2p = await createNode({ config: createBaseOptions({ connectionManager: { - maxConnections: max, - minConnections: 0 + maxConnections: max } }), started: false @@ -327,8 +321,7 @@ describe('Connection Manager', () => { await expect(createNode({ config: createBaseOptions({ connectionManager: { - maxConnections: 5, - minConnections: 6 + maxConnections: -1 } }), started: false diff --git a/packages/libp2p/test/connection-manager/reconnect-queue.spec.ts b/packages/libp2p/test/connection-manager/reconnect-queue.spec.ts new file mode 100644 index 0000000000..d952587466 --- /dev/null +++ b/packages/libp2p/test/connection-manager/reconnect-queue.spec.ts @@ -0,0 +1,117 @@ +/* eslint-env mocha */ + +import { KEEP_ALIVE, TypedEventEmitter, start, stop } from '@libp2p/interface' +import { peerLogger } from '@libp2p/logger' +import { createEd25519PeerId } from '@libp2p/peer-id-factory' +import { expect } from 'aegir/chai' +import delay from 'delay' +import pRetry from 'p-retry' +import sinon from 'sinon' +import { type StubbedInstance, stubInterface } from 'sinon-ts' +import { ReconnectQueue } from '../../src/connection-manager/reconnect-queue.js' +import type { ComponentLogger, Libp2pEvents, PeerStore, TypedEventTarget, Peer } from '@libp2p/interface' +import type { ConnectionManager } from '@libp2p/interface-internal' + +describe('reconnect queue', () => { + let components: { + connectionManager: StubbedInstance + events: TypedEventTarget + peerStore: StubbedInstance + logger: ComponentLogger + } + let queue: ReconnectQueue + + beforeEach(async () => { + const peerId = await createEd25519PeerId() + + components = { + connectionManager: stubInterface(), + events: new TypedEventEmitter(), + peerStore: stubInterface(), + logger: peerLogger(peerId) + } + }) + + afterEach(async () => { + await stop(queue) + + sinon.reset() + }) + + it('should reconnect to KEEP_ALIVE peers on startup', async () => { + queue = new ReconnectQueue(components) + + const keepAlivePeer = await createEd25519PeerId() + + components.peerStore.all.resolves([ + stubInterface({ + id: keepAlivePeer, + tags: new Map([[KEEP_ALIVE, { + value: 1 + }]]) + }) + ]) + + await start(queue) + + await pRetry(() => { + expect(components.connectionManager.openConnection.calledWith(keepAlivePeer)).to.be.true() + }, { + retries: 5, + factor: 1 + }) + }) + + it('should reconnect to KEEP_ALIVE peers on disconnect', async () => { + queue = new ReconnectQueue(components) + + const keepAlivePeer = await createEd25519PeerId() + + components.peerStore.all.resolves([]) + components.peerStore.get.withArgs(keepAlivePeer).resolves( + stubInterface({ + id: keepAlivePeer, + tags: new Map([[KEEP_ALIVE, { + value: 1 + }]]) + }) + ) + + await start(queue) + + components.events.safeDispatchEvent('peer:disconnect', new CustomEvent('peer:disconnect', { + detail: keepAlivePeer + })) + + await pRetry(() => { + expect(components.connectionManager.openConnection.calledWith(keepAlivePeer)).to.be.true() + }, { + retries: 5, + factor: 1 + }) + }) + + it('should not reconnect to non-KEEP_ALIVE peers on disconnect', async () => { + queue = new ReconnectQueue(components) + + const nonKeepAlivePeer = await createEd25519PeerId() + + components.peerStore.all.resolves([]) + components.peerStore.get.withArgs(nonKeepAlivePeer).resolves( + stubInterface({ + id: nonKeepAlivePeer, + tags: new Map() + }) + ) + + await start(queue) + + components.events.safeDispatchEvent('peer:disconnect', new CustomEvent('peer:disconnect', { + detail: nonKeepAlivePeer + })) + + await delay(1000) + + expect(components.connectionManager.openConnection.calledWith(nonKeepAlivePeer)).to.be.false() + }) +}) diff --git a/packages/libp2p/test/registrar/errors.spec.ts b/packages/libp2p/test/registrar/errors.spec.ts index ff78d2097d..4ed601ab81 100644 --- a/packages/libp2p/test/registrar/errors.spec.ts +++ b/packages/libp2p/test/registrar/errors.spec.ts @@ -31,7 +31,6 @@ describe('registrar errors', () => { }) components.peerStore = new PersistentPeerStore(components) components.connectionManager = new DefaultConnectionManager(components, { - minConnections: 50, maxConnections: 1000, inboundUpgradeTimeout: 1000 }) diff --git a/packages/peer-discovery-bootstrap/package.json b/packages/peer-discovery-bootstrap/package.json index 5405395a1d..c1607833e3 100644 --- a/packages/peer-discovery-bootstrap/package.json +++ b/packages/peer-discovery-bootstrap/package.json @@ -55,6 +55,7 @@ }, "dependencies": { "@libp2p/interface": "^1.6.3", + "@libp2p/interface-internal": "^1.3.3", "@libp2p/peer-id": "^4.2.3", "@multiformats/mafmt": "^12.1.6", "@multiformats/multiaddr": "^12.2.3" diff --git a/packages/peer-discovery-bootstrap/src/index.ts b/packages/peer-discovery-bootstrap/src/index.ts index eb09310505..140a2078cd 100644 --- a/packages/peer-discovery-bootstrap/src/index.ts +++ b/packages/peer-discovery-bootstrap/src/index.ts @@ -37,10 +37,10 @@ import { peerIdFromString } from '@libp2p/peer-id' import { P2P } from '@multiformats/mafmt' import { multiaddr } from '@multiformats/multiaddr' import type { ComponentLogger, Logger, PeerDiscovery, PeerDiscoveryEvents, PeerInfo, PeerStore, Startable } from '@libp2p/interface' +import type { ConnectionManager } from '@libp2p/interface-internal' const DEFAULT_BOOTSTRAP_TAG_NAME = 'bootstrap' const DEFAULT_BOOTSTRAP_TAG_VALUE = 50 -const DEFAULT_BOOTSTRAP_TAG_TTL = 120000 const DEFAULT_BOOTSTRAP_DISCOVERY_TIMEOUT = 1000 export interface BootstrapInit { @@ -55,7 +55,9 @@ export interface BootstrapInit { timeout?: number /** - * Tag a bootstrap peer with this name before "discovering" it (default: 'bootstrap') + * Tag a bootstrap peer with this name before "discovering" it + * + * @default 'bootstrap' */ tagName?: string @@ -65,7 +67,7 @@ export interface BootstrapInit { tagValue?: number /** - * Cause the bootstrap peer tag to be removed after this number of ms (default: 2 minutes) + * Cause the bootstrap peer tag to be removed after this number of ms */ tagTTL?: number } @@ -73,6 +75,7 @@ export interface BootstrapInit { export interface BootstrapComponents { peerStore: PeerStore logger: ComponentLogger + connectionManager: ConnectionManager } /** @@ -166,9 +169,10 @@ class Bootstrap extends TypedEventEmitter implements PeerDi tags: { [this._init.tagName ?? DEFAULT_BOOTSTRAP_TAG_NAME]: { value: this._init.tagValue ?? DEFAULT_BOOTSTRAP_TAG_VALUE, - ttl: this._init.tagTTL ?? DEFAULT_BOOTSTRAP_TAG_TTL + ttl: this._init.tagTTL } - } + }, + multiaddrs: peerData.multiaddrs }) // check we are still running @@ -177,6 +181,10 @@ class Bootstrap extends TypedEventEmitter implements PeerDi } this.safeDispatchEvent('peer', { detail: peerData }) + this.components.connectionManager.openConnection(peerData.id) + .catch(err => { + this.log.error('could not dial bootstrap peer %p', peerData.id, err) + }) } } diff --git a/packages/peer-discovery-bootstrap/test/bootstrap.spec.ts b/packages/peer-discovery-bootstrap/test/bootstrap.spec.ts index 8549f71012..7429053fb4 100644 --- a/packages/peer-discovery-bootstrap/test/bootstrap.spec.ts +++ b/packages/peer-discovery-bootstrap/test/bootstrap.spec.ts @@ -7,13 +7,20 @@ import { IPFS } from '@multiformats/mafmt' import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' import { type StubbedInstance, stubInterface } from 'sinon-ts' -import { bootstrap, type BootstrapComponents } from '../src/index.js' +import { bootstrap } from '../src/index.js' import peerList from './fixtures/default-peers.js' import partialValidPeerList from './fixtures/some-invalid-peers.js' -import type { PeerStore } from '@libp2p/interface' +import type { ComponentLogger, PeerStore } from '@libp2p/interface' +import type { ConnectionManager } from '@libp2p/interface-internal' + +export interface StubbedBootstrapComponents { + peerStore: PeerStore + logger: ComponentLogger + connectionManager: StubbedInstance +} describe('bootstrap', () => { - let components: BootstrapComponents + let components: StubbedBootstrapComponents let peerStore: StubbedInstance beforeEach(async () => { @@ -21,7 +28,8 @@ describe('bootstrap', () => { components = { peerStore, - logger: defaultLogger() + logger: defaultLogger(), + connectionManager: stubInterface() } }) @@ -49,6 +57,27 @@ describe('bootstrap', () => { await stop(r) }) + it('should dial bootstrap peers', async function () { + this.timeout(5 * 1000) + const r = bootstrap({ + list: peerList, + timeout: 100 + })(components) + + await start(r) + + await new Promise(resolve => { + const interval = setInterval(() => { + if (components.connectionManager.openConnection.callCount === 1) { + clearInterval(interval) + resolve() + } + }, 100) + }) + + await stop(r) + }) + it('should tag bootstrap peers', async function () { this.timeout(5 * 1000) @@ -92,7 +121,10 @@ describe('bootstrap', () => { value: tagValue, ttl: tagTTL } - } + }, + multiaddrs: [ + bootstrapper0ma + ] }) await stop(r) diff --git a/packages/peer-discovery-bootstrap/test/compliance.spec.ts b/packages/peer-discovery-bootstrap/test/compliance.spec.ts index 9a5a46cb95..78c80419df 100644 --- a/packages/peer-discovery-bootstrap/test/compliance.spec.ts +++ b/packages/peer-discovery-bootstrap/test/compliance.spec.ts @@ -6,13 +6,15 @@ import { stubInterface } from 'sinon-ts' import { bootstrap } from '../src/index.js' import peerList from './fixtures/default-peers.js' import type { PeerStore } from '@libp2p/interface' +import type { ConnectionManager } from '@libp2p/interface-internal' describe('compliance tests', () => { tests({ async setup () { const components = { peerStore: stubInterface(), - logger: defaultLogger() + logger: defaultLogger(), + connectionManager: stubInterface() } return bootstrap({ diff --git a/packages/transport-webrtc/.aegir.js b/packages/transport-webrtc/.aegir.js index e95fcc9e5e..db2ccf46cb 100644 --- a/packages/transport-webrtc/.aegir.js +++ b/packages/transport-webrtc/.aegir.js @@ -35,7 +35,6 @@ export default { }) }, connectionManager: { - minConnections: 0, inboundConnectionThreshold: Infinity } }) diff --git a/packages/transport-webrtc/test/basics.spec.ts b/packages/transport-webrtc/test/basics.spec.ts index e8df7d08ba..22b3667e8b 100644 --- a/packages/transport-webrtc/test/basics.spec.ts +++ b/packages/transport-webrtc/test/basics.spec.ts @@ -44,9 +44,6 @@ async function createNode (): Promise { connectionGater: { denyDialMultiaddr: () => false }, - connectionManager: { - minConnections: 0 - }, services: { identify: identify() } diff --git a/packages/transport-webtransport/test/browser.ts b/packages/transport-webtransport/test/browser.ts index b2c7e10691..7af761bed3 100644 --- a/packages/transport-webtransport/test/browser.ts +++ b/packages/transport-webtransport/test/browser.ts @@ -21,9 +21,6 @@ describe('libp2p-webtransport', () => { connectionGater: { denyDialMultiaddr: async () => false }, - connectionManager: { - minConnections: 0 - }, services: { ping: ping() }