Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

feat: limit the number of cold calls we can do #316

Merged
merged 16 commits into from
Apr 2, 2019
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ const sw = new switch(peerInfo , peerBook [, options])

If defined, `options` should be an object with the following keys and respective values:

- `blacklistTTL`: - number of ms a peer should not be dialable to after it errors. Defaults to `120000`(120 seconds)
- `maxParallelDials`: - number of concurrent dials the switch should allow. Defaults to `50`
- `blacklistTTL`: - number of ms a peer should not be dialable to after it errors. Defaults to 5 minutes
- `maxParallelDials`: - number of concurrent dials the switch should allow. Defaults to `100`
- `maxColdCalls`: - number of queued cold calls that are allowed. Defaults to `50`
- `dialTimeout`: - number of ms a dial to a peer should be allowed to run. Defaults to `30000` (30 seconds)
- `stats`: an object with the following keys and respective values:
- `maxOldPeersRetention`: maximum old peers retention. For when peers disconnect and keeping the stats around in case they reconnect. Defaults to `100`.
Expand Down
5 changes: 3 additions & 2 deletions src/constants.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
'use strict'

module.exports = {
BLACK_LIST_TTL: 120e3, // How long before an errored peer can be dialed again
BLACK_LIST_TTL: 5 * 60 * 1e3, // How long before an errored peer can be dialed again
DIAL_TIMEOUT: 30e3, // How long in ms a dial attempt is allowed to take
MAX_PARALLEL_DIALS: 50 // Maximum allowed concurrent dials
MAX_COLD_CALLS: 50, // How many dials w/o protocols that can be queued
MAX_PARALLEL_DIALS: 100 // Maximum allowed concurrent dials
}
3 changes: 2 additions & 1 deletion src/dialer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

const DialQueueManager = require('./queueManager')
const getPeerInfo = require('../get-peer-info')
const { MAX_PARALLEL_DIALS, BLACK_LIST_TTL } = require('../constants')
const { MAX_COLD_CALLS, MAX_PARALLEL_DIALS, BLACK_LIST_TTL } = require('../constants')

