From 4a543cb80626e6aa901fbe9ee5824e9a9d5038b6 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Tue, 2 Apr 2019 14:53:55 +0200 Subject: [PATCH] feat: limit the number of cold calls we can do (#316) * feat: limit the number of cold calls we can do * feat: add a backoff to blacklisting * refactor: make cold calls configurable * fix: make blacklist duration longer * fix: improve blacklisting * test: add some tests for queue * feat: add jitter to blacklist ttl * test: validate cold queue is removed * feat: purge old queues every hour * test: fix aegir post script node shutdown * fix: abort the cold call queue on manager abort * fix: improve queue cleanup and lower interval to 15 mins --- .aegir.js | 2 +- README.md | 7 +- src/constants.js | 7 +- src/dialer/index.js | 9 +- src/dialer/queue.js | 24 ++++- src/dialer/queueManager.js | 103 ++++++++++++++++-- test/dialer.spec.js | 207 +++++++++++++++++++++++++++++++++++++ 7 files changed, 340 insertions(+), 19 deletions(-) create mode 100644 test/dialer.spec.js diff --git a/.aegir.js b/.aegir.js index d3166a7..da39d22 100644 --- a/.aegir.js +++ b/.aegir.js @@ -74,7 +74,7 @@ function pre (done) { function post (done) { parallel([ - (cb) => switchA.transport.close('ws', cb), + (cb) => switchA.stop(cb), (cb) => switchB.stop(cb), (cb) => sigS.stop(cb) ], done) diff --git a/README.md b/README.md index f51b097..26c23cd 100644 --- a/README.md +++ b/README.md @@ -61,8 +61,11 @@ const sw = new switch(peerInfo , peerBook [, options]) If defined, `options` should be an object with the following keys and respective values: -- `blacklistTTL`: - number of ms a peer should not be dialable to after it errors. Defaults to `120000`(120 seconds) -- `maxParallelDials`: - number of concurrent dials the switch should allow. Defaults to `50` +- `blacklistTTL`: - number of ms a peer should not be dialable to after it errors. Each successive blacklisting will increase the ttl from the base value. Defaults to 5 minutes +- `blackListAttempts`: - number of blacklists before a peer +is permanently blacklisted. Defaults to 5. +- `maxParallelDials`: - number of concurrent dials the switch should allow. Defaults to `100` +- `maxColdCalls`: - number of queued cold calls that are allowed. Defaults to `50` - `dialTimeout`: - number of ms a dial to a peer should be allowed to run. Defaults to `30000` (30 seconds) - `stats`: an object with the following keys and respective values: - `maxOldPeersRetention`: maximum old peers retention. For when peers disconnect and keeping the stats around in case they reconnect. Defaults to `100`. diff --git a/src/constants.js b/src/constants.js index ddf66a9..fded895 100644 --- a/src/constants.js +++ b/src/constants.js @@ -1,7 +1,10 @@ 'use strict' module.exports = { - BLACK_LIST_TTL: 120e3, // How long before an errored peer can be dialed again + BLACK_LIST_TTL: 5 * 60 * 1e3, // How long before an errored peer can be dialed again + BLACK_LIST_ATTEMPTS: 5, // Num of unsuccessful dials before a peer is permanently blacklisted DIAL_TIMEOUT: 30e3, // How long in ms a dial attempt is allowed to take - MAX_PARALLEL_DIALS: 50 // Maximum allowed concurrent dials + MAX_COLD_CALLS: 50, // How many dials w/o protocols that can be queued + MAX_PARALLEL_DIALS: 100, // Maximum allowed concurrent dials + QUARTER_HOUR: 15 * 60e3 } diff --git a/src/dialer/index.js b/src/dialer/index.js index a5c8018..8464d8e 100644 --- a/src/dialer/index.js +++ b/src/dialer/index.js @@ -2,7 +2,12 @@ const DialQueueManager = require('./queueManager') const getPeerInfo = require('../get-peer-info') -const { MAX_PARALLEL_DIALS, BLACK_LIST_TTL } = require('../constants') +const { + BLACK_LIST_ATTEMPTS, + BLACK_LIST_TTL, + MAX_COLD_CALLS, + MAX_PARALLEL_DIALS +} = require('../constants') module.exports = function (_switch) { const dialQueueManager = new DialQueueManager(_switch) @@ -74,7 +79,9 @@ module.exports = function (_switch) { dialFSM, abort, clearBlacklist, + BLACK_LIST_ATTEMPTS: isNaN(_switch._options.blackListAttempts) ? BLACK_LIST_ATTEMPTS : _switch._options.blackListAttempts, BLACK_LIST_TTL: isNaN(_switch._options.blacklistTTL) ? BLACK_LIST_TTL : _switch._options.blacklistTTL, + MAX_COLD_CALLS: isNaN(_switch._options.maxColdCalls) ? MAX_COLD_CALLS : _switch._options.maxColdCalls, MAX_PARALLEL_DIALS: isNaN(_switch._options.maxParallelDials) ? MAX_PARALLEL_DIALS : _switch._options.maxParallelDials } } diff --git a/src/dialer/queue.js b/src/dialer/queue.js index 08871d4..c5f97f9 100644 --- a/src/dialer/queue.js +++ b/src/dialer/queue.js @@ -70,6 +70,7 @@ class Queue { this.switch = _switch this._queue = [] this.blackListed = null + this.blackListCount = 0 this.isRunning = false this.onStopped = onStopped } @@ -97,7 +98,7 @@ class Queue { isDialAllowed () { if (this.blackListed) { // If the blacklist ttl has passed, reset it - if (Date.now() - this.blackListed > this.switch.dialer.BLACK_LIST_TTL) { + if (Date.now() > this.blackListed) { this.blackListed = null return true } @@ -146,10 +147,24 @@ class Queue { /** * Marks the queue as blacklisted. The queue will be immediately aborted. + * @returns {void} */ blacklist () { - log('blacklisting queue for %s', this.id) - this.blackListed = Date.now() + this.blackListCount++ + + if (this.blackListCount >= this.switch.dialer.BLACK_LIST_ATTEMPTS) { + this.blackListed = Infinity + return + } + + let ttl = this.switch.dialer.BLACK_LIST_TTL * Math.pow(this.blackListCount, 3) + const minTTL = ttl * 0.9 + const maxTTL = ttl * 1.1 + + // Add a random jitter of 20% to the ttl + ttl = Math.floor(Math.random() * (maxTTL - minTTL) + minTTL) + + this.blackListed = Date.now() + ttl this.abort() } @@ -236,6 +251,7 @@ class Queue { // If we're not muxed yet, add listeners connectionFSM.once('muxed', () => { + this.blackListCount = 0 // reset blacklisting on good connections this.switch.connection.add(connectionFSM) queuedDial.connection = connectionFSM createConnectionWithProtocol(queuedDial) @@ -243,6 +259,8 @@ class Queue { }) connectionFSM.once('unmuxed', () => { + this.blackListCount = 0 + this.switch.connection.add(connectionFSM) queuedDial.connection = connectionFSM createConnectionWithProtocol(queuedDial) next() diff --git a/src/dialer/queueManager.js b/src/dialer/queueManager.js index 96ab36f..2ca5283 100644 --- a/src/dialer/queueManager.js +++ b/src/dialer/queueManager.js @@ -2,6 +2,10 @@ const once = require('once') const Queue = require('./queue') +const { DIAL_ABORTED } = require('../errors') +const nextTick = require('async/nextTick') +const retimer = require('retimer') +const { QUARTER_HOUR } = require('../constants') const noop = () => {} class DialQueueManager { @@ -11,9 +15,53 @@ class DialQueueManager { */ constructor (_switch) { this._queue = new Set() + this._coldCallQueue = new Set() this._dialingQueues = new Set() this._queues = {} this.switch = _switch + this._cleanInterval = retimer(this._clean.bind(this), QUARTER_HOUR) + } + + /** + * Runs through all queues, aborts and removes them if they + * are no longer valid. A queue that is blacklisted indefinitely, + * is considered no longer valid. + * @private + */ + _clean () { + const queues = Object.values(this._queues) + queues.forEach(dialQueue => { + // Clear if the queue has reached max blacklist + if (dialQueue.blackListed === Infinity) { + dialQueue.abort() + delete this._queues[dialQueue.id] + return + } + + // Keep track of blacklisted queues + if (dialQueue.blackListed) return + + // Clear if peer is no longer active + // To avoid reallocating memory, dont delete queues of + // connected peers, as these are highly likely to leverage the + // queues in the immediate term + if (!dialQueue.isRunning && dialQueue.length < 1) { + let isConnected = false + try { + const peerInfo = this.switch._peerBook.get(dialQueue.id) + isConnected = Boolean(peerInfo.isConnected()) + } catch (_) { + // If we get an error, that means the peerbook doesnt have the peer + } + + if (!isConnected) { + dialQueue.abort() + delete this._queues[dialQueue.id] + } + } + }) + + this._cleanInterval.reschedule(QUARTER_HOUR) } /** @@ -25,16 +73,21 @@ class DialQueueManager { abort () { // Clear the general queue this._queue.clear() + // Clear the cold call queue + this._coldCallQueue.clear() + + this._cleanInterval.clear() // Abort the individual peer queues const queues = Object.values(this._queues) queues.forEach(dialQueue => { dialQueue.abort() + delete this._queues[dialQueue.id] }) } /** - * Adds the `dialRequest` to the queue and ensures the queue is running + * Adds the `dialRequest` to the queue and ensures queue is running * * @param {DialRequest} dialRequest * @returns {void} @@ -44,6 +97,11 @@ class DialQueueManager { // Add the dial to its respective queue const targetQueue = this.getQueue(peerInfo) + // If we have too many cold calls, abort the dial immediately + if (this._coldCallQueue.size >= this.switch.dialer.MAX_COLD_CALLS && !protocol) { + return nextTick(callback, DIAL_ABORTED()) + } + targetQueue.add(protocol, useFSM, callback) // If we're already connected to the peer, start the queue now @@ -54,10 +112,23 @@ class DialQueueManager { return } - // Add the id to the general queue set if the queue isn't running - // and if the queue is allowed to dial - if (!targetQueue.isRunning && targetQueue.isDialAllowed()) { - this._queue.add(targetQueue.id) + // If dialing is not allowed, abort + if (!targetQueue.isDialAllowed()) { + return + } + + // Add the id to its respective queue set if the queue isn't running + if (!targetQueue.isRunning) { + if (protocol) { + this._queue.add(targetQueue.id) + this._coldCallQueue.delete(targetQueue.id) + // Only add it to the cold queue if it's not in the normal queue + } else if (!this._queue.has(targetQueue.id)) { + this._coldCallQueue.add(targetQueue.id) + // The peer is already in the normal queue, abort the cold call + } else { + return nextTick(callback, DIAL_ABORTED()) + } } this.run() @@ -67,11 +138,21 @@ class DialQueueManager { * Will execute up to `MAX_PARALLEL_DIALS` dials */ run () { - if (this._dialingQueues.size < this.switch.dialer.MAX_PARALLEL_DIALS && this._queue.size > 0) { - let nextQueue = this._queue.values().next() - if (nextQueue.done) return + if (this._dialingQueues.size < this.switch.dialer.MAX_PARALLEL_DIALS) { + let nextQueue = { done: true } + // Check the queue first and fall back to the cold call queue + if (this._queue.size > 0) { + nextQueue = this._queue.values().next() + this._queue.delete(nextQueue.value) + } else if (this._coldCallQueue.size > 0) { + nextQueue = this._coldCallQueue.values().next() + this._coldCallQueue.delete(nextQueue.value) + } + + if (nextQueue.done) { + return + } - this._queue.delete(nextQueue.value) let targetQueue = this._queues[nextQueue.value] this._dialingQueues.add(targetQueue.id) targetQueue.start() @@ -83,7 +164,9 @@ class DialQueueManager { * @param {PeerInfo} peerInfo */ clearBlacklist (peerInfo) { - this.getQueue(peerInfo).blackListed = null + const queue = this.getQueue(peerInfo) + queue.blackListed = null + queue.blackListCount = 0 } /** diff --git a/test/dialer.spec.js b/test/dialer.spec.js new file mode 100644 index 0000000..277af90 --- /dev/null +++ b/test/dialer.spec.js @@ -0,0 +1,207 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(require('chai-checkmark')) +chai.use(dirtyChai) +const sinon = require('sinon') + +const PeerBook = require('peer-book') +const Queue = require('../src/dialer/queue') +const QueueManager = require('../src/dialer/queueManager') +const Switch = require('../src') + +const utils = require('./utils') +const createInfos = utils.createInfos + +describe('dialer', () => { + let switchA + + before((done) => createInfos(1, (err, infos) => { + expect(err).to.not.exist() + + switchA = new Switch(infos[0], new PeerBook()) + + done() + })) + + afterEach(() => { + sinon.restore() + }) + + describe('queue', () => { + it('should blacklist forever after 5 blacklists', () => { + const queue = new Queue('QM', switchA) + for (var i = 0; i < 4; i++) { + queue.blacklist() + expect(queue.blackListed).to.be.a('number') + expect(queue.blackListed).to.not.eql(Infinity) + } + + queue.blacklist() + expect(queue.blackListed).to.eql(Infinity) + }) + }) + + describe('queue manager', () => { + let queueManager + before(() => { + queueManager = new QueueManager(switchA) + }) + + it('should abort cold calls when the queue is full', (done) => { + sinon.stub(queueManager._coldCallQueue, 'size').value(switchA.dialer.MAX_COLD_CALLS) + const dialRequest = { + peerInfo: { + id: { toB58String: () => 'QmA' } + }, + protocol: null, + useFSM: true, + callback: (err) => { + expect(err.code).to.eql('DIAL_ABORTED') + done() + } + } + + queueManager.add(dialRequest) + }) + + it('should add a protocol dial to the normal queue', () => { + const dialRequest = { + peerInfo: { + id: { toB58String: () => 'QmA' }, + isConnected: () => null + }, + protocol: '/echo/1.0.0', + useFSM: true, + callback: () => {} + } + + const runSpy = sinon.stub(queueManager, 'run') + const addSpy = sinon.stub(queueManager._queue, 'add') + const deleteSpy = sinon.stub(queueManager._coldCallQueue, 'delete') + + queueManager.add(dialRequest) + + expect(runSpy.called).to.eql(true) + expect(addSpy.called).to.eql(true) + expect(addSpy.getCall(0).args[0]).to.eql('QmA') + expect(deleteSpy.called).to.eql(true) + expect(deleteSpy.getCall(0).args[0]).to.eql('QmA') + }) + + it('should add a cold call to the cold call queue', () => { + const dialRequest = { + peerInfo: { + id: { toB58String: () => 'QmA' }, + isConnected: () => null + }, + protocol: null, + useFSM: true, + callback: () => {} + } + + const runSpy = sinon.stub(queueManager, 'run') + const addSpy = sinon.stub(queueManager._coldCallQueue, 'add') + + queueManager.add(dialRequest) + + expect(runSpy.called).to.eql(true) + expect(addSpy.called).to.eql(true) + expect(addSpy.getCall(0).args[0]).to.eql('QmA') + }) + + it('should abort a cold call if it\'s in the normal queue', (done) => { + const dialRequest = { + peerInfo: { + id: { toB58String: () => 'QmA' }, + isConnected: () => null + }, + protocol: null, + useFSM: true, + callback: (err) => { + expect(runSpy.called).to.eql(false) + expect(hasSpy.called).to.eql(true) + expect(hasSpy.getCall(0).args[0]).to.eql('QmA') + expect(err.code).to.eql('DIAL_ABORTED') + done() + } + } + + const runSpy = sinon.stub(queueManager, 'run') + const hasSpy = sinon.stub(queueManager._queue, 'has').returns(true) + + queueManager.add(dialRequest) + }) + + it('should remove a queue that has reached max blacklist', () => { + const queue = new Queue('QmA', switchA) + queue.blackListed = Infinity + + const abortSpy = sinon.spy(queue, 'abort') + const queueManager = new QueueManager(switchA) + queueManager._queues[queue.id] = queue + + queueManager._clean() + + expect(abortSpy.called).to.eql(true) + expect(queueManager._queues).to.eql({}) + }) + + it('should not remove a queue that is blacklisted below max', () => { + const queue = new Queue('QmA', switchA) + queue.blackListed = Date.now() + 10e3 + + const abortSpy = sinon.spy(queue, 'abort') + const queueManager = new QueueManager(switchA) + queueManager._queues[queue.id] = queue + + queueManager._clean() + + expect(abortSpy.called).to.eql(false) + expect(queueManager._queues).to.eql({ + QmA: queue + }) + }) + + it('should remove a queue that is not running and the peer is not connected', () => { + const disconnectedPeer = { + id: { toB58String: () => 'QmA' }, + isConnected: () => null + } + const queue = new Queue(disconnectedPeer.id.toB58String(), switchA) + + const abortSpy = sinon.spy(queue, 'abort') + const queueManager = new QueueManager(switchA) + queueManager._queues[queue.id] = queue + + queueManager._clean() + + expect(abortSpy.called).to.eql(true) + expect(queueManager._queues).to.eql({}) + }) + + it('should not remove a queue that is not running but the peer is connected', () => { + const connectedPeer = { + id: { toB58String: () => 'QmA' }, + isConnected: () => true + } + const queue = new Queue(connectedPeer.id.toB58String(), switchA) + + switchA._peerBook.put(connectedPeer) + + const abortSpy = sinon.spy(queue, 'abort') + const queueManager = new QueueManager(switchA) + queueManager._queues[queue.id] = queue + + queueManager._clean() + + expect(abortSpy.called).to.eql(false) + expect(queueManager._queues).to.eql({ + QmA: queue + }) + }) + }) +})