Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

Commit

Permalink
feat: limit the number of cold calls we can do (#316)
Browse files Browse the repository at this point in the history
* feat: limit the number of cold calls we can do

* feat: add a backoff to blacklisting

* refactor: make cold calls configurable

* fix: make blacklist duration longer

* fix: improve blacklisting

* test: add some tests for queue

* feat: add jitter to blacklist ttl

* test: validate cold queue is removed

* feat: purge old queues every hour

* test: fix aegir post script node shutdown

* fix: abort the cold call queue on manager abort

* fix: improve queue cleanup and lower interval to 15 mins
  • Loading branch information
jacobheun authored Apr 2, 2019
1 parent d550b48 commit 4a543cb
Show file tree
Hide file tree
Showing 7 changed files with 340 additions and 19 deletions.
2 changes: 1 addition & 1 deletion .aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ function pre (done) {

function post (done) {
parallel([
(cb) => switchA.transport.close('ws', cb),
(cb) => switchA.stop(cb),
(cb) => switchB.stop(cb),
(cb) => sigS.stop(cb)
], done)
Expand Down
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@ const sw = new switch(peerInfo , peerBook [, options])

If defined, `options` should be an object with the following keys and respective values:

- `blacklistTTL`: - number of ms a peer should not be dialable to after it errors. Defaults to `120000`(120 seconds)
- `maxParallelDials`: - number of concurrent dials the switch should allow. Defaults to `50`
- `blacklistTTL`: - number of ms a peer should not be dialable to after it errors. Each successive blacklisting will increase the ttl from the base value. Defaults to 5 minutes
- `blackListAttempts`: - number of blacklists before a peer
is permanently blacklisted. Defaults to 5.
- `maxParallelDials`: - number of concurrent dials the switch should allow. Defaults to `100`
- `maxColdCalls`: - number of queued cold calls that are allowed. Defaults to `50`
- `dialTimeout`: - number of ms a dial to a peer should be allowed to run. Defaults to `30000` (30 seconds)
- `stats`: an object with the following keys and respective values:
- `maxOldPeersRetention`: maximum old peers retention. For when peers disconnect and keeping the stats around in case they reconnect. Defaults to `100`.
Expand Down
7 changes: 5 additions & 2 deletions src/constants.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
'use strict'

module.exports = {
BLACK_LIST_TTL: 120e3, // How long before an errored peer can be dialed again
BLACK_LIST_TTL: 5 * 60 * 1e3, // How long before an errored peer can be dialed again
BLACK_LIST_ATTEMPTS: 5, // Num of unsuccessful dials before a peer is permanently blacklisted
DIAL_TIMEOUT: 30e3, // How long in ms a dial attempt is allowed to take
MAX_PARALLEL_DIALS: 50 // Maximum allowed concurrent dials
MAX_COLD_CALLS: 50, // How many dials w/o protocols that can be queued
MAX_PARALLEL_DIALS: 100, // Maximum allowed concurrent dials
QUARTER_HOUR: 15 * 60e3
}
9 changes: 8 additions & 1 deletion src/dialer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@

const DialQueueManager = require('./queueManager')
const getPeerInfo = require('../get-peer-info')
const { MAX_PARALLEL_DIALS, BLACK_LIST_TTL } = require('../constants')
const {
BLACK_LIST_ATTEMPTS,
BLACK_LIST_TTL,
MAX_COLD_CALLS,
MAX_PARALLEL_DIALS
} = require('../constants')

module.exports = function (_switch) {
const dialQueueManager = new DialQueueManager(_switch)
Expand Down Expand Up @@ -74,7 +79,9 @@ module.exports = function (_switch) {
dialFSM,
abort,
clearBlacklist,
BLACK_LIST_ATTEMPTS: isNaN(_switch._options.blackListAttempts) ? BLACK_LIST_ATTEMPTS : _switch._options.blackListAttempts,
BLACK_LIST_TTL: isNaN(_switch._options.blacklistTTL) ? BLACK_LIST_TTL : _switch._options.blacklistTTL,
MAX_COLD_CALLS: isNaN(_switch._options.maxColdCalls) ? MAX_COLD_CALLS : _switch._options.maxColdCalls,
MAX_PARALLEL_DIALS: isNaN(_switch._options.maxParallelDials) ? MAX_PARALLEL_DIALS : _switch._options.maxParallelDials
}
}
24 changes: 21 additions & 3 deletions src/dialer/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class Queue {
this.switch = _switch
this._queue = []
this.blackListed = null
this.blackListCount = 0
this.isRunning = false
this.onStopped = onStopped
}
Expand Down Expand Up @@ -97,7 +98,7 @@ class Queue {
isDialAllowed () {
if (this.blackListed) {
// If the blacklist ttl has passed, reset it
if (Date.now() - this.blackListed > this.switch.dialer.BLACK_LIST_TTL) {
if (Date.now() > this.blackListed) {
this.blackListed = null
return true
}
Expand Down Expand Up @@ -146,10 +147,24 @@ class Queue {

/**
* Marks the queue as blacklisted. The queue will be immediately aborted.
* @returns {void}
*/
blacklist () {
log('blacklisting queue for %s', this.id)
this.blackListed = Date.now()
this.blackListCount++

if (this.blackListCount >= this.switch.dialer.BLACK_LIST_ATTEMPTS) {
this.blackListed = Infinity
return
}

let ttl = this.switch.dialer.BLACK_LIST_TTL * Math.pow(this.blackListCount, 3)
const minTTL = ttl * 0.9
const maxTTL = ttl * 1.1

// Add a random jitter of 20% to the ttl
ttl = Math.floor(Math.random() * (maxTTL - minTTL) + minTTL)

this.blackListed = Date.now() + ttl
this.abort()
}

Expand Down Expand Up @@ -236,13 +251,16 @@ class Queue {

// If we're not muxed yet, add listeners
connectionFSM.once('muxed', () => {
this.blackListCount = 0 // reset blacklisting on good connections
this.switch.connection.add(connectionFSM)
queuedDial.connection = connectionFSM
createConnectionWithProtocol(queuedDial)
next()
})

connectionFSM.once('unmuxed', () => {
this.blackListCount = 0
this.switch.connection.add(connectionFSM)
queuedDial.connection = connectionFSM
createConnectionWithProtocol(queuedDial)
next()
Expand Down
103 changes: 93 additions & 10 deletions src/dialer/queueManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

const once = require('once')
const Queue = require('./queue')
const { DIAL_ABORTED } = require('../errors')
const nextTick = require('async/nextTick')
const retimer = require('retimer')
const { QUARTER_HOUR } = require('../constants')
const noop = () => {}

class DialQueueManager {
Expand All @@ -11,9 +15,53 @@ class DialQueueManager {
*/
constructor (_switch) {
this._queue = new Set()
this._coldCallQueue = new Set()
this._dialingQueues = new Set()
this._queues = {}
this.switch = _switch
this._cleanInterval = retimer(this._clean.bind(this), QUARTER_HOUR)
}

/**
* Runs through all queues, aborts and removes them if they
* are no longer valid. A queue that is blacklisted indefinitely,
* is considered no longer valid.
* @private
*/
_clean () {
const queues = Object.values(this._queues)
queues.forEach(dialQueue => {
// Clear if the queue has reached max blacklist
if (dialQueue.blackListed === Infinity) {
dialQueue.abort()
delete this._queues[dialQueue.id]
return
}

// Keep track of blacklisted queues
if (dialQueue.blackListed) return

// Clear if peer is no longer active
// To avoid reallocating memory, dont delete queues of
// connected peers, as these are highly likely to leverage the
// queues in the immediate term
if (!dialQueue.isRunning && dialQueue.length < 1) {
let isConnected = false
try {
const peerInfo = this.switch._peerBook.get(dialQueue.id)
isConnected = Boolean(peerInfo.isConnected())
} catch (_) {
// If we get an error, that means the peerbook doesnt have the peer
}

if (!isConnected) {
dialQueue.abort()
delete this._queues[dialQueue.id]
}
}
})

this._cleanInterval.reschedule(QUARTER_HOUR)
}

/**
Expand All @@ -25,16 +73,21 @@ class DialQueueManager {
abort () {
// Clear the general queue
this._queue.clear()
// Clear the cold call queue
this._coldCallQueue.clear()

this._cleanInterval.clear()

// Abort the individual peer queues
const queues = Object.values(this._queues)
queues.forEach(dialQueue => {
dialQueue.abort()
delete this._queues[dialQueue.id]
})
}

/**
* Adds the `dialRequest` to the queue and ensures the queue is running
* Adds the `dialRequest` to the queue and ensures queue is running
*
* @param {DialRequest} dialRequest
* @returns {void}
Expand All @@ -44,6 +97,11 @@ class DialQueueManager {

// Add the dial to its respective queue
const targetQueue = this.getQueue(peerInfo)
// If we have too many cold calls, abort the dial immediately
if (this._coldCallQueue.size >= this.switch.dialer.MAX_COLD_CALLS && !protocol) {
return nextTick(callback, DIAL_ABORTED())
}

targetQueue.add(protocol, useFSM, callback)

// If we're already connected to the peer, start the queue now
Expand All @@ -54,10 +112,23 @@ class DialQueueManager {
return
}

// Add the id to the general queue set if the queue isn't running
// and if the queue is allowed to dial
if (!targetQueue.isRunning && targetQueue.isDialAllowed()) {
this._queue.add(targetQueue.id)
// If dialing is not allowed, abort
if (!targetQueue.isDialAllowed()) {
return
}

// Add the id to its respective queue set if the queue isn't running
if (!targetQueue.isRunning) {
if (protocol) {
this._queue.add(targetQueue.id)
this._coldCallQueue.delete(targetQueue.id)
// Only add it to the cold queue if it's not in the normal queue
} else if (!this._queue.has(targetQueue.id)) {
this._coldCallQueue.add(targetQueue.id)
// The peer is already in the normal queue, abort the cold call
} else {
return nextTick(callback, DIAL_ABORTED())
}
}

this.run()
Expand All @@ -67,11 +138,21 @@ class DialQueueManager {
* Will execute up to `MAX_PARALLEL_DIALS` dials
*/
run () {
if (this._dialingQueues.size < this.switch.dialer.MAX_PARALLEL_DIALS && this._queue.size > 0) {
let nextQueue = this._queue.values().next()
if (nextQueue.done) return
if (this._dialingQueues.size < this.switch.dialer.MAX_PARALLEL_DIALS) {
let nextQueue = { done: true }
// Check the queue first and fall back to the cold call queue
if (this._queue.size > 0) {
nextQueue = this._queue.values().next()
this._queue.delete(nextQueue.value)
} else if (this._coldCallQueue.size > 0) {
nextQueue = this._coldCallQueue.values().next()
this._coldCallQueue.delete(nextQueue.value)
}

if (nextQueue.done) {
return
}

this._queue.delete(nextQueue.value)
let targetQueue = this._queues[nextQueue.value]
this._dialingQueues.add(targetQueue.id)
targetQueue.start()
Expand All @@ -83,7 +164,9 @@ class DialQueueManager {
* @param {PeerInfo} peerInfo
*/
clearBlacklist (peerInfo) {
this.getQueue(peerInfo).blackListed = null
const queue = this.getQueue(peerInfo)
queue.blackListed = null
queue.blackListCount = 0
}

/**
Expand Down
Loading

0 comments on commit 4a543cb

Please sign in to comment.