Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

feat: limit the number of cold calls we can do #316

Merged
merged 16 commits into from
Apr 2, 2019
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@ const sw = new switch(peerInfo , peerBook [, options])

If defined, `options` should be an object with the following keys and respective values:

- `blacklistTTL`: - number of ms a peer should not be dialable to after it errors. Defaults to `120000`(120 seconds)
- `maxParallelDials`: - number of concurrent dials the switch should allow. Defaults to `50`
- `blacklistTTL`: - number of ms a peer should not be dialable to after it errors. Each successive blacklisting will increase the ttl from the base value. Defaults to 5 minutes
- `blackListAttempts`: - number of blacklists before a peer
is permanently blacklisted. Defaults to 5.
- `maxParallelDials`: - number of concurrent dials the switch should allow. Defaults to `100`
- `maxColdCalls`: - number of queued cold calls that are allowed. Defaults to `50`
- `dialTimeout`: - number of ms a dial to a peer should be allowed to run. Defaults to `30000` (30 seconds)
- `stats`: an object with the following keys and respective values:
- `maxOldPeersRetention`: maximum old peers retention. For when peers disconnect and keeping the stats around in case they reconnect. Defaults to `100`.
Expand Down
7 changes: 5 additions & 2 deletions src/constants.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
'use strict'

module.exports = {
BLACK_LIST_TTL: 120e3, // How long before an errored peer can be dialed again
BLACK_LIST_TTL: 5 * 60 * 1e3, // How long before an errored peer can be dialed again
BLACK_LIST_ATTEMPTS: 5, // Num of unsuccessful dials before a peer is permanently blacklisted
DIAL_TIMEOUT: 30e3, // How long in ms a dial attempt is allowed to take
MAX_PARALLEL_DIALS: 50 // Maximum allowed concurrent dials
MAX_COLD_CALLS: 50, // How many dials w/o protocols that can be queued
MAX_PARALLEL_DIALS: 100, // Maximum allowed concurrent dials
ONE_HOUR: 1 * 60 * 60e3
}
9 changes: 8 additions & 1 deletion src/dialer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@

const DialQueueManager = require('./queueManager')
const getPeerInfo = require('../get-peer-info')
const { MAX_PARALLEL_DIALS, BLACK_LIST_TTL } = require('../constants')
const {
BLACK_LIST_ATTEMPTS,
BLACK_LIST_TTL,
MAX_COLD_CALLS,
MAX_PARALLEL_DIALS
} = require('../constants')

