Skip to content

Commit

Permalink
fix: tag kad-close peers with keepalive (#2740)
Browse files Browse the repository at this point in the history
This will ensure we are always connected to peers in our root kad bucket.

Also passes the complete set of options to the routing table to allow constraining memory usage from the constructor.
  • Loading branch information
achingbrain authored Sep 30, 2024
1 parent 4a14d4a commit 12bcd86
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 15 deletions.
8 changes: 3 additions & 5 deletions packages/kad-dht/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -397,12 +397,10 @@ export interface KadDHTInit {
logPrefix?: string

/**
* How long to wait in ms when pinging DHT peers to decide if they
* should be evicted from the routing table or not.
*
* @default 10000
* Settings for how long to wait in ms when pinging DHT peers to decide if
* they should be evicted from the routing table or not.
*/
pingTimeout?: number
pingTimeout?: Omit<AdaptiveTimeoutInit, 'metricsName' | 'metrics'>

/**
* How many peers to ping in parallel when deciding if they should
Expand Down
4 changes: 3 additions & 1 deletion packages/kad-dht/src/kad-dht.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ export class KadDHT extends TypedEventEmitter<PeerDiscoveryEvents> implements Ka
pingTimeout,
pingConcurrency,
protocol: this.protocol,
logPrefix: loggingPrefix
logPrefix: loggingPrefix,
prefixLength: init.prefixLength,
splitThreshold: init.kBucketSplitThreshold
})

this.providers = new Providers(components, providersInit ?? {})
Expand Down
28 changes: 20 additions & 8 deletions packages/kad-dht/src/routing-table/index.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,29 @@
import { InvalidMessageError, TypedEventEmitter } from '@libp2p/interface'
import { InvalidMessageError, KEEP_ALIVE, TypedEventEmitter } from '@libp2p/interface'
import { PeerSet } from '@libp2p/peer-collections'
import { AdaptiveTimeout } from '@libp2p/utils/adaptive-timeout'
import { PeerQueue } from '@libp2p/utils/peer-queue'
import { pbStream } from 'it-protobuf-stream'
import { Message, MessageType } from '../message/dht.js'
import * as utils from '../utils.js'
import { KBucket, isLeafBucket, type Bucket, type PingEventDetails } from './k-bucket.js'
import type { ComponentLogger, CounterGroup, Logger, Metric, Metrics, PeerId, PeerStore, Startable, Stream } from '@libp2p/interface'
import type { ConnectionManager } from '@libp2p/interface-internal'
import type { AdaptiveTimeoutInit } from '@libp2p/utils/adaptive-timeout'

export const KAD_CLOSE_TAG_NAME = 'kad-close'
export const KAD_CLOSE_TAG_VALUE = 50
export const KBUCKET_SIZE = 20
export const PREFIX_LENGTH = 32
export const PING_TIMEOUT = 10000
export const PING_CONCURRENCY = 10
export const PING_TIMEOUT = 2000
export const PING_CONCURRENCY = 20

export interface RoutingTableInit {
logPrefix: string
protocol: string
prefixLength?: number
splitThreshold?: number
kBucketSize?: number
pingTimeout?: number
pingTimeout?: AdaptiveTimeoutInit
pingConcurrency?: number
tagName?: string
tagValue?: number
Expand Down Expand Up @@ -53,7 +55,7 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
private readonly components: RoutingTableComponents
private readonly prefixLength: number
private readonly splitThreshold: number
private readonly pingTimeout: number
private readonly pingTimeout: AdaptiveTimeout
private readonly pingConcurrency: number
private running: boolean
private readonly protocol: string
Expand All @@ -73,7 +75,6 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
this.components = components
this.log = components.logger.forComponent(`${init.logPrefix}:routing-table`)
this.kBucketSize = init.kBucketSize ?? KBUCKET_SIZE
this.pingTimeout = init.pingTimeout ?? PING_TIMEOUT
this.pingConcurrency = init.pingConcurrency ?? PING_CONCURRENCY
this.running = false
this.protocol = init.protocol
Expand All @@ -90,6 +91,11 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
this.pingQueue.addEventListener('error', evt => {
this.log.error('error pinging peer', evt.detail)
})
this.pingTimeout = new AdaptiveTimeout({
...(init.pingTimeout ?? {}),
metrics: this.components.metrics,
metricName: `${init.logPrefix.replaceAll(':', '_')}_routing_table_ping_time_milliseconds`
})

if (this.components.metrics != null) {
this.metrics = {
Expand Down Expand Up @@ -177,6 +183,9 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
tags: {
[this.tagName]: {
value: this.tagValue
},
[KEEP_ALIVE]: {
value: 1
}
}
})
Expand All @@ -185,7 +194,8 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen
for (const peer of removedPeers) {
await this.components.peerStore.merge(peer, {
tags: {
[this.tagName]: undefined
[this.tagName]: undefined,
[KEEP_ALIVE]: undefined
}
})
}
Expand Down Expand Up @@ -242,10 +252,11 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen

return this.pingQueue.add(async () => {
let stream: Stream | undefined
const signal = this.pingTimeout.getTimeoutSignal()

try {
const options = {
signal: AbortSignal.timeout(this.pingTimeout)
signal
}

this.log('pinging old contact %p', oldContact.peerId)
Expand Down Expand Up @@ -278,6 +289,7 @@ export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implemen

return false
} finally {
this.pingTimeout.cleanUp(signal)
this.updateMetrics()
}
}, {
Expand Down
5 changes: 4 additions & 1 deletion packages/kad-dht/test/routing-table.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* eslint-env mocha */

import { generateKeyPair } from '@libp2p/crypto/keys'
import { TypedEventEmitter, stop, start } from '@libp2p/interface'
import { TypedEventEmitter, stop, start, KEEP_ALIVE } from '@libp2p/interface'
import { mockConnectionManager } from '@libp2p/interface-compliance-tests/mocks'
import { defaultLogger } from '@libp2p/logger'
import { PeerSet } from '@libp2p/peer-collections'
Expand Down Expand Up @@ -251,6 +251,9 @@ describe('Routing Table', () => {
expect(tagPeerSpy.getCall(0).args[1].tags).to.deep.equal({
[KAD_CLOSE_TAG_NAME]: {
value: KAD_CLOSE_TAG_VALUE
},
[KEEP_ALIVE]: {
value: 1
}
})
})
Expand Down

0 comments on commit 12bcd86

Please sign in to comment.