module.exports = function (_switch) {
const dialQueueManager = new DialQueueManager(_switch)
Expand Down Expand Up @@ -75,6 +75,7 @@ module.exports = function (_switch) {
abort,
clearBlacklist,
BLACK_LIST_TTL: isNaN(_switch._options.blacklistTTL) ? BLACK_LIST_TTL : _switch._options.blacklistTTL,
MAX_COLD_CALLS: isNaN(_switch._options.maxColdCalls) ? MAX_COLD_CALLS : _switch._options.maxColdCalls,
MAX_PARALLEL_DIALS: isNaN(_switch._options.maxParallelDials) ? MAX_PARALLEL_DIALS : _switch._options.maxParallelDials
}
}
16 changes: 13 additions & 3 deletions src/dialer/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class Queue {
this.switch = _switch
this._queue = []
this.blackListed = null
this.blackListCount = 0
this.isRunning = false
this.onStopped = onStopped
}
Expand Down Expand Up @@ -97,7 +98,7 @@ class Queue {
isDialAllowed () {
if (this.blackListed) {
// If the blacklist ttl has passed, reset it
if (Date.now() - this.blackListed > this.switch.dialer.BLACK_LIST_TTL) {
if (Date.now() > this.blackListed) {
this.blackListed = null
return true
}
Expand Down Expand Up @@ -146,10 +147,17 @@ class Queue {

/**
* Marks the queue as blacklisted. The queue will be immediately aborted.
* @returns {void}
*/
blacklist () {
log('blacklisting queue for %s', this.id)
this.blackListed = Date.now()
this.blackListCount++

if (this.blackListCount >= 5) {
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
this.blackListed = Infinity
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
return
}

this.blackListed = Date.now() + (this.switch.dialer.BLACK_LIST_TTL * Math.pow(this.blackListCount, 3))
this.abort()
}

Expand Down Expand Up @@ -236,13 +244,15 @@ class Queue {

// If we're not muxed yet, add listeners
connectionFSM.once('muxed', () => {
this.blackListCount = 0 // reset blacklisting on good connections
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
this.switch.connection.add(connectionFSM)
queuedDial.connection = connectionFSM
createConnectionWithProtocol(queuedDial)
next()
})

connectionFSM.once('unmuxed', () => {
this.blackListCount = 0
queuedDial.connection = connectionFSM
createConnectionWithProtocol(queuedDial)
next()
Expand Down
48 changes: 38 additions & 10 deletions src/dialer/queueManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

const once = require('once')
const Queue = require('./queue')
const { DIAL_ABORTED } = require('../errors')
const nextTick = require('async/nextTick')
const noop = () => {}

class DialQueueManager {
Expand All @@ -11,6 +13,7 @@ class DialQueueManager {
*/
constructor (_switch) {
this._queue = new Set()
this._coldCallQueue = new Set()
this._dialingQueues = new Set()
this._queues = {}
this.switch = _switch
Expand All @@ -34,7 +37,7 @@ class DialQueueManager {
}

/**
* Adds the `dialRequest` to the queue and ensures the queue is running
* Adds the `dialRequest` to the qMAX_COLD_CALLSe queue is running
jacobheun marked this conversation as resolved.
Show resolved Hide resolved
*
* @param {DialRequest} dialRequest
* @returns {void}
Expand All @@ -44,6 +47,11 @@ class DialQueueManager {

// Add the dial to its respective queue
const targetQueue = this.getQueue(peerInfo)
// If we have too many cold calls, abort the dial immediately
if (this._coldCallQueue.size >= this.switch.dialer.MAX_COLD_CALLS && !protocol) {
return nextTick(callback, DIAL_ABORTED())
}

targetQueue.add(protocol, useFSM, callback)

// If we're already connected to the peer, start the queue now
Expand All @@ -54,10 +62,18 @@ class DialQueueManager {
return
}

// Add the id to the general queue set if the queue isn't running
// and if the queue is allowed to dial
if (!targetQueue.isRunning && targetQueue.isDialAllowed()) {
this._queue.add(targetQueue.id)
// If dialing is not allowed, abort
if (!targetQueue.isDialAllowed()) {
return
}

// Add the id to its respective queue set if the queue isn't running
if (!targetQueue.isRunning) {
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
if (protocol) {
this._queue.add(targetQueue.id)
} else {
this._coldCallQueue.add(targetQueue.id)
}
}

this.run()
Expand All @@ -67,11 +83,21 @@ class DialQueueManager {
* Will execute up to `MAX_PARALLEL_DIALS` dials
*/
run () {
if (this._dialingQueues.size < this.switch.dialer.MAX_PARALLEL_DIALS && this._queue.size > 0) {
let nextQueue = this._queue.values().next()
if (nextQueue.done) return
if (this._dialingQueues.size < this.switch.dialer.MAX_PARALLEL_DIALS) {
let nextQueue = { done: true }
// Check the queue first and fall back to the cold call queue
if (this._queue.size > 0) {
nextQueue = this._queue.values().next()
this._queue.delete(nextQueue.value)
} else if (this._coldCallQueue.size > 0) {
nextQueue = this._coldCallQueue.values().next()
this._coldCallQueue.delete(nextQueue.value)
}

if (nextQueue.done) {
return
}

this._queue.delete(nextQueue.value)
let targetQueue = this._queues[nextQueue.value]
this._dialingQueues.add(targetQueue.id)
targetQueue.start()
Expand All @@ -83,7 +109,9 @@ class DialQueueManager {
* @param {PeerInfo} peerInfo
*/
clearBlacklist (peerInfo) {
this.getQueue(peerInfo).blackListed = null
const queue = this.getQueue(peerInfo)
queue.blackListed = null
queue.blackListCount = 0
}

/**
Expand Down
71 changes: 71 additions & 0 deletions test/dialer.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/* eslint-env mocha */
'use strict'

const chai = require('chai')
const dirtyChai = require('dirty-chai')
const expect = chai.expect
chai.use(require('chai-checkmark'))
chai.use(dirtyChai)
const sinon = require('sinon')

const PeerBook = require('peer-book')
const Queue = require('../src/dialer/queue')
const QueueManager = require('../src/dialer/queueManager')
const Switch = require('../src')

const utils = require('./utils')
const createInfos = utils.createInfos

describe('dialer', () => {
let switchA

before((done) => createInfos(1, (err, infos) => {
expect(err).to.not.exist()

switchA = new Switch(infos[0], new PeerBook())

done()
}))

afterEach(() => {
sinon.restore()
})

describe('queue', () => {
it('should blacklist forever after 5 blacklists', () => {
const queue = new Queue('QM', switchA)
for (var i = 0; i < 4; i++) {
queue.blacklist()
expect(queue.blackListed).to.be.a('number')
expect(queue.blackListed).to.not.eql(Infinity)
}

queue.blacklist()
expect(queue.blackListed).to.eql(Infinity)
})
})

describe('queue manager', () => {
let queueManager
before(() => {
queueManager = new QueueManager(switchA)
})

it('should abort cold calls when the queue is full', (done) => {
sinon.stub(queueManager._coldCallQueue, 'size').value(switchA.dialer.MAX_COLD_CALLS)
const dialRequest = {
peerInfo: {
id: { toB58String: () => 'QmA' }
},
protocol: null,
useFSM: true,
callback: (err) => {
expect(err.code).to.eql('DIAL_ABORTED')
done()
}
}

queueManager.add(dialRequest)
})
})
})