Skip to content

Commit

Permalink
feat!: ping peers before adding to routing table (#2745)
Browse files Browse the repository at this point in the history
Implements the [check-before-add](https://github.com/libp2p/go-libp2p-kad-dht/blob/master/optimizations.md#checking-before-adding)
client optimisation to ping a peer before adding it to the routing
table.

Adds a "new peer ping queue" to apply a concurrency limit to these
pings. Old contacts are expected to be less likely to be online, so
adding new contacts to unrelated buckets should not be blocked when
the connection to an old contact is timing out while it is being
pinged before eviction.

BREAKING CHANGE: the routing ping options have been split into "old contact" and "new contact" and renamed accordingly
  • Loading branch information
achingbrain authored Oct 3, 2024
1 parent 80e798c commit 661d658
Show file tree
Hide file tree
Showing 14 changed files with 927 additions and 472 deletions.
1 change: 0 additions & 1 deletion packages/kad-dht/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@
"lodash.random": "^3.2.0",
"lodash.range": "^3.2.0",
"p-retry": "^6.2.0",
"p-wait-for": "^5.0.2",
"protons": "^7.5.0",
"sinon": "^18.0.0",
"sinon-ts": "^2.0.0",
Expand Down
32 changes: 30 additions & 2 deletions packages/kad-dht/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -400,15 +400,43 @@ export interface KadDHTInit {
* Settings for how long to wait in ms when pinging DHT peers to decide if
* they should be evicted from the routing table or not.
*/
pingTimeout?: Omit<AdaptiveTimeoutInit, 'metricsName' | 'metrics'>
pingOldContactTimeout?: Omit<AdaptiveTimeoutInit, 'metricsName' | 'metrics'>

/**
* How many peers to ping in parallel when deciding if they should
* be evicted from the routing table or not
*
* @default 10
*/
pingConcurrency?: number
pingOldContactConcurrency?: number

/**
* How long the queue of peers waiting to be pinged before possible eviction
* from the routing table is allowed to grow
*
* @default 100
*/
pingOldContactMaxQueueSize?: number

/**
* Settings for how long to wait in ms when pinging DHT peers to decide if
* they should be added to the routing table or not.
*/
pingNewContactTimeout?: Omit<AdaptiveTimeoutInit, 'metricsName' | 'metrics'>

/**
* How many peers to ping in parallel when deciding if they should be added to
* the routing table or not
*
* @default 10
*/
pingNewContactConcurrency?: number

/**
* How long the queue of peers waiting to be pinged before being added to the
* routing table is allowed to grow
*
* @default 100
*/
pingNewContactMaxQueueSize?: number

/**
* How many parallel incoming streams to allow on the DHT protocol per
Expand Down
44 changes: 29 additions & 15 deletions packages/kad-dht/src/kad-dht.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,6 @@ export class KadDHT extends TypedEventEmitter<PeerDiscoveryEvents> implements Ka
querySelfInterval,
protocol,
logPrefix,
pingTimeout,
pingConcurrency,
maxInboundStreams,
maxOutboundStreams,
providers: providersInit
Expand All @@ -156,15 +154,6 @@ export class KadDHT extends TypedEventEmitter<PeerDiscoveryEvents> implements Ka
this.maxInboundStreams = maxInboundStreams ?? DEFAULT_MAX_INBOUND_STREAMS
this.maxOutboundStreams = maxOutboundStreams ?? DEFAULT_MAX_OUTBOUND_STREAMS
this.peerInfoMapper = init.peerInfoMapper ?? removePrivateAddressesMapper
this.routingTable = new RoutingTable(components, {
kBucketSize,
pingTimeout,
pingConcurrency,
protocol: this.protocol,
logPrefix: loggingPrefix,
prefixLength: init.prefixLength,
splitThreshold: init.kBucketSplitThreshold
})

this.providers = new Providers(components, providersInit ?? {})

Expand All @@ -181,6 +170,21 @@ export class KadDHT extends TypedEventEmitter<PeerDiscoveryEvents> implements Ka
logPrefix: loggingPrefix
})

this.routingTable = new RoutingTable(components, {
kBucketSize,
pingOldContactTimeout: init.pingOldContactTimeout,
pingOldContactConcurrency: init.pingOldContactConcurrency,
pingOldContactMaxQueueSize: init.pingOldContactMaxQueueSize,
pingNewContactTimeout: init.pingNewContactTimeout,
pingNewContactConcurrency: init.pingNewContactConcurrency,
pingNewContactMaxQueueSize: init.pingNewContactMaxQueueSize,
protocol: this.protocol,
logPrefix: loggingPrefix,
prefixLength: init.prefixLength,
splitThreshold: init.kBucketSplitThreshold,
network: this.network
})

// all queries should wait for the initial query-self query to run so we have
// some peers and don't force consumers to use arbitrary timeouts
const initialQuerySelfHasRun = pDefer<any>()
Expand Down Expand Up @@ -376,11 +380,17 @@ export class KadDHT extends TypedEventEmitter<PeerDiscoveryEvents> implements Ka

await this.components.registrar.unhandle(this.protocol)

// check again after async work
if (mode === this.getMode() && !force) {
this.log('already in %s mode', mode)
return
}

if (mode === 'client') {
this.log('enabling client mode')
this.log('enabling client mode while in %s mode', this.getMode())
this.clientMode = true
} else {
this.log('enabling server mode')
this.log('enabling server mode while in %s mode', this.getMode())
this.clientMode = false
await this.components.registrar.handle(this.protocol, this.rpc.onIncomingStream.bind(this.rpc), {
maxInboundStreams: this.maxInboundStreams,
Expand All @@ -399,14 +409,18 @@ export class KadDHT extends TypedEventEmitter<PeerDiscoveryEvents> implements Ka
await this.setMode(this.clientMode ? 'client' : 'server', true)

await start(
this.querySelf,
this.routingTable,
this.providers,
this.queryManager,
this.network,
this.routingTable,
this.topologyListener,
this.routingTableRefresh
)

// Query self after other components are configured
await start(
this.querySelf
)
}

/**
Expand Down
5 changes: 1 addition & 4 deletions packages/kad-dht/src/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ export class Network extends TypedEventEmitter<NetworkEvents> implements Startab
}

/**
* Send a request and record RTT for latency measurements
* Send a request and read a response
*/
async * sendRequest (to: PeerId, msg: Partial<Message>, options: RoutingOptions = {}): AsyncGenerator<QueryEvent> {
if (!this.running) {
Expand Down Expand Up @@ -204,7 +204,6 @@ export class Network extends TypedEventEmitter<NetworkEvents> implements Startab
async _writeMessage (stream: Stream, msg: Partial<Message>, options: AbortOptions): Promise<void> {
const pb = pbStream(stream)
await pb.write(msg, Message, options)
await pb.unwrap().close(options)
}

/**
Expand All @@ -219,8 +218,6 @@ export class Network extends TypedEventEmitter<NetworkEvents> implements Startab

const message = await pb.read(Message, options)

await pb.unwrap().close(options)

// tell any listeners about new peers we've seen
message.closer.forEach(peerData => {
this.safeDispatchEvent<PeerInfo>('peer', {
Expand Down
20 changes: 14 additions & 6 deletions packages/kad-dht/src/query-self.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,19 +106,27 @@ export class QuerySelf implements Startable {

if (this.started) {
this.controller = new AbortController()
const timeoutSignal = AbortSignal.timeout(this.queryTimeout)
const signal = anySignal([this.controller.signal, timeoutSignal])
const signals = [this.controller.signal]

// this controller will get used for lots of dial attempts so make sure we don't cause warnings to be logged
setMaxListeners(Infinity, signal, this.controller.signal, timeoutSignal)
// add a shorter timeout if we've already run our initial self query
if (this.initialQuerySelfHasRun == null) {
const timeoutSignal = AbortSignal.timeout(this.queryTimeout)
setMaxListeners(Infinity, timeoutSignal)
signals.push(timeoutSignal)
}

const signal = anySignal(signals)
setMaxListeners(Infinity, signal, this.controller.signal)

try {
if (this.routingTable.size === 0) {
this.log('routing table was empty, waiting for some peers before running query')
// wait to discover at least one DHT peer
// wait to discover at least one DHT peer that isn't us
await pEvent(this.routingTable, 'peer:add', {
signal
signal,
filter: (event) => !this.peerId.equals(event.detail)
})
this.log('routing table has peers, continuing with query')
}

this.log('run self-query, look for %d peers timing out after %dms', this.count, this.queryTimeout)
Expand Down
Loading

0 comments on commit 661d658

Please sign in to comment.