module.exports = function (_switch) {
const dialQueueManager = new DialQueueManager(_switch)
Expand Down Expand Up @@ -74,7 +79,9 @@ module.exports = function (_switch) {
dialFSM,
abort,
clearBlacklist,
BLACK_LIST_ATTEMPTS: isNaN(_switch._options.blackListAttempts) ? BLACK_LIST_ATTEMPTS : _switch._options.blackListAttempts,
BLACK_LIST_TTL: isNaN(_switch._options.blacklistTTL) ? BLACK_LIST_TTL : _switch._options.blacklistTTL,
MAX_COLD_CALLS: isNaN(_switch._options.maxColdCalls) ? MAX_COLD_CALLS : _switch._options.maxColdCalls,
MAX_PARALLEL_DIALS: isNaN(_switch._options.maxParallelDials) ? MAX_PARALLEL_DIALS : _switch._options.maxParallelDials
}
}
24 changes: 21 additions & 3 deletions src/dialer/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class Queue {
this.switch = _switch
this._queue = []
this.blackListed = null
this.blackListCount = 0
this.isRunning = false
this.onStopped = onStopped
}
Expand Down Expand Up @@ -97,7 +98,7 @@ class Queue {
isDialAllowed () {
if (this.blackListed) {
// If the blacklist ttl has passed, reset it
if (Date.now() - this.blackListed > this.switch.dialer.BLACK_LIST_TTL) {
if (Date.now() > this.blackListed) {
this.blackListed = null
return true
}
Expand Down Expand Up @@ -146,10 +147,24 @@ class Queue {

/**
* Marks the queue as blacklisted. The queue will be immediately aborted.
* @returns {void}
*/
blacklist () {
log('blacklisting queue for %s', this.id)
this.blackListed = Date.now()
this.blackListCount++

if (this.blackListCount >= this.switch.dialer.BLACK_LIST_ATTEMPTS) {
this.blackListed = Infinity
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
return
}

let ttl = this.switch.dialer.BLACK_LIST_TTL * Math.pow(this.blackListCount, 3)
const minTTL = ttl * 0.9
const maxTTL = ttl * 1.1

// Add a random jitter of 20% to the ttl
ttl = Math.floor(Math.random() * (maxTTL - minTTL) + minTTL)

this.blackListed = Date.now() + ttl
this.abort()
}

Expand Down Expand Up @@ -236,13 +251,16 @@ class Queue {

// If we're not muxed yet, add listeners
connectionFSM.once('muxed', () => {
this.blackListCount = 0 // reset blacklisting on good connections
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
this.switch.connection.add(connectionFSM)
queuedDial.connection = connectionFSM
createConnectionWithProtocol(queuedDial)
next()
})

connectionFSM.once('unmuxed', () => {
this.blackListCount = 0
this.switch.connection.add(connectionFSM)
queuedDial.connection = connectionFSM
createConnectionWithProtocol(queuedDial)
next()
Expand Down
76 changes: 66 additions & 10 deletions src/dialer/queueManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

const once = require('once')
const Queue = require('./queue')
const { DIAL_ABORTED } = require('../errors')
const nextTick = require('async/nextTick')
const retimer = require('retimer')
const { ONE_HOUR } = require('../constants')
const noop = () => {}

class DialQueueManager {
Expand All @@ -11,9 +15,28 @@ class DialQueueManager {
*/
constructor (_switch) {
this._queue = new Set()
this._coldCallQueue = new Set()
this._dialingQueues = new Set()
this._queues = {}
this.switch = _switch
this._cleanInterval = retimer(this._clean.bind(this), ONE_HOUR)
}

/**
* Runs through all queues, aborts and removes them if they
* are no longer valid. A queue that is blacklisted indefinitely,
* is considered no longer valid.
* @private
*/
_clean () {
const queues = Object.values(this._queues)
queues.forEach(dialQueue => {
if (dialQueue.blackListed === Infinity) {
dialQueue.abort()
delete this._queues[dialQueue.id]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about doing this._queues[dialQueue.id] = undefined?

This is more efficient than deleting, but than we will need to verify that keys have a value when iterating the array, so I do not have a strong opinion here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is only running on a 15min interval I think the performance hit should be fairly insignificant and would avoid us needing to check values.

}
})
this._cleanInterval.reschedule(ONE_HOUR)
}

/**
Expand All @@ -26,15 +49,18 @@ class DialQueueManager {
// Clear the general queue
this._queue.clear()

this._cleanInterval.clear()

// Abort the individual peer queues
const queues = Object.values(this._queues)
queues.forEach(dialQueue => {
dialQueue.abort()
delete this._queues[dialQueue.id]
})
}

/**
* Adds the `dialRequest` to the queue and ensures the queue is running
* Adds the `dialRequest` to the queue and ensures queue is running
*
* @param {DialRequest} dialRequest
* @returns {void}
Expand All @@ -44,6 +70,11 @@ class DialQueueManager {

// Add the dial to its respective queue
const targetQueue = this.getQueue(peerInfo)
// If we have too many cold calls, abort the dial immediately
if (this._coldCallQueue.size >= this.switch.dialer.MAX_COLD_CALLS && !protocol) {
return nextTick(callback, DIAL_ABORTED())
}

targetQueue.add(protocol, useFSM, callback)

// If we're already connected to the peer, start the queue now
Expand All @@ -54,10 +85,23 @@ class DialQueueManager {
return
}

// Add the id to the general queue set if the queue isn't running
// and if the queue is allowed to dial
if (!targetQueue.isRunning && targetQueue.isDialAllowed()) {
this._queue.add(targetQueue.id)
// If dialing is not allowed, abort
if (!targetQueue.isDialAllowed()) {
return
}

// Add the id to its respective queue set if the queue isn't running
if (!targetQueue.isRunning) {
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
if (protocol) {
this._queue.add(targetQueue.id)
this._coldCallQueue.delete(targetQueue.id)
// Only add it to the cold queue if it's not in the normal queue
} else if (!this._queue.has(targetQueue.id)) {
this._coldCallQueue.add(targetQueue.id)
// The peer is already in the normal queue, abort the cold call
} else {
return nextTick(callback, DIAL_ABORTED())
}
}

this.run()
Expand All @@ -67,11 +111,21 @@ class DialQueueManager {
* Will execute up to `MAX_PARALLEL_DIALS` dials
*/
run () {
if (this._dialingQueues.size < this.switch.dialer.MAX_PARALLEL_DIALS && this._queue.size > 0) {
let nextQueue = this._queue.values().next()
if (nextQueue.done) return
if (this._dialingQueues.size < this.switch.dialer.MAX_PARALLEL_DIALS) {
let nextQueue = { done: true }
// Check the queue first and fall back to the cold call queue
if (this._queue.size > 0) {
nextQueue = this._queue.values().next()
this._queue.delete(nextQueue.value)
} else if (this._coldCallQueue.size > 0) {
nextQueue = this._coldCallQueue.values().next()
this._coldCallQueue.delete(nextQueue.value)
}

if (nextQueue.done) {
return
}

this._queue.delete(nextQueue.value)
let targetQueue = this._queues[nextQueue.value]
this._dialingQueues.add(targetQueue.id)
targetQueue.start()
Expand All @@ -83,7 +137,9 @@ class DialQueueManager {
* @param {PeerInfo} peerInfo
*/
clearBlacklist (peerInfo) {
this.getQueue(peerInfo).blackListed = null
const queue = this.getQueue(peerInfo)
queue.blackListed = null
queue.blackListCount = 0
}

/**
Expand Down
Loading