diff --git a/packages/kad-dht/package.json b/packages/kad-dht/package.json index 0dda89d551..44e98b9636 100644 --- a/packages/kad-dht/package.json +++ b/packages/kad-dht/package.json @@ -109,7 +109,6 @@ "lodash.random": "^3.2.0", "lodash.range": "^3.2.0", "p-retry": "^6.2.0", - "p-wait-for": "^5.0.2", "protons": "^7.5.0", "sinon": "^18.0.0", "sinon-ts": "^2.0.0", diff --git a/packages/kad-dht/src/index.ts b/packages/kad-dht/src/index.ts index 8defa43c40..65ef605b0b 100644 --- a/packages/kad-dht/src/index.ts +++ b/packages/kad-dht/src/index.ts @@ -400,7 +400,7 @@ export interface KadDHTInit { * Settings for how long to wait in ms when pinging DHT peers to decide if * they should be evicted from the routing table or not. */ - pingTimeout?: Omit<AdaptiveTimeoutInit, 'metricName' | 'metrics'> + pingOldContactTimeout?: Omit<AdaptiveTimeoutInit, 'metricName' | 'metrics'> /** * How many peers to ping in parallel when deciding if they should be * evicted from the routing table or not * * @default 10 */ - pingConcurrency?: number + pingOldContactConcurrency?: number + + /** + * How large the queue of peers to ping is allowed to grow + * + * @default 100 + */ + pingOldContactMaxQueueSize?: number + + /** + * Settings for how long to wait in ms when pinging DHT peers to decide if + * they should be added to the routing table or not. + */ + pingNewContactTimeout?: Omit<AdaptiveTimeoutInit, 'metricName' | 'metrics'> + + /** + * How many peers to ping in parallel when deciding if they should be added to + * the routing table or not + * + * @default 10 + */ + pingNewContactConcurrency?: number + + /** + * How large the queue of peers to ping is allowed to grow + * + * @default 100 + */ + pingNewContactMaxQueueSize?: number /** * How many parallel incoming streams to allow on the DHT protocol per
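The new options are consumed by the `kadDHT` service factory. A minimal sketch of tuning them at construction time (the libp2p wiring shown here is illustrative and not part of this change; any option that is omitted falls back to the defaults documented in the interface above):

```ts
import { createLibp2p } from 'libp2p'
import { kadDHT } from '@libp2p/kad-dht'

const node = await createLibp2p({
  services: {
    dht: kadDHT({
      // verify up to 20 candidates at once before letting them into the
      // routing table, queueing at most 200
      pingNewContactConcurrency: 20,
      pingNewContactMaxQueueSize: 200,
      // re-check unresponsive existing contacts with a smaller queue
      pingOldContactConcurrency: 10,
      pingOldContactMaxQueueSize: 100
    })
  }
})
```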
diff --git a/packages/kad-dht/src/kad-dht.ts b/packages/kad-dht/src/kad-dht.ts index d42ff1b71d..fba14c7b67 100644 --- a/packages/kad-dht/src/kad-dht.ts +++ b/packages/kad-dht/src/kad-dht.ts @@ -138,8 +138,6 @@ export class KadDHT extends TypedEventEmitter implements Ka querySelfInterval, protocol, logPrefix, - pingTimeout, - pingConcurrency, maxInboundStreams, maxOutboundStreams, providers: providersInit @@ -156,15 +154,6 @@ export class KadDHT extends TypedEventEmitter implements Ka this.maxInboundStreams = maxInboundStreams ?? DEFAULT_MAX_INBOUND_STREAMS this.maxOutboundStreams = maxOutboundStreams ?? DEFAULT_MAX_OUTBOUND_STREAMS this.peerInfoMapper = init.peerInfoMapper ?? removePrivateAddressesMapper - this.routingTable = new RoutingTable(components, { - kBucketSize, - pingTimeout, - pingConcurrency, - protocol: this.protocol, - logPrefix: loggingPrefix, - prefixLength: init.prefixLength, - splitThreshold: init.kBucketSplitThreshold - }) this.providers = new Providers(components, providersInit ?? {}) @@ -181,6 +170,21 @@ export class KadDHT extends TypedEventEmitter implements Ka logPrefix: loggingPrefix }) + this.routingTable = new RoutingTable(components, { + kBucketSize, + pingOldContactTimeout: init.pingOldContactTimeout, + pingOldContactConcurrency: init.pingOldContactConcurrency, + pingOldContactMaxQueueSize: init.pingOldContactMaxQueueSize, + pingNewContactTimeout: init.pingNewContactTimeout, + pingNewContactConcurrency: init.pingNewContactConcurrency, + pingNewContactMaxQueueSize: init.pingNewContactMaxQueueSize, + protocol: this.protocol, + logPrefix: loggingPrefix, + prefixLength: init.prefixLength, + splitThreshold: init.kBucketSplitThreshold, + network: this.network + }) + // all queries should wait for the initial query-self query to run so we have // some peers and don't force consumers to use arbitrary timeouts const initialQuerySelfHasRun = pDefer() @@ -376,11 +380,17 @@ export class KadDHT extends TypedEventEmitter implements Ka await this.components.registrar.unhandle(this.protocol) + // check again after async work + if (mode === this.getMode() && !force) { + this.log('already in %s mode', mode) + return + } + if (mode === 'client') { - this.log('enabling client mode') + this.log('enabling client mode while in %s mode', this.getMode()) this.clientMode = true } else { - this.log('enabling server mode') + this.log('enabling server mode while in %s mode', this.getMode()) this.clientMode = false await this.components.registrar.handle(this.protocol, this.rpc.onIncomingStream.bind(this.rpc), { maxInboundStreams: this.maxInboundStreams, @@ -399,14 +409,18 @@ export class KadDHT extends TypedEventEmitter implements Ka await this.setMode(this.clientMode ? 'client' : 'server', true) await start( - this.querySelf, + this.routingTable, this.providers, this.queryManager, this.network, - this.routingTable, this.topologyListener, this.routingTableRefresh ) + + // Query self after other components are configured + await start( + this.querySelf + ) } /** diff --git a/packages/kad-dht/src/network.ts b/packages/kad-dht/src/network.ts index e786c1ef1b..4c8d7278ba 100644 --- a/packages/kad-dht/src/network.ts +++ b/packages/kad-dht/src/network.ts @@ -85,7 +85,7 @@ export class Network extends TypedEventEmitter implements Startab } /** - * Send a request and record RTT for latency measurements + * Send a request and read a response */ async * sendRequest (to: PeerId, msg: Partial<Message>, options: RoutingOptions = {}): AsyncGenerator<QueryEvent> { if (!this.running) { @@ -204,7 +204,6 @@ export class Network extends TypedEventEmitter implements Startab async _writeMessage (stream: Stream, msg: Partial<Message>, options: AbortOptions): Promise<void> { const pb = pbStream(stream) await pb.write(msg, Message, options) - await pb.unwrap().close(options) } /** @@ -219,8 +218,6 @@ export class Network extends TypedEventEmitter implements Startab const message = await pb.read(Message, options) - await pb.unwrap().close(options) - // tell any listeners about new peers we've seen message.closer.forEach(peerData => { this.safeDispatchEvent('peer', {
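`sendRequest` yields query events rather than returning a single message, so callers watch for a `PEER_RESPONSE` event. A minimal sketch of the pattern (the standalone wrapper and import paths are illustrative assumptions — the same loop appears in `pingContact` further down this diff):

```ts
import { EventTypes } from './index.js'
import { MessageType } from './message/dht.js'
import type { Network } from './network.js'
import type { PeerId } from '@libp2p/interface'

// resolves true if the remote peer answered our PING with a PING
async function ping (network: Network, peerId: PeerId): Promise<boolean> {
  for await (const event of network.sendRequest(peerId, { type: MessageType.PING })) {
    if (event.type === EventTypes.PEER_RESPONSE) {
      return event.messageType === MessageType.PING
    }
  }

  return false
}
```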
diff --git a/packages/kad-dht/src/query-self.ts b/packages/kad-dht/src/query-self.ts index b264f2d9e3..c32a2d0a19 100644 --- a/packages/kad-dht/src/query-self.ts +++ b/packages/kad-dht/src/query-self.ts @@ -106,19 +106,27 @@ export class QuerySelf implements Startable { if (this.started) { this.controller = new AbortController() - const timeoutSignal = AbortSignal.timeout(this.queryTimeout) - const signal = anySignal([this.controller.signal, timeoutSignal]) + const signals = [this.controller.signal] - // this controller will get used for lots of dial attempts so make sure we don't cause warnings to be logged - setMaxListeners(Infinity, signal, this.controller.signal, timeoutSignal) + // add a shorter timeout if we've already run our initial self query + if (this.initialQuerySelfHasRun == null) { + const timeoutSignal = AbortSignal.timeout(this.queryTimeout) + setMaxListeners(Infinity, timeoutSignal) + signals.push(timeoutSignal) + } + + const signal = anySignal(signals) + setMaxListeners(Infinity, signal, this.controller.signal) try { if (this.routingTable.size === 0) { this.log('routing table was empty, waiting for some peers before running query') - // wait to discover at least one DHT peer + // wait to discover at least one DHT peer that isn't us await pEvent(this.routingTable, 'peer:add', { - signal + signal, + filter: (event) => !this.peerId.equals(event.detail) }) + this.log('routing table has peers, continuing with query') } this.log('run self-query, look for %d peers timing out after %dms', this.count, this.queryTimeout) diff --git a/packages/kad-dht/src/routing-table/index.ts b/packages/kad-dht/src/routing-table/index.ts index 3156701392..5295dc394b 100644 --- a/packages/kad-dht/src/routing-table/index.ts +++ b/packages/kad-dht/src/routing-table/index.ts @@ -1,21 +1,33 @@ -import { InvalidMessageError, KEEP_ALIVE, TypedEventEmitter } from '@libp2p/interface' -import { PeerSet } from '@libp2p/peer-collections' +import { KEEP_ALIVE, TypedEventEmitter, setMaxListeners } from '@libp2p/interface' import { AdaptiveTimeout } from '@libp2p/utils/adaptive-timeout' import { PeerQueue } from '@libp2p/utils/peer-queue' -import { pbStream } from 'it-protobuf-stream' -import { Message, MessageType } from '../message/dht.js' +import { anySignal } from 'any-signal' +import parallel from 'it-parallel' +import { EventTypes } from '../index.js' +import { MessageType } from '../message/dht.js' import * as utils from '../utils.js' -import { KBucket, isLeafBucket, type Bucket, type PingEventDetails } from './k-bucket.js' -import type { ComponentLogger, CounterGroup, Logger, Metric, Metrics, PeerId, PeerStore, Startable, Stream } from '@libp2p/interface' -import type { ConnectionManager } from '@libp2p/interface-internal' +import { KBucket, isLeafBucket } from './k-bucket.js' +import type { Bucket, LeafBucket, Peer } from './k-bucket.js' +import type { Network } from '../network.js' +import type { AbortOptions, ComponentLogger, CounterGroup, Logger, Metric, Metrics, PeerId, PeerStore, Startable, Stream, TagOptions } from '@libp2p/interface' import type { AdaptiveTimeoutInit } from '@libp2p/utils/adaptive-timeout' export const KAD_CLOSE_TAG_NAME = 'kad-close' export const KAD_CLOSE_TAG_VALUE = 50 export const KBUCKET_SIZE = 20 -export const PREFIX_LENGTH = 32 -export const PING_TIMEOUT = 2000 -export const PING_CONCURRENCY = 20 +export const PREFIX_LENGTH = 7 +export const PING_NEW_CONTACT_TIMEOUT = 2000 +export const PING_NEW_CONTACT_CONCURRENCY = 20 +export const PING_NEW_CONTACT_MAX_QUEUE_SIZE = 100 +export const PING_OLD_CONTACT_COUNT = 3 +export const PING_OLD_CONTACT_TIMEOUT = 2000 +export const PING_OLD_CONTACT_CONCURRENCY = 20 +export const PING_OLD_CONTACT_MAX_QUEUE_SIZE = 100 +export const KAD_PEER_TAG_NAME = 'kad-peer' +export const KAD_PEER_TAG_VALUE = 1 +export const LAST_PING_THRESHOLD = 600000 +export const POPULATE_FROM_DATASTORE_ON_START = true +export const POPULATE_FROM_DATASTORE_LIMIT = 1000 export interface RoutingTableInit { logPrefix: string @@ -23,16
+35,26 @@ export interface RoutingTableInit { prefixLength?: number splitThreshold?: number kBucketSize?: number - pingTimeout?: AdaptiveTimeoutInit - pingConcurrency?: number - tagName?: string - tagValue?: number + pingNewContactTimeout?: AdaptiveTimeoutInit + pingNewContactConcurrency?: number + pingNewContactMaxQueueSize?: number + pingOldContactTimeout?: AdaptiveTimeoutInit + pingOldContactConcurrency?: number + pingOldContactMaxQueueSize?: number + numberOfOldContactsToPing?: number + peerTagName?: string + peerTagValue?: number + closeTagName?: string + closeTagValue?: number + network: Network + populateFromDatastoreOnStart?: boolean + populateFromDatastoreLimit?: number + lastPingThreshold?: number } export interface RoutingTableComponents { peerId: PeerId peerStore: PeerStore - connectionManager: ConnectionManager metrics?: Metrics logger: ComponentLogger } @@ -43,30 +65,34 @@ export interface RoutingTableEvents { } /** - * A wrapper around `k-bucket`, to provide easy store and - * retrieval for peers. + * A wrapper around `k-bucket`, to provide easy store and retrieval for peers. */ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implements Startable { public kBucketSize: number - public kb?: KBucket - public pingQueue: PeerQueue<boolean> - + public kb: KBucket + public network: Network private readonly log: Logger private readonly components: RoutingTableComponents - private readonly prefixLength: number - private readonly splitThreshold: number - private readonly pingTimeout: AdaptiveTimeout - private readonly pingConcurrency: number private running: boolean + private readonly pingNewContactTimeout: AdaptiveTimeout + private readonly pingNewContactQueue: PeerQueue<boolean> + private readonly pingOldContactTimeout: AdaptiveTimeout + private readonly pingOldContactQueue: PeerQueue<boolean> + private readonly populateFromDatastoreOnStart: boolean + private readonly populateFromDatastoreLimit: number private readonly protocol: string - private readonly tagName: string - private readonly tagValue: number + private readonly peerTagName: string + private readonly peerTagValue: number + private readonly closeTagName: string + private readonly closeTagValue: number private readonly metrics?: { routingTableSize: Metric routingTableKadBucketTotal: Metric routingTableKadBucketAverageOccupancy: Metric routingTableKadBucketMaxDepth: Metric - kadBucketEvents: CounterGroup<'ping' | 'ping_error' | 'peer_added' | 'peer_removed'> + routingTableKadBucketMinOccupancy: Metric + routingTableKadBucketMaxOccupancy: Metric + kadBucketEvents: CounterGroup<'ping_old_contact' | 'ping_old_contact_error' | 'ping_new_contact' | 'ping_new_contact_error' | 'peer_added' | 'peer_removed'> } constructor (components: RoutingTableComponents, init: RoutingTableInit) { @@ -75,26 +101,56 @@ export class RoutingTable extends TypedEventEmitter implemen this.components = components this.log = components.logger.forComponent(`${init.logPrefix}:routing-table`) this.kBucketSize = init.kBucketSize ?? KBUCKET_SIZE - this.pingConcurrency = init.pingConcurrency ?? PING_CONCURRENCY this.running = false this.protocol = init.protocol - this.tagName = init.tagName ?? KAD_CLOSE_TAG_NAME - this.tagValue = init.tagValue ?? KAD_CLOSE_TAG_VALUE - this.prefixLength = init.prefixLength ?? PREFIX_LENGTH - this.splitThreshold = init.splitThreshold ??
KBUCKET_SIZE - - this.pingQueue = new PeerQueue({ - concurrency: this.pingConcurrency, - metricName: `${init.logPrefix.replaceAll(':', '_')}_ping_queue`, - metrics: this.components.metrics + this.network = init.network + this.peerTagName = init.peerTagName ?? KAD_PEER_TAG_NAME + this.peerTagValue = init.peerTagValue ?? KAD_PEER_TAG_VALUE + this.closeTagName = init.closeTagName ?? KAD_CLOSE_TAG_NAME + this.closeTagValue = init.closeTagValue ?? KAD_CLOSE_TAG_VALUE + this.pingOldContacts = this.pingOldContacts.bind(this) + this.verifyNewContact = this.verifyNewContact.bind(this) + this.peerAdded = this.peerAdded.bind(this) + this.peerRemoved = this.peerRemoved.bind(this) + this.peerMoved = this.peerMoved.bind(this) + this.populateFromDatastoreOnStart = init.populateFromDatastoreOnStart ?? POPULATE_FROM_DATASTORE_ON_START + this.populateFromDatastoreLimit = init.populateFromDatastoreLimit ?? POPULATE_FROM_DATASTORE_LIMIT + + this.pingOldContactQueue = new PeerQueue({ + concurrency: init.pingOldContactConcurrency ?? PING_OLD_CONTACT_CONCURRENCY, + metricName: `${init.logPrefix.replaceAll(':', '_')}_ping_old_contact_queue`, + metrics: this.components.metrics, + maxSize: init.pingOldContactMaxQueueSize ?? PING_OLD_CONTACT_MAX_QUEUE_SIZE }) - this.pingQueue.addEventListener('error', evt => { - this.log.error('error pinging peer', evt.detail) + this.pingOldContactTimeout = new AdaptiveTimeout({ + ...(init.pingOldContactTimeout ?? {}), + metrics: this.components.metrics, + metricName: `${init.logPrefix.replaceAll(':', '_')}_routing_table_ping_old_contact_time_milliseconds` }) - this.pingTimeout = new AdaptiveTimeout({ - ...(init.pingTimeout ?? {}), + + this.pingNewContactQueue = new PeerQueue({ + concurrency: init.pingNewContactConcurrency ?? PING_NEW_CONTACT_CONCURRENCY, + metricName: `${init.logPrefix.replaceAll(':', '_')}_ping_new_contact_queue`, metrics: this.components.metrics, - metricName: `${init.logPrefix.replaceAll(':', '_')}_routing_table_ping_time_milliseconds` + maxSize: init.pingNewContactMaxQueueSize ?? PING_NEW_CONTACT_MAX_QUEUE_SIZE + }) + this.pingNewContactTimeout = new AdaptiveTimeout({ + ...(init.pingNewContactTimeout ?? 
{}), + metrics: this.components.metrics, + metricName: `${init.logPrefix.replaceAll(':', '_')}_routing_table_ping_new_contact_time_milliseconds` + }) + + this.kb = new KBucket({ + kBucketSize: init.kBucketSize, + prefixLength: init.prefixLength, + splitThreshold: init.splitThreshold, + numberOfOldContactsToPing: init.numberOfOldContactsToPing, + lastPingThreshold: init.lastPingThreshold, + ping: this.pingOldContacts, + verify: this.verifyNewContact, + onAdd: this.peerAdded, + onRemove: this.peerRemoved, + onMove: this.peerMoved }) if (this.components.metrics != null) { @@ -102,6 +158,8 @@ export class RoutingTable extends TypedEventEmitter implemen routingTableSize: this.components.metrics.registerMetric(`${init.logPrefix.replaceAll(':', '_')}_routing_table_size`), routingTableKadBucketTotal: this.components.metrics.registerMetric(`${init.logPrefix.replaceAll(':', '_')}_routing_table_kad_bucket_total`), routingTableKadBucketAverageOccupancy: this.components.metrics.registerMetric(`${init.logPrefix.replaceAll(':', '_')}_routing_table_kad_bucket_average_occupancy`), + routingTableKadBucketMinOccupancy: this.components.metrics.registerMetric(`${init.logPrefix.replaceAll(':', '_')}_routing_table_kad_bucket_min_occupancy`), + routingTableKadBucketMaxOccupancy: this.components.metrics.registerMetric(`${init.logPrefix.replaceAll(':', '_')}_routing_table_kad_bucket_max_occupancy`), routingTableKadBucketMaxDepth: this.components.metrics.registerMetric(`${init.logPrefix.replaceAll(':', '_')}_routing_table_kad_bucket_max_depth`), kadBucketEvents: this.components.metrics.registerCounterGroup(`${init.logPrefix.replaceAll(':', '_')}_kad_bucket_events_total`) } @@ -115,109 +173,123 @@ async start (): Promise<void> { this.running = true - const kBuck = new KBucket({ - localPeer: { - kadId: await utils.convertPeerId(this.components.peerId), - peerId: this.components.peerId - }, - kBucketSize: this.kBucketSize, - prefixLength: this.prefixLength, - splitThreshold: this.splitThreshold, - numberOfNodesToPing: 1 - }) - this.kb = kBuck - - // test whether to evict peers - kBuck.addEventListener('ping', (evt) => { - this.metrics?.kadBucketEvents.increment({ ping: true }) - - this._onPing(evt).catch(err => { - this.metrics?.kadBucketEvents.increment({ ping_error: true }) - this.log.error('could not process k-bucket ping event', err) - }) - }) + await this.kb.addSelfPeer(this.components.peerId) + } - let peerStorePeers = 0 + async afterStart (): Promise<void> { + // do this async to not block startup but iterate serially to not overwhelm + // the ping queue + Promise.resolve().then(async () => { + if (!this.populateFromDatastoreOnStart) { + return + } - // add existing peers from the peer store to routing table - for (const peer of await this.components.peerStore.all()) { - if (peer.protocols.includes(this.protocol)) { - const id = await utils.convertPeerId(peer.id) + let peerStorePeers = 0 + + // add existing peers from the peer store to routing table + for (const peer of await this.components.peerStore.all({ + filters: [(peer) => { + return peer.protocols.includes(this.protocol) && peer.tags.has(KAD_PEER_TAG_NAME) + }], + limit: this.populateFromDatastoreLimit + })) { + if (!this.running) { + // bail if we've been shut down + return + } - this.kb.add({ kadId: id, peerId: peer.id }) - peerStorePeers++ + try { + await this.add(peer.id) + peerStorePeers++ + } catch (err) { + this.log('failed to add peer %p to routing table, removing kad-dht peer tags - %e', peer.id, err) + await
this.components.peerStore.merge(peer.id, { + tags: { + [this.closeTagName]: undefined, + [this.peerTagName]: undefined, + [KEEP_ALIVE]: undefined + } + }) + } } - this.log('added %d peer store peers to the routing table', peerStorePeers) - - // tag kad-close peers - this._tagPeers(kBuck) + this.log('added %d peer store peers to the routing table', peerStorePeers) + }) + .catch(err => { + this.log.error('error adding peer store peers to the routing table %e', err) + }) } async stop (): Promise<void> { this.running = false - this.pingQueue.clear() - this.kb = undefined + this.pingOldContactQueue.abort() + this.pingNewContactQueue.abort() } - /** - * Keep track of our k-closest peers and tag them in the peer store as such - * - this will lower the chances that connections to them get closed when - * we reach connection limits - */ - _tagPeers (kBuck: KBucket): void { - let kClosest = new PeerSet() - - const updatePeerTags = utils.debounce(() => { - const newClosest = new PeerSet( - kBuck.closest(kBuck.localPeer.kadId, KBUCKET_SIZE) - ) - const addedPeers = newClosest.difference(kClosest) - const removedPeers = kClosest.difference(newClosest) - - Promise.resolve() - .then(async () => { - for (const peer of addedPeers) { - await this.components.peerStore.merge(peer, { - tags: { - [this.tagName]: { - value: this.tagValue - }, - [KEEP_ALIVE]: { - value: 1 - } - } - }) - } + private async peerAdded (peer: Peer, bucket: LeafBucket): Promise<void> { + if (!this.components.peerId.equals(peer.peerId)) { + const tags: Record<string, TagOptions> = { + [this.peerTagName]: { + value: this.peerTagValue + } + } - for (const peer of removedPeers) { - await this.components.peerStore.merge(peer, { - tags: { - [this.tagName]: undefined, - [KEEP_ALIVE]: undefined - } - }) - } - }) - .catch(err => { - this.log.error('Could not update peer tags', err) - }) + if (bucket.containsSelf === true) { + tags[this.closeTagName] = { + value: this.closeTagValue + } + tags[KEEP_ALIVE] = { + value: 1 + } + } - kClosest = newClosest - }) + await this.components.peerStore.merge(peer.peerId, { + tags + }) + } + + this.updateMetrics() + this.metrics?.kadBucketEvents.increment({ peer_added: true }) + this.safeDispatchEvent('peer:add', { detail: peer.peerId }) + } - kBuck.addEventListener('added', (evt) => { - updatePeerTags() + private async peerRemoved (peer: Peer, bucket: LeafBucket): Promise<void> { + if (!this.components.peerId.equals(peer.peerId)) { + await this.components.peerStore.merge(peer.peerId, { + tags: { + [this.closeTagName]: undefined, + [this.peerTagName]: undefined, + [KEEP_ALIVE]: undefined + } + }) + } - this.metrics?.kadBucketEvents.increment({ peer_added: true }) - this.safeDispatchEvent('peer:add', { detail: evt.detail.peerId }) - }) - kBuck.addEventListener('removed', (evt) => { - updatePeerTags() + this.updateMetrics() + this.metrics?.kadBucketEvents.increment({ peer_removed: true }) + this.safeDispatchEvent('peer:remove', { detail: peer.peerId }) + } - this.metrics?.kadBucketEvents.increment({ peer_removed: true }) - this.safeDispatchEvent('peer:remove', { detail: evt.detail.peerId }) + private async peerMoved (peer: Peer, oldBucket: LeafBucket, newBucket: LeafBucket): Promise<void> { + if (this.components.peerId.equals(peer.peerId)) { + return + } + + const tags: Record<string, TagOptions> = { + [this.closeTagName]: undefined, + [KEEP_ALIVE]: undefined + } + + if (newBucket.containsSelf === true) { + tags[this.closeTagName] = { + value: this.closeTagValue + } + tags[KEEP_ALIVE] = { + value: 1 + } + } + + await this.components.peerStore.merge(peer.peerId, { + tags
}) } @@ -231,87 +303,127 @@ export class RoutingTable extends TypedEventEmitter implemen * `oldContacts` will not be empty and is the list of contacts that * have not been contacted for the longest. */ - async _onPing (evt: CustomEvent<PingEventDetails>): Promise<void> { + async * pingOldContacts (oldContacts: Peer[], options?: AbortOptions): AsyncGenerator<Peer> { if (!this.running) { return } - const { - oldContacts, - newContact - } = evt.detail + const jobs: Array<() => Promise<Peer | undefined>> = [] - const results = await Promise.all( - oldContacts.map(async oldContact => { - // if a previous ping wants us to ping this contact, re-use the result - const pingJob = this.pingQueue.find(oldContact.peerId) - - if (pingJob != null) { - return pingJob.join() - } + for (const oldContact of oldContacts) { + if (this.kb.get(oldContact.kadId) == null) { + this.log('asked to ping contact %p that was not in routing table', oldContact.peerId) + continue + } - return this.pingQueue.add(async () => { - let stream: Stream | undefined - const signal = this.pingTimeout.getTimeoutSignal() + this.metrics?.kadBucketEvents.increment({ ping_old_contact: true }) - try { - const options = { - signal - } + jobs.push(async () => { + // if a previous ping wants us to ping this contact, re-use the result + const existingJob = this.pingOldContactQueue.find(oldContact.peerId) - this.log('pinging old contact %p', oldContact.peerId) - const connection = await this.components.connectionManager.openConnection(oldContact.peerId, options) - stream = await connection.newStream(this.protocol, options) + if (existingJob != null) { + this.log('asked to ping contact %p that was already being pinged', oldContact.peerId) + const result = await existingJob.join(options) - const pb = pbStream(stream).pb(Message) - await pb.write({ - type: MessageType.PING, - closer: [], - providers: [] - }, options) - const response = await pb.read(options) + if (!result) { + return oldContact + } - await pb.unwrap().unwrap().close(options) + return + } - if (response.type !== MessageType.PING) { - throw new InvalidMessageError(`Incorrect message type received, expected PING got ${response.type}`) - } + const result = await this.pingOldContactQueue.add(async (options) => { + const signal = this.pingOldContactTimeout.getTimeoutSignal() + const signals = anySignal([signal, options?.signal]) + setMaxListeners(Infinity, signal, signals) - this.log('old contact %p ping ok', oldContact.peerId) + try { + return await this.pingContact(oldContact, options) + } catch { + this.metrics?.kadBucketEvents.increment({ ping_old_contact_error: true }) return true - } catch (err: any) { - if (this.running) { - // only evict peers if we are still running, otherwise we evict - // when dialing is cancelled due to shutdown in progress - this.log.error('could not ping peer %p - %e', oldContact.peerId, err) - this.log('evicting old contact after ping failed %p', oldContact.peerId) - this.kb?.remove(oldContact.kadId) - } - - stream?.abort(err) - - return false } finally { - this.pingTimeout.cleanUp(signal) - this.updateMetrics() + this.pingOldContactTimeout.cleanUp(signal) + signals.clear() } }, { - peerId: oldContact.peerId + peerId: oldContact.peerId, + signal: options?.signal }) + + if (!result) { + return oldContact + } }) - ) + } - const responded = results - .filter(res => res) - .length + for await (const peer of parallel(jobs)) { + if (peer != null) { + yield peer + } + } + } - if (this.running && responded < oldContacts.length && this.kb != null) { - this.log('adding new contact %p', newContact.peerId) -
this.kb.add(newContact) + async verifyNewContact (contact: Peer, options?: AbortOptions): Promise<boolean> { + const signal = this.pingNewContactTimeout.getTimeoutSignal() + const signals = anySignal([signal, options?.signal]) + setMaxListeners(Infinity, signal, signals) + + try { + const job = this.pingNewContactQueue.find(contact.peerId) + + if (job != null) { + this.log('joining existing ping to add new peer %p to routing table', contact.peerId) + return await job.join({ + signal: signals + }) + } else { + return await this.pingNewContactQueue.add(async (options) => { + this.metrics?.kadBucketEvents.increment({ ping_new_contact: true }) + + this.log('pinging new peer %p before adding to routing table', contact.peerId) + return this.pingContact(contact, options) + }, { + peerId: contact.peerId, + signal: signals + }) + } + } catch (err) { + this.log.trace('tried to add peer %p but they were not online', contact.peerId) + this.metrics?.kadBucketEvents.increment({ ping_new_contact_error: true }) + + return false + } finally { + this.pingNewContactTimeout.cleanUp(signal) + signals.clear() } } - // -- Public Interface + async pingContact (contact: Peer, options?: AbortOptions): Promise<boolean> { + let stream: Stream | undefined + + try { + this.log('pinging contact %p', contact.peerId) + + for await (const event of this.network.sendRequest(contact.peerId, { type: MessageType.PING }, options)) { + if (event.type === EventTypes.PEER_RESPONSE) { + if (event.messageType === MessageType.PING) { + this.log('contact %p ping ok', contact.peerId) + return true + } + + return false + } + } + + return false + } catch (err: any) { + this.log('error pinging contact %p - %e', contact.peerId, err) + stream?.abort(err) + return false + } + } /** * Amount of currently stored peers */ @@ -328,8 +440,8 @@ export class RoutingTable extends TypedEventEmitter implemen * Find a specific peer by id */ async find (peer: PeerId): Promise<PeerId | undefined> { - const key = await utils.convertPeerId(peer) - return this.kb?.get(key)?.peerId + const kadId = await utils.convertPeerId(peer) + return this.kb.get(kadId)?.peerId } /** @@ -359,18 +471,12 @@ export class RoutingTable extends TypedEventEmitter implemen /** * Add or update the routing table with the given peer */ - async add (peerId: PeerId): Promise<void> { + async add (peerId: PeerId, options?: AbortOptions): Promise<void> { if (this.kb == null) { throw new Error('RoutingTable is not started') } - const kadId = await utils.convertPeerId(peerId) - - this.kb.add({ kadId, peerId }) - - this.log.trace('added %p with kad id %b', peerId, kadId) - - this.updateMetrics() + await this.kb.add(peerId, options) } /** @@ -381,11 +487,9 @@ export class RoutingTable extends TypedEventEmitter implemen throw new Error('RoutingTable is not started') } - const id = await utils.convertPeerId(peer) - - this.kb.remove(id) + const kadId = await utils.convertPeerId(peer) - this.updateMetrics() + await this.kb.remove(kadId) } private updateMetrics (): void { @@ -396,6 +500,8 @@ export class RoutingTable extends TypedEventEmitter implemen let size = 0 let buckets = 0 let maxDepth = 0 + let minOccupancy = 20 + let maxOccupancy = 0 function count (bucket: Bucket): void { if (isLeafBucket(bucket)) { @@ -405,6 +511,15 @@ export class RoutingTable extends TypedEventEmitter implemen buckets++ size += bucket.peers.length + + if (bucket.peers.length < minOccupancy) { + minOccupancy = bucket.peers.length + } + + if (bucket.peers.length > maxOccupancy) { + maxOccupancy = bucket.peers.length + } + return } @@ -417,6 +532,8 @@ export class RoutingTable extends TypedEventEmitter implemen this.metrics.routingTableSize.update(size) this.metrics.routingTableKadBucketTotal.update(buckets) this.metrics.routingTableKadBucketAverageOccupancy.update(Math.round(size / buckets)) + this.metrics.routingTableKadBucketMinOccupancy.update(minOccupancy) + this.metrics.routingTableKadBucketMaxOccupancy.update(maxOccupancy) this.metrics.routingTableKadBucketMaxDepth.update(maxDepth) } }
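Taken together, `verifyNewContact` means `RoutingTable.add` only resolves once the candidate has answered a PING. A minimal sketch of exercising this with a stubbed network, modelled on the updated specs later in this diff (`components` and `somePeerId` are assumed to exist; import paths are illustrative):

```ts
import { start } from '@libp2p/interface'
import { stubInterface } from 'sinon-ts'
import { MessageType } from '../src/message/dht.js'
import { peerResponseEvent } from '../src/query/events.js'
import { RoutingTable } from '../src/routing-table/index.js'
import type { Network } from '../src/network.js'

const network = stubInterface<Network>()

// every PING succeeds, so every add() passes verification
network.sendRequest.callsFake(async function * (from) {
  yield peerResponseEvent({ from, messageType: MessageType.PING })
})

const table = new RoutingTable(components, { logPrefix: '', protocol: '/ipfs/kad/1.0.0', network })
await start(table)

// resolves only after the contact answers the PING
await table.add(somePeerId)
```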
diff --git a/packages/kad-dht/src/routing-table/k-bucket.ts b/packages/kad-dht/src/routing-table/k-bucket.ts index cc79b24896..6f9bf67770 100644 --- a/packages/kad-dht/src/routing-table/k-bucket.ts +++ b/packages/kad-dht/src/routing-table/k-bucket.ts @@ -1,45 +1,49 @@ -import { TypedEventEmitter } from '@libp2p/interface' +import { PeerMap } from '@libp2p/peer-collections' import map from 'it-map' +import { equals as uint8ArrayEquals } from 'uint8arrays/equals' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import { xor as uint8ArrayXor } from 'uint8arrays/xor' import { PeerDistanceList } from '../peer-list/peer-distance-list.js' -import { KBUCKET_SIZE } from './index.js' +import { convertPeerId } from '../utils.js' +import { KBUCKET_SIZE, LAST_PING_THRESHOLD, PING_OLD_CONTACT_COUNT, PREFIX_LENGTH } from './index.js' import type { PeerId } from '@libp2p/interface' +import type { AbortOptions } from 'it-protobuf-stream' -function arrayEquals (array1: Uint8Array, array2: Uint8Array): boolean { - if (array1 === array2) { - return true - } - if (array1.length !== array2.length) { - return false - } - for (let i = 0, length = array1.length; i < length; ++i) { - if (array1[i] !== array2[i]) { - return false - } - } - return true +export interface PingFunction { + /** + * Return either none or at least one contact that does not respond to a ping + * message + */ + (oldContacts: Peer[], options?: AbortOptions): AsyncGenerator<Peer> } -function ensureInt8 (name: string, val?: Uint8Array): void { - if (!(val instanceof Uint8Array)) { - throw new TypeError(name + ' is not a Uint8Array') - } +/** + * Before a peer can be added to the table, verify that it is online and working + * correctly + */ +export interface VerifyFunction { + (contact: Peer, options?: AbortOptions): Promise<boolean> } - if (val.byteLength !== 32) { - throw new TypeError(name + ' had incorrect length') - } +export interface OnAddCallback { + /** + * Invoked when a new peer is added to the routing tables + */ + (peer: Peer, bucket: LeafBucket): Promise<void> } -export interface PingEventDetails { - oldContacts: Peer[] - newContact: Peer +export interface OnRemoveCallback { + /** + * Invoked when a peer is evicted from the routing tables + */ + (peer: Peer, bucket: LeafBucket): Promise<void> } -export interface KBucketEvents { - 'ping': CustomEvent<PingEventDetails> - 'added': CustomEvent<Peer> - 'removed': CustomEvent<Peer> +export interface OnMoveCallback { + /** + * Invoked when a peer is moved between buckets in the routing tables + */ + (peer: Peer, oldBucket: LeafBucket, newBucket: LeafBucket): Promise<void> } export interface KBucketOptions { @@ -47,14 +51,16 @@ /** * The current peer. All subsequently added peers must have a KadID that is * the same length as this peer. */ - localPeer: Peer + // localPeer: Peer /** * How many bits of the key to use when forming the bucket trie. The larger * this value, the deeper the tree will grow and the slower the lookups will * be but the peers returned will be more specific to the key.
+ * + * @default 7 */ - prefixLength: number + prefixLength?: number /** * The number of nodes that a max-depth k-bucket can contain before being @@ -74,20 +80,37 @@ /** * The number of nodes to ping when a bucket that should not be split becomes - * full. KBucket will emit a `ping` event that contains `numberOfNodesToPing` - * nodes that have not been contacted the longest. + * full. The `ping` function will be invoked with + * `numberOfOldContactsToPing` nodes that have not been contacted the longest. + * + * @default 3 */ - numberOfNodesToPing?: number + numberOfOldContactsToPing?: number + + /** + * Do not re-ping a peer during this time window in ms + * + * @default 600000 + */ + lastPingThreshold?: number + + ping: PingFunction + verify: VerifyFunction + onAdd?: OnAddCallback + onRemove?: OnRemoveCallback + onMove?: OnMoveCallback } export interface Peer { kadId: Uint8Array peerId: PeerId + lastPing: number } export interface LeafBucket { prefix: string depth: number + containsSelf?: boolean peers: Peer[] } @@ -108,24 +131,33 @@ export function isLeafBucket (obj: any): obj is LeafBucket { * Implementation of a Kademlia DHT routing table as a prefix binary trie with * configurable prefix length, bucket split threshold and size. */ -export class KBucket extends TypedEventEmitter<KBucketEvents> { +export class KBucket { public root: Bucket - public localPeer: Peer + public localPeer?: Peer private readonly prefixLength: number private readonly splitThreshold: number private readonly kBucketSize: number private readonly numberOfNodesToPing: number + private readonly lastPingThreshold: number + public ping: PingFunction + public verify: VerifyFunction + private readonly onAdd?: OnAddCallback + private readonly onRemove?: OnRemoveCallback + private readonly onMove?: OnMoveCallback + private readonly addingPeerMap: PeerMap<Promise<void>> constructor (options: KBucketOptions) { - super() - - this.localPeer = options.localPeer - this.prefixLength = options.prefixLength + this.prefixLength = options.prefixLength ?? PREFIX_LENGTH this.kBucketSize = options.kBucketSize ?? KBUCKET_SIZE this.splitThreshold = options.splitThreshold ?? this.kBucketSize - this.numberOfNodesToPing = options.numberOfNodesToPing ?? 3 - - ensureInt8('options.localPeer.kadId', options.localPeer.kadId) + this.numberOfNodesToPing = options.numberOfOldContactsToPing ?? PING_OLD_CONTACT_COUNT + this.lastPingThreshold = options.lastPingThreshold ?? LAST_PING_THRESHOLD + this.ping = options.ping + this.verify = options.verify + this.onAdd = options.onAdd + this.onRemove = options.onRemove + this.onMove = options.onMove + this.addingPeerMap = new PeerMap() this.root = { prefix: '', @@ -134,14 +166,43 @@ export class KBucket extends TypedEventEmitter<KBucketEvents> { } } + async addSelfPeer (peerId: PeerId): Promise<void> { + this.localPeer = { + peerId, + kadId: await convertPeerId(peerId), + lastPing: Date.now() + } + + const bucket = this._determineBucket(this.localPeer.kadId) + bucket.containsSelf = true + } + /** - * Adds a contact to the k-bucket.
- * - * @param {Peer} peer - the contact object to add + * Adds a contact to the trie */ - add (peer: Peer): void { - ensureInt8('peer.kadId', peer?.kadId) + async add (peerId: PeerId, options?: AbortOptions): Promise<void> { + const peer = { + peerId, + kadId: await convertPeerId(peerId), + lastPing: 0 + } + + const existingPromise = this.addingPeerMap.get(peerId) + + if (existingPromise != null) { + return existingPromise + } + try { + const p = this._add(peer, options) + this.addingPeerMap.set(peerId, p) + await p + } finally { + this.addingPeerMap.delete(peerId) + } + } + + private async _add (peer: Peer, options?: AbortOptions): Promise<void> { const bucket = this._determineBucket(peer.kadId) // check if the contact already exists @@ -152,18 +213,32 @@ export class KBucket extends TypedEventEmitter<KBucketEvents> { // are there too many peers in the bucket and can we make the trie deeper? if (bucket.peers.length === this.splitThreshold && bucket.depth < this.prefixLength) { // split the bucket - this._split(bucket) + await this._split(bucket) // try again - this.add(peer) + await this._add(peer, options) return } // is there space in the bucket? if (bucket.peers.length < this.kBucketSize) { - bucket.peers.push(peer) - this.safeDispatchEvent('added', { detail: peer }) + // we've pinged this peer previously, just add them to the bucket + if (!needsPing(peer, this.lastPingThreshold)) { + bucket.peers.push(peer) + await this.onAdd?.(peer, bucket) + return + } + + const result = await this.verify(peer, options) + + // only add if peer is online and functioning correctly + if (result) { + peer.lastPing = Date.now() + + // try again - buckets may have changed during ping + await this._add(peer, options) + } return } @@ -171,17 +246,51 @@ export class KBucket extends TypedEventEmitter<KBucketEvents> { // we are at the bottom of the trie and the bucket is full so we can't add // any more peers. // - // instead ping the first this.numberOfNodesToPing in order to determine + // instead ping the first `this.numberOfNodesToPing` in order to determine // if they are still online. // // only add the new peer if one of the pinged nodes does not respond, this // prevents DoS flooding with new invalid contacts. - this.safeDispatchEvent('ping', { - detail: { - oldContacts: bucket.peers.slice(0, this.numberOfNodesToPing), - newContact: peer - } - }) + const toPing = bucket.peers + .filter(peer => { + if (peer.peerId.equals(this.localPeer?.peerId)) { + return false + } + + if (peer.lastPing > (Date.now() - this.lastPingThreshold)) { + return false + } + + return true + }) + .sort((a, b) => { + // sort oldest ping -> newest + if (a.lastPing < b.lastPing) { + return -1 + } + + if (a.lastPing > b.lastPing) { + return 1 + } + + return 0 + }) + .slice(0, this.numberOfNodesToPing) + + let evicted = false + + for await (const toEvict of this.ping(toPing, options)) { + evicted = true + await this.remove(toEvict.kadId) + } + + // did not evict any peers, cannot add new contact + if (!evicted) { + return + } + + // try again - buckets may have changed during ping + await this._add(peer, options) } /** @@ -235,7 +344,7 @@ export class KBucket extends TypedEventEmitter<KBucketEvents> { * which branch of the tree to traverse and repeat. * * @param {Uint8Array} kadId - The ID of the contact to fetch.
- * @returns {object | undefined} The contact if available, otherwise null + * @returns {Peer | undefined} The contact if available, otherwise undefined */ get (kadId: Uint8Array): Peer | undefined { const bucket = this._determineBucket(kadId) @@ -249,15 +358,14 @@ export class KBucket extends TypedEventEmitter<KBucketEvents> { * * @param {Uint8Array} kadId - The ID of the contact to remove */ - remove (kadId: Uint8Array): void { + async remove (kadId: Uint8Array): Promise<void> { const bucket = this._determineBucket(kadId) const index = this._indexOf(bucket, kadId) if (index > -1) { const peer = bucket.peers.splice(index, 1)[0] - this.safeDispatchEvent('removed', { - detail: peer - }) + + await this.onRemove?.(peer, bucket) } } @@ -331,7 +439,7 @@ export class KBucket extends TypedEventEmitter<KBucketEvents> { * @returns {number} Integer Index of contact with provided id if it exists, -1 otherwise. */ private _indexOf (bucket: LeafBucket, kadId: Uint8Array): number { - return bucket.peers.findIndex(peer => arrayEquals(peer.kadId, kadId)) + return bucket.peers.findIndex(peer => uint8ArrayEquals(peer.kadId, kadId)) } /** @@ -339,8 +447,8 @@ export class KBucket extends TypedEventEmitter<KBucketEvents> { * * @param {any} bucket - bucket for splitting */ - private _split (bucket: LeafBucket): void { - const depth = bucket.depth + 1 + private async _split (bucket: LeafBucket): Promise<void> { + const depth = bucket.prefix === '' ? bucket.depth : bucket.depth + 1 // create child buckets const left: LeafBucket = { @@ -354,23 +462,44 @@ export class KBucket extends TypedEventEmitter<KBucketEvents> { peers: [] } + if (bucket.containsSelf === true && this.localPeer != null) { + delete bucket.containsSelf + const selfNodeBitString = uint8ArrayToString(this.localPeer.kadId, 'base2') + + if (selfNodeBitString[depth] === '0') { + left.containsSelf = true + } else { + right.containsSelf = true + } + } + // redistribute peers for (const peer of bucket.peers) { const bitString = uint8ArrayToString(peer.kadId, 'base2') if (bitString[depth] === '0') { left.peers.push(peer) + await this.onMove?.(peer, bucket, left) } else { right.peers.push(peer) + await this.onMove?.(peer, bucket, right) } } - // convert leaf bucket to internal bucket - // @ts-expect-error peers is not a property of LeafBucket - delete bucket.peers - // @ts-expect-error left is not a property of LeafBucket - bucket.left = left - // @ts-expect-error right is not a property of LeafBucket - bucket.right = right + // convert old leaf bucket to internal bucket + convertToInternalBucket(bucket, left, right) } } + +function convertToInternalBucket (bucket: any, left: any, right: any): bucket is InternalBucket { + delete bucket.peers + delete bucket.containsSelf + bucket.left = left + bucket.right = right + + return true +} + +function needsPing (peer: Peer, threshold: number): boolean { + return peer.lastPing < (Date.now() - threshold) +}
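The split rule is easiest to see at the bit level: a contact's kadId is rendered as a base2 string and the bit at the bucket's depth picks the child bucket. A small worked sketch of the same rule the redistribution loop above applies (`childFor` is a hypothetical helper, not part of this diff):

```ts
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'

// returns 'left' when the bit at `depth` is 0, 'right' when it is 1
function childFor (kadId: Uint8Array, depth: number): 'left' | 'right' {
  const bits = uint8ArrayToString(kadId, 'base2')
  return bits[depth] === '0' ? 'left' : 'right'
}

// e.g. a kadId whose bits start 00010... stays in the left child at
// depths 0-2 and only moves right at depth 3 - compare the trie
// asserted in the routing table spec below
```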
diff --git a/packages/kad-dht/src/routing-table/refresh.ts b/packages/kad-dht/src/routing-table/refresh.ts index 717e9b3e2e..a59ac67af7 100644 --- a/packages/kad-dht/src/routing-table/refresh.ts +++ b/packages/kad-dht/src/routing-table/refresh.ts @@ -169,6 +169,10 @@ export class RoutingTableRefresh { throw new Error('Routing table not started') } + if (this.routingTable.kb.localPeer == null) { + throw new Error('Local peer not set') + } + const randomData = randomBytes(2) const randomUint16 = (randomData[1] << 8) + randomData[0] @@ -245,7 +249,7 @@ export class RoutingTableRefresh { * Yields the common prefix length of every peer in the table */ * _prefixLengths (): Generator<number> { - if (this.routingTable.kb == null) { + if (this.routingTable.kb?.localPeer == null) { return } diff --git a/packages/kad-dht/src/rpc/index.ts b/packages/kad-dht/src/rpc/index.ts index 72b40201d7..9c119349e7 100644 --- a/packages/kad-dht/src/rpc/index.ts +++ b/packages/kad-dht/src/rpc/index.ts @@ -63,12 +63,6 @@ export class RPC { * Process incoming DHT messages */ async handleMessage (peerId: PeerId, msg: Message): Promise<Message | undefined> { - try { - await this.routingTable.add(peerId) - } catch (err: any) { - this.log.error('Failed to update the kbucket store', err) - } - // get handler & execute it const handler = this.handlers[msg.type] @@ -94,6 +88,8 @@ export class RPC { * Handle incoming streams on the dht protocol */ onIncomingStream (data: IncomingStreamData): void { + let message = 'unknown' + Promise.resolve().then(async () => { const { stream, connection } = data const peerId = connection.remotePeer @@ -113,6 +109,7 @@ export class RPC { for await (const msg of source) { // handle the message const desMessage = Message.decode(msg) + message = desMessage.type self.log('incoming %s from %p', desMessage.type, peerId) const res = await self.handleMessage(peerId, desMessage) @@ -127,7 +124,7 @@ export class RPC { ) }) .catch(err => { - this.log.error(err) + this.log.error('error handling %s RPC message from %p - %e', message, data.connection.remotePeer, err) }) } } diff --git a/packages/kad-dht/test/generate-peers/generate-peers.node.ts b/packages/kad-dht/test/generate-peers/generate-peers.node.ts index 8cc9354ae1..35902c4640 100644 --- a/packages/kad-dht/test/generate-peers/generate-peers.node.ts +++ b/packages/kad-dht/test/generate-peers/generate-peers.node.ts @@ -14,6 +14,7 @@ import { RoutingTableRefresh } from '../../src/routing-table/refresh.js' import { convertPeerId } from '../../src/utils.js' +import type { Network } from '../../src/network.js' import type { PeerStore } from '@libp2p/interface' import type { ConnectionManager } from '@libp2p/interface-internal' @@ -64,7 +65,8 @@ describe.skip('generate peers', function () { const table = new RoutingTable(components, { kBucketSize: 20, logPrefix: '', - protocol: '/ipfs/kad/1.0.0' + protocol: '/ipfs/kad/1.0.0', + network: stubInterface<Network>() }) refresh = new RoutingTableRefresh({ logger: defaultLogger() }) diff --git a/packages/kad-dht/test/kad-dht.spec.ts b/packages/kad-dht/test/kad-dht.spec.ts index c427d97fd1..661f0d145e 100644 --- a/packages/kad-dht/test/kad-dht.spec.ts +++ b/packages/kad-dht/test/kad-dht.spec.ts @@ -779,8 +779,8 @@ describe('KadDHT', () => { // The expected closest kValue peers to the key const exp = actualClosest.slice(0, c.K) - // Expect the kValue peers found to include the kValue closest connected peers - // to the key + // Expect the kValue peers found to include the kValue closest connected + // peers to the key expect(countDiffPeers(out, exp)).to.equal(0) }) @@ -838,7 +838,7 @@ describe('KadDHT', () => { }) describe('errors', () => { - it('get should handle correctly an unexpected error', async function () { + it('get should correctly handle an unexpected error', async function () { this.timeout(240 * 1000) const error = new Error('fake error') diff --git a/packages/kad-dht/test/libp2p-routing.spec.ts b/packages/kad-dht/test/libp2p-routing.spec.ts index d2fc8b49df..d05cf7a299 100644 --- a/packages/kad-dht/test/libp2p-routing.spec.ts +++ b/packages/kad-dht/test/libp2p-routing.spec.ts @@ -115,6 +115,9 @@ describe('content routing', () => { allowQueryWithZeroPeers: true })(components) + // @ts-expect-error not part of public
api + dht.routingTable.kb.verify = async () => true + await start(dht) // @ts-expect-error cannot use symbol to index KadDHT type @@ -247,6 +250,9 @@ describe('peer routing', () => { await start(dht) + // @ts-expect-error not part of public api + dht.routingTable.kb.verify = async () => true + // @ts-expect-error cannot use symbol to index KadDHT type peerRouting = dht[peerRoutingSymbol] }) diff --git a/packages/kad-dht/test/routing-table.spec.ts b/packages/kad-dht/test/routing-table.spec.ts index 34de652937..180e746580 100644 --- a/packages/kad-dht/test/routing-table.spec.ts +++ b/packages/kad-dht/test/routing-table.spec.ts @@ -2,51 +2,43 @@ import { generateKeyPair } from '@libp2p/crypto/keys' import { TypedEventEmitter, stop, start, KEEP_ALIVE } from '@libp2p/interface' -import { mockConnectionManager } from '@libp2p/interface-compliance-tests/mocks' import { defaultLogger } from '@libp2p/logger' -import { PeerSet } from '@libp2p/peer-collections' import { peerIdFromString, peerIdFromPrivateKey } from '@libp2p/peer-id' import { persistentPeerStore } from '@libp2p/peer-store' import { expect } from 'aegir/chai' import { MemoryDatastore } from 'datastore-core' -import all from 'it-all' +import delay from 'delay' import drain from 'it-drain' -import { pipe } from 'it-pipe' import random from 'lodash.random' import { pEvent } from 'p-event' -import pWaitFor from 'p-wait-for' -import sinon from 'sinon' -import { stubInterface } from 'sinon-ts' -import { Uint8ArrayList } from 'uint8arraylist' +import { stubInterface, type StubbedInstance } from 'sinon-ts' import { PROTOCOL } from '../src/constants.js' -import { Message, MessageType } from '../src/message/dht.js' -import { KAD_CLOSE_TAG_NAME, KAD_CLOSE_TAG_VALUE, KBUCKET_SIZE, RoutingTable, type RoutingTableComponents } from '../src/routing-table/index.js' +import { MessageType } from '../src/message/dht.js' +import { peerResponseEvent } from '../src/query/events.js' +import { KAD_CLOSE_TAG_NAME, KAD_PEER_TAG_NAME, KAD_PEER_TAG_VALUE, RoutingTable, type RoutingTableComponents } from '../src/routing-table/index.js' +import { isLeafBucket } from '../src/routing-table/k-bucket.js' import * as kadUtils from '../src/utils.js' import { createPeerId, createPeerIds } from './utils/create-peer-id.js' -import { sortClosestPeers } from './utils/sort-closest-peers.js' -import type { Libp2pEvents, PeerId, PeerStore, Stream, Peer } from '@libp2p/interface' -import type { ConnectionManager, Registrar } from '@libp2p/interface-internal' +import type { Network } from '../src/network.js' +import type { Bucket } from '../src/routing-table/k-bucket.js' +import type { Libp2pEvents, PeerId, PeerStore, Peer } from '@libp2p/interface' describe('Routing Table', () => { let table: RoutingTable let components: RoutingTableComponents + let network: StubbedInstance<Network> beforeEach(async function () { this.timeout(20 * 1000) const events = new TypedEventEmitter<Libp2pEvents>() + network = stubInterface<Network>() components = { peerId: await createPeerId(), - connectionManager: stubInterface<ConnectionManager>(), peerStore: stubInterface<PeerStore>(), logger: defaultLogger() } - components.connectionManager = mockConnectionManager({ ...components, registrar: stubInterface<Registrar>(), events }) components.peerStore = persistentPeerStore({ ...components, datastore: new MemoryDatastore(), @@ -55,16 +47,147 @@ describe('Routing Table', () => { table = new RoutingTable(components, { logPrefix: '', - protocol: PROTOCOL + protocol: PROTOCOL, + network }) await start(table) + + // simulate connection succeeding +
network.sendRequest.callsFake(async function * (from: PeerId) { + yield peerResponseEvent({ + from, + messageType: MessageType.PING + }) + }) }) afterEach(async () => { await stop(table) }) - it('add', async function () { + it('adds peers', async () => { + // make a very small routing table with a predictable structure + table = new RoutingTable({ + ...components, + // self peer kad id prefix is 00010 + peerId: peerIdFromString('12D3KooWNq99a7DtUgvzyiHwvBX4m7TDLmn6nLZvJUzSt72wc1Zu') + }, { + logPrefix: '', + protocol: PROTOCOL, + kBucketSize: 2, + prefixLength: 3, + network + }) + await start(table) + + const peerIds = [ + peerIdFromString('QmYobx1VAHP7Mi88LcDvLeQoWcc1Aa2rynYHpdEPBqHZi3'), // 00010 + peerIdFromString('QmYobx1VAHP7Mi88LcDvLeQoWcc1Aa2rynYHpdEPBqHZi7'), // 00011 + peerIdFromString('QmYobx1VAHP7Mi88LcDvLeQoWcc1Aa2rynYHpdEPBqHZiA'), // 00111 + peerIdFromString('QmYobx1VAHP7Mi88LcDvLeQoWcc1Aa2rynYHpdEPBqHZiB'), // 01000 + peerIdFromString('QmYobx1VAHP7Mi88LcDvLeQoWcc1Aa2rynYHpdEPBqHZiC'), // 11111 + peerIdFromString('QmYobx1VAHP7Mi88LcDvLeQoWcc1Aa2rynYHpdEPBqHZiD'), // 11110 + peerIdFromString('QmYobx1VAHP7Mi88LcDvLeQoWcc1Aa2rynYHpdEPBqHZiE'), // 10111 + peerIdFromString('QmYobx1VAHP7Mi88LcDvLeQoWcc1Aa2rynYHpdEPBqHZib') // 11001 + ] + + for (const peerId of peerIds) { + await table.add(peerId) + } + + const trie = collect(table.kb.root) + + expect(trie).to.deep.equal({ + prefix: '', + depth: 0, + left: { + prefix: '0', + depth: 0, + left: { + prefix: '0', + depth: 1, + left: { + prefix: '0', + depth: 2, + peers: [ + 'QmYobx1VAHP7Mi88LcDvLeQoWcc1Aa2rynYHpdEPBqHZi3', // 00010 + 'QmYobx1VAHP7Mi88LcDvLeQoWcc1Aa2rynYHpdEPBqHZi7' // 00011 + ], + containsSelf: true + }, + right: { + prefix: '1', + depth: 2, + peers: [ + 'QmYobx1VAHP7Mi88LcDvLeQoWcc1Aa2rynYHpdEPBqHZiA' // 00111 + ], + containsSelf: false + } + }, + right: { + prefix: '1', + depth: 1, + peers: [ + 'QmYobx1VAHP7Mi88LcDvLeQoWcc1Aa2rynYHpdEPBqHZiB' // 01000 + ], + containsSelf: false + } + }, + right: { + prefix: '1', + depth: 0, + left: { + prefix: '0', + depth: 1, + peers: [ + 'QmYobx1VAHP7Mi88LcDvLeQoWcc1Aa2rynYHpdEPBqHZiE' // 10111 + ], + containsSelf: false + }, + right: { + prefix: '1', + depth: 1, + left: { + prefix: '0', + depth: 2, + peers: [ + 'QmYobx1VAHP7Mi88LcDvLeQoWcc1Aa2rynYHpdEPBqHZib' // 11001 + ], + containsSelf: false + }, + right: { + prefix: '1', + depth: 2, + peers: [ + 'QmYobx1VAHP7Mi88LcDvLeQoWcc1Aa2rynYHpdEPBqHZiC', // 11111 + 'QmYobx1VAHP7Mi88LcDvLeQoWcc1Aa2rynYHpdEPBqHZiD' // 11110 + ], + containsSelf: false + } + } + } + }) + + function collect (bucket: Bucket, obj: any = {}): any { + if (isLeafBucket(bucket)) { + return { + prefix: bucket.prefix, + depth: bucket.depth, + peers: bucket.peers.map(p => p.peerId.toString()), + containsSelf: Boolean(bucket.containsSelf) + } + } else { + obj.prefix = bucket.prefix + obj.depth = bucket.depth + obj.left = collect(bucket.left, {}) + obj.right = collect(bucket.right, {}) + } + + return obj + } + }) + + it('should add a lot of duplicated peers', async function () { this.timeout(20 * 1000) const ids = await createPeerIds(20) @@ -84,6 +207,74 @@ describe('Routing Table', () => { ) }) + it('should tag peers on add', async function () { + const peerCount = 100 + const ids = await createPeerIds(peerCount) + + for (const id of ids) { + await table.add(id) + } + + expect(table.size).to.equal(peerCount) + + // assert peers are tagged + const walked = await assertPeerTags(table.kb.root) + + expect(walked).to.equal(peerCount) + + async function assertPeerTags 
(bucket: Bucket): Promise<number> { + let peers = 0 + + if (isLeafBucket(bucket)) { + for (const contact of bucket.peers) { + peers++ + + const peer = await components.peerStore.get(contact.peerId) + const tags = [...peer.tags.keys()] + + expect(tags).to.contain(KAD_PEER_TAG_NAME) + + if (bucket.containsSelf === true) { + expect(tags).to.contain(KAD_CLOSE_TAG_NAME) + expect(tags).to.contain(KEEP_ALIVE) + } else { + expect(tags).to.not.contain(KAD_CLOSE_TAG_NAME) + expect(tags).to.not.contain(KEEP_ALIVE) + } + } + } else { + if (bucket.left != null) { + peers += await assertPeerTags(bucket.left) + } + + if (bucket.right != null) { + peers += await assertPeerTags(bucket.right) + } + } + + return peers + } + }) + + it('should untag peers on remove', async function () { + const peerCount = 100 + const ids = await createPeerIds(peerCount) + + for (const id of ids) { + await table.add(id) + } + + const removePeer = ids[0] + await table.remove(removePeer) + + const peer = await components.peerStore.get(removePeer) + const tags = [...peer.tags.keys()] + + expect(tags).to.not.contain(KAD_PEER_TAG_NAME) + expect(tags).to.not.contain(KAD_CLOSE_TAG_NAME) + expect(tags).to.not.contain(KEEP_ALIVE) + }) + it('emits peer:add event', async () => { const id = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) const eventPromise = pEvent<'peer:add', CustomEvent<PeerId>>(table, 'peer:add') @@ -96,7 +287,9 @@ it('remove', async function () { const peers = await createPeerIds(10) - await Promise.all(peers.map(async (peer) => { await table.add(peer) })) + await Promise.all(peers.map(async (peer) => { + await table.add(peer) + })) const key = await kadUtils.convertPeerId(peers[2]) expect(table.closestPeers(key, 10)).to.have.length(10) @@ -148,45 +341,22 @@ const oldPeer = { kadId: await kadUtils.convertPeerId(peerIds[0]), - peerId: peerIds[0] + peerId: peerIds[0], + lastPing: 0 } const newPeer = { kadId: await kadUtils.convertPeerId(peerIds[1]), - peerId: peerIds[1] - } - - if (table.kb == null) { - throw new Error('kbucket not defined') + peerId: peerIds[1], + lastPing: Date.now() } // add the old peer - table.kb.add(oldPeer) - - const stream = stubInterface<Stream>({ - source: (async function * () { - yield new Uint8ArrayList(Uint8Array.from([2]), Message.encode({ - type: MessageType.PING - })) - })(), - sink: async function (source) { - await drain(source) - } - }) - - // simulate connection succeeding - const newStreamStub = sinon.stub().withArgs(PROTOCOL).resolves(stream) - const openConnectionStub = sinon.stub().withArgs(oldPeer.peerId).resolves({ - newStream: newStreamStub - }) - components.connectionManager.openConnection = openConnectionStub - - await table._onPing(new CustomEvent('ping', { detail: { oldContacts: [oldPeer], newContact: newPeer } })) + await table.kb.add(oldPeer.peerId) - expect(openConnectionStub.calledOnce).to.be.true() - expect(openConnectionStub.calledWith(oldPeer.peerId)).to.be.true() + await drain(table.pingOldContacts([oldPeer])) - expect(newStreamStub.callCount).to.equal(1) - expect(newStreamStub.calledWith(PROTOCOL)).to.be.true() + expect(network.sendRequest.calledTwice).to.be.true() + expect(network.sendRequest.calledWith(oldPeer.peerId)).to.be.true() // did not add the new peer expect(table.kb.get(newPeer.kadId)).to.be.undefined() @@ -196,36 +366,62 @@ }) it('evicts oldest peer that does not respond to ping', async () => { + // make a very small routing table with a predictable structure + table =
+    table = new RoutingTable({
+      ...components,
+      peerId: peerIdFromString('12D3KooWNq99a7DtUgvzyiHwvBX4m7TDLmn6nLZvJUzSt72wc1Zu')
+    }, {
+      logPrefix: '',
+      protocol: PROTOCOL,
+      kBucketSize: 1,
+      prefixLength: 1,
+      network,
+      lastPingThreshold: 1
+    })
+    await start(table)
+
     const peerIds = [
-      peerIdFromString('QmYobx1VAHP7Mi88LcDvLeQoWcc1Aa2rynYHpdEPBqHZi5'),
-      peerIdFromString('QmYobx1VAHP7Mi88LcDvLeQoWcc1Aa2rynYHpdEPBqHZi6')
+      peerIdFromString('QmYobx1VAHP7Mi88LcDvLeQoWcc1Aa2rynYHpdEPBqHZi1'),
+      peerIdFromString('QmYobx1VAHP7Mi88LcDvLeQoWcc1Aa2rynYHpdEPBqHZi2')
     ]
 
+    for (const peerId of peerIds) {
+      await table.add(peerId)
+    }
+
+    const newPeerId = peerIdFromString('QmYobx1VAHP7Mi88LcDvLeQoWcc1Aa2rynYHpdEPBqHZi5')
+
     const oldPeer = {
       kadId: await kadUtils.convertPeerId(peerIds[0]),
       peerId: peerIds[0]
     }
     const newPeer = {
-      kadId: await kadUtils.convertPeerId(peerIds[1]),
-      peerId: peerIds[1]
+      kadId: await kadUtils.convertPeerId(newPeerId),
+      peerId: newPeerId
     }
 
+    // ensure the lastPing threshold is passed
+    await delay(100)
+
+    // reset network stub so we can have specific behaviour
+    table.network = network = stubInterface<Network>()
+
     // libp2p fails to dial the old peer
-    const openConnectionStub = sinon.stub().withArgs(oldPeer.peerId).rejects(new Error('Could not dial peer'))
-    components.connectionManager.openConnection = openConnectionStub
+    network.sendRequest.withArgs(oldPeer.peerId).rejects(new Error('Could not dial peer'))
 
-    if (table.kb == null) {
-      throw new Error('kbucket not defined')
-    }
+    // the new peer answers the ping
+    network.sendRequest.withArgs(newPeer.peerId).callsFake(async function * (from: PeerId) {
+      yield peerResponseEvent({
+        from,
+        messageType: MessageType.PING
+      })
+    })
 
     // add the old peer
-    table.kb.add(oldPeer)
-
-    await table._onPing(new CustomEvent('ping', { detail: { oldContacts: [oldPeer], newContact: newPeer } }))
-    await table.pingQueue.onIdle()
+    await table.kb.add(oldPeer.peerId)
 
-    expect(openConnectionStub.callCount).to.equal(1)
-    expect(openConnectionStub.calledWith(oldPeer.peerId)).to.be.true()
+    // add the new peer
+    await table.kb.add(newPeer.peerId)
 
     // added the new peer
     expect(table.kb.get(newPeer.kadId)).to.not.be.undefined()
@@ -235,117 +431,72 @@
   })
 
   it('tags newly found kad-close peers', async () => {
-    const remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519'))
-    const tagPeerSpy = sinon.spy(components.peerStore, 'merge')
-
-    await table.add(remotePeer)
-
-    expect(tagPeerSpy.callCount).to.equal(0, 'did not debounce call to peerStore.tagPeer')
-
-    await pWaitFor(() => {
-      return tagPeerSpy.callCount === 1
-    })
-
-    expect(tagPeerSpy.callCount).to.equal(1, 'did not tag kad-close peer')
-    expect(tagPeerSpy.getCall(0).args[0].toString()).to.equal(remotePeer.toString())
-    expect(tagPeerSpy.getCall(0).args[1].tags).to.deep.equal({
-      [KAD_CLOSE_TAG_NAME]: {
-        value: KAD_CLOSE_TAG_VALUE
-      },
-      [KEEP_ALIVE]: {
-        value: 1
-      }
+    // make a very small routing table with a predictable structure
+    table = new RoutingTable({
+      ...components,
+      // self peer kad id prefix is 00010
+      peerId: peerIdFromString('12D3KooWNq99a7DtUgvzyiHwvBX4m7TDLmn6nLZvJUzSt72wc1Zu')
+    }, {
+      logPrefix: '',
+      protocol: PROTOCOL,
+      kBucketSize: 2,
+      prefixLength: 2,
+      network
     })
-  })
-
-  it('removes tags from kad-close peers when closer peers are found', async () => {
-    async function getTaggedPeers (): Promise<PeerSet> {
-      return new PeerSet(await pipe(
-        await components.peerStore.all(),
-        async function * (source) {
-          for await (const peer of source) {
-            const peerData = await components.peerStore.get(peer.id)
-
-            if (peerData.tags.has(KAD_CLOSE_TAG_NAME)) {
-              yield peer.id
-            }
-          }
-        },
-        async (source) => all(source)
-      ))
-    }
-
-    const tagPeerSpy = sinon.spy(components.peerStore, 'merge')
-    const localNodeId = await kadUtils.convertPeerId(components.peerId)
-    const sortedPeerList = await sortClosestPeers(
-      await createPeerIds(KBUCKET_SIZE + 1),
-      localNodeId
-    )
+    await start(table)
 
-    // sort list furthest -> closest
-    sortedPeerList.reverse()
+    const peerIds = [
+      peerIdFromString('QmYobx1VAHP7Mi88LcDvLeQoWcc1Aa2rynYHpdEPBqHZi3'), // 00010
+      peerIdFromString('QmYobx1VAHP7Mi88LcDvLeQoWcc1Aa2rynYHpdEPBqHZiB') // 01000
+    ]
 
-    // fill the table up to the first kbucket size
-    for (let i = 0; i < KBUCKET_SIZE; i++) {
-      await table.add(sortedPeerList[i])
+    for (const peerId of peerIds) {
+      await table.add(peerId)
     }
 
-    // should have all added contacts in the root kbucket
-    expect(table.kb?.count()).to.equal(KBUCKET_SIZE, 'did not fill kbuckets')
-    expect(table.kb?.root).to.have.property('peers').with.lengthOf(KBUCKET_SIZE, 'split root kbucket when we should not have')
-    expect(table.kb?.root).to.not.have.property('left', 'split root kbucket when we should not have')
-    expect(table.kb?.root).to.not.have.property('right', 'split root kbucket when we should not have')
-
-    await pWaitFor(() => {
-      return tagPeerSpy.callCount === KBUCKET_SIZE
-    })
+    // current close peer should be marked close
+    const closePeerData = await components.peerStore.get(peerIds[1])
+    expect(closePeerData.tags.has(KAD_CLOSE_TAG_NAME)).to.be.true()
+    expect(closePeerData.tags.has(KEEP_ALIVE)).to.be.true()
+    expect(closePeerData.tags.has(KAD_PEER_TAG_NAME)).to.be.true()
 
-    // make sure we tagged all of the peers as kad-close
-    const taggedPeers = await getTaggedPeers()
-    expect(taggedPeers.difference(new PeerSet(sortedPeerList.slice(0, sortedPeerList.length - 1)))).to.have.property('size', 0)
-    tagPeerSpy.resetHistory()
+    const newPeer = peerIdFromString('QmYobx1VAHP7Mi88LcDvLeQoWcc1Aa2rynYHpdEPBqHZiA') // 00111
 
-    // add a node that is closer than any added so far
-    await table.add(sortedPeerList[sortedPeerList.length - 1])
+    await table.add(newPeer)
 
-    expect(table.kb?.count()).to.equal(KBUCKET_SIZE + 1, 'did not fill kbuckets')
-    expect(table.kb?.root).to.have.property('left').that.is.not.null('did not split root kbucket when we should have')
-    expect(table.kb?.root).to.have.property('right').that.is.not.null('did not split root kbucket when we should have')
+    // new peer should be marked close
+    const newPeerData = await components.peerStore.get(newPeer)
+    expect(newPeerData.tags.has(KAD_CLOSE_TAG_NAME)).to.be.true()
+    expect(newPeerData.tags.has(KEEP_ALIVE)).to.be.true()
+    expect(newPeerData.tags.has(KAD_PEER_TAG_NAME)).to.be.true()
 
-    // wait for tag new peer and untag old peer
-    await pWaitFor(() => {
-      return tagPeerSpy.callCount === 2
-    })
-
-    // should have updated list of tagged peers
-    const finalTaggedPeers = await getTaggedPeers()
-    expect(finalTaggedPeers.difference(new PeerSet(sortedPeerList.slice(1)))).to.have.property('size', 0)
+    // no longer close, but not evicted from the table because it wasn't full yet
+    const movedPeerData = await components.peerStore.get(peerIds[1])
+    expect(movedPeerData.tags.has(KAD_CLOSE_TAG_NAME)).to.be.false()
+    expect(movedPeerData.tags.has(KEEP_ALIVE)).to.be.false()
+    expect(movedPeerData.tags.has(KAD_PEER_TAG_NAME)).to.be.true()
   })
 
   it('adds peerstore peers to the routing table on startup', async () => {
-    const peer1 = stubInterface<Peer>({
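+    // a peer that speaks the DHT protocol and already carries the kad-peer
+    // tag - presumably only previously-tagged peers are restored on startup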
+    const peer = stubInterface<Peer>({
       id: peerIdFromPrivateKey(await generateKeyPair('Ed25519')),
       protocols: [
         PROTOCOL
-      ]
-    })
-    const peer2 = stubInterface<Peer>({
-      id: peerIdFromPrivateKey(await generateKeyPair('Ed25519')),
-      protocols: [
-        '/ipfs/id/1.0.0'
-      ]
+      ],
+      tags: new Map([[KAD_PEER_TAG_NAME, { value: KAD_PEER_TAG_VALUE }]])
     })
 
-    await expect(table.find(peer1.id)).to.eventually.be.undefined()
-    await expect(table.find(peer2.id)).to.eventually.be.undefined()
+    await expect(table.find(peer.id)).to.eventually.be.undefined()
 
    await stop(table)
 
-    components.peerStore.all = async () => [peer1, peer2]
+    components.peerStore.all = async () => [peer]
 
     await start(table)
 
-    await expect(table.find(peer1.id)).to.eventually.be.ok()
-    await expect(table.find(peer2.id)).to.eventually.be.undefined()
+    // peers are added from the peer store asynchronously, so wait for the event
+    await pEvent(table, 'peer:add')
+
+    await expect(table.find(peer.id)).to.eventually.be.ok()
   })
 })
diff --git a/packages/kad-dht/test/utils/test-dht.ts b/packages/kad-dht/test/utils/test-dht.ts
index dff13bd89f..349cfdb8ea 100644
--- a/packages/kad-dht/test/utils/test-dht.ts
+++ b/packages/kad-dht/test/utils/test-dht.ts
@@ -86,6 +86,9 @@ export class TestDHT {
 
     const dht = new KadDHTClass(components, opts)
 
+    // skip contact verification so test peers are added to the routing table immediately
+    dht.routingTable.kb.verify = async () => true
+
     // simulate libp2p._onDiscoveryPeer
     dht.addEventListener('peer', (evt) => {
       const peerData = evt.detail