Skip to content
This repository has been archived by the owner on Jan 25, 2024. It is now read-only.

Commit

Permalink
simple waiting queue for requests
Browse files Browse the repository at this point in the history
this should fix a lot of issues, as well as make it a ton easier to build
systems that won't blow up your api quotas

right now, it's just a simple "wait" queue, it needs to be implemented
in such a way so that there's a "waiting" and a "running" list, because
right now it's a hybrid of the old brute-force "re-run everything as soon
as a timer expires", and a better queue implementation.

throttlecheck has been updated to stagger it's requests, and uses
getMarketplaces rather than getProductCategories.
  • Loading branch information
ericblade committed Dec 7, 2018
1 parent 0fb5610 commit ea44846
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 34 deletions.
148 changes: 148 additions & 0 deletions lib/endpoints/Queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// TODO: Product category at least has a hourly request quota that is
// not tracked anywhere currently.

class QueueItem {
constructor({
api,
category,
action,
params,
options,
resolver,
rejecter,
onComplete,
}) {
this.api = api;
this.category = category;
this.action = action;
this.params = params;
this.options = options;
this.resolver = resolver;
this.rejecter = rejecter;
this.onComplete = onComplete;

this.run = this.run.bind(this);
}

async run() {
try {
const res = await this.api.requestPromise(this.params, this.options);
this.resolver(res);
this.onComplete();
} catch (err) {
this.rejecter(err);
}
}
}

class Queue {
constructor({
api,
category,
action,
maxInFlight,
restoreRate,
}) {
this.api = api;
this.category = category;
this.action = action;
this.inFlight = 0;
this.maxInFlight = maxInFlight || 200;
this.restoreRate = restoreRate || 0;
this.queue = [];
this.queueTimer = null;

this.throttle = this.throttle.bind(this);
this.setThrottleTimer = this.setThrottleTimer.bind(this);
this.onQueueTimer = this.onQueueTimer.bind(this);
this.drainQueue = this.drainQueue.bind(this);
this.complete = this.complete.bind(this);
this.runQueue = this.runQueue.bind(this);
this.request = this.request.bind(this);
}

throttle() {
console.warn('* THROTTLE HIT');
this.setThrottleTimer();
}

setThrottleTimer() {
if (this.queueTimer) {
// already running, so we shouldn't have sent any more requests
// since we set the timer, so we shouldn't need to re-set the timer
return;
}
this.queueTimer = setTimeout(this.onQueueTimer, (this.restoreRate || 1) * 60 * 1000);
}

onQueueTimer() {
console.warn('* throttle timeout, draining');
this.queueTimer = null;
this.runQueue();
}

drainQueue() {
if (this.queueTimer) {
// console.warn('* ignoring drain request, waiting on throttle timer');
return;
}
if (!this.queue.length) {
// console.warn('* ignoring drain request, queue empty');
return;
}

// TODO: this should schedule staggered runs, so that if we get another
// throttle response, we get halted.
// console.warn('* drainQueue length at start', this.queue.length);
while (this.queue.length && this.inFlight < this.maxInFlight) {
this.runQueue();
}
if (this.queue.length >= this.maxInFlight) {
this.setThrottleTimer();
}
// console.warn('* drainQueue at end', this.queue.length, this.inFlight);
}

complete() {
this.inFlight -= 1;
this.drainQueue();
}

// TODO: ideally we should move these items into a 'running' queue versus a 'waiting' queue
// rather than removing them from the waiting queue, so that they can be re-run inside this
// queue, should they fail. also, we should store the promise in the QueueItem, so we can
// monitor it's progress here, instead of doing it outside, in callEndpoint as it does
// currently. That is for another day, though.
runQueue() {
if (this.queueTimer) {
console.warn('* ignoring run request, throttle timer running');
}
const item = this.queue.shift();
if (item) {
this.inFlight += 1;
item.run();
}
}

request(params, options) {
return new Promise((resolve, reject) => {
const action = new QueueItem({
api: this.api,
category: this.category,
action: this.action,
params,
options,
resolver: resolve,
rejecter: reject,
onComplete: this.complete,
});
this.queue.push(action);
setImmediate(this.drainQueue);
// if (this.inFlight < this.maxInFlight || !this.maxInFlight) {
// this.runQueue();
// }
});
}
}

module.exports = Queue;
1 change: 1 addition & 0 deletions lib/helpers/getProductCategories.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const getProductCategoriesForSku = api => ({ marketplaceId, sellerSku }) => api.
* @param {productCategory} [Self] - The product category that this SKU belongs to - if not present, may be an invalid ASIN
*/

// TODO: we should strip out duplicates! use the Set trick
const getProductCategoriesForAsins = api => ({ marketplaceId, asins }) => {
const results = asins.map(asin => getProductCategoriesForAsin(api)({ marketplaceId, asin }));
return Promise.all(results);
Expand Down
24 changes: 22 additions & 2 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ const fs = require('fs');
const path = require('path');

const MWS = require('mws-simple');
const Queue = require('./endpoints/Queue');

const sleep = require('./util/sleep');
const errors = require('./errors');
Expand Down Expand Up @@ -52,6 +53,7 @@ class MwsAdvanced {
constructor(...args) {
this.init(...args);
this.callEndpoint = this.callEndpoint.bind(this);
this.queues = {};

// argh, binding of functions.
this.init = this.init.bind(this);
Expand Down Expand Up @@ -196,10 +198,21 @@ class MwsAdvanced {
feedContent,
};

let q = this.queues[`${endpoint.category}/${endpoint.action}`];
if (!q) {
q = new Queue({
api: this,
category: endpoint.category,
action: endpoint.action,
maxInFlight: (endpoint.throttle && endpoint.throttle.maxInFlight) || 0,
restoreRate: (endpoint.throttle && endpoint.throttle.restoreRate) || 0,
});
this.queues[`${endpoint.category}/${endpoint.action}`] = q;
}
let throttleRetries = 0;
// TODO: reduce the size of this try!
try {
const result = await this.requestPromise(params, { noFlatten: opt.noFlatten });
const result = await q.request(params, { noFlatten: opt.noFlatten });
if (opt.saveRaw) {
fs.writeFileSync(path.join(process.cwd(), opt.saveRaw), JSON.stringify(result));
}
Expand All @@ -220,12 +233,16 @@ class MwsAdvanced {
error: err,
/* headers */
} = error;
// TODO: throttleRetries is always 1, because we're recursively
// calling this function.. that needs to be external!!!
// ... of course, that's what the Queue class is to help with.
if (err instanceof this.mws.ServerError) {
if (
err.code === 503
&& opt.maxThrottleRetries > 0
&& throttleRetries <= opt.maxThrottleRetries
) {
q.throttle();
throttleRetries += 1;
console.warn(
'***** Error 503 .. throttling?',
Expand Down Expand Up @@ -261,8 +278,11 @@ class MwsAdvanced {
}
console.warn(`***** trying again in ${ms}ms`);
await sleep(ms + 100);
// mark it complete in the queue, so the handler doesn't think the queue is overfull
q.complete();
console.warn('**** retrying request', name);
return this.callEndpoint(name, callOptions, opt);
const newPromise = this.callEndpoint(name, callOptions, opt);
return newPromise;
}
}
throw err;
Expand Down
80 changes: 48 additions & 32 deletions samples/throttlecheck.js
Original file line number Diff line number Diff line change
@@ -1,46 +1,62 @@
// note that if your throttle is clear at the start of running this, you
// should expect it to take right around 7 minutes or so to complete a 20
// request spam to getMarketplaces()

const mws = require('..');
const keys = require('../test/keys.json');

mws.init(keys);

const getCats = asins => (
mws.getProductCategoriesForAsins({
marketplaceId: 'ATVPDKIKX0DER',
asins,
})
);
// const getCats = asins => (
// console.warn('* getting categories for asin count', asins.length),
// mws.getProductCategoriesForAsins({
// marketplaceId: 'ATVPDKIKX0DER',
// asins,
// })
// );

// async function main() {
// return getCats([
// '0534645577',
// 'B00IDD9TU8',
// 'B00IH00CN0',
// 'B07CBM6CDD',
// 'B00OPXKUDK',
// '1573833908',
// '1578262666',
// '517029252X',
// '0810408813',
// '0793802946',
// '0814471048',
// 'B000GCWNPW',
// 'B0027DPF62',
// 'B00005T3EN',
// 'B00006I5V6',
// '159116060X',
// 'B003FBG4SS',
// '0321568095',
// 'B00006B8FZ',
// 'B0000AHOOM',
// 'B00000JQQH',
// '0078034639',
// ]);
// }

async function main() {
return getCats([
'0534645577',
'B00IDD9TU8',
'B00IH00CN0',
'B07CBM6CDD',
'B00OPXKUDK',
'1573833908',
'1578262666',
'517029252X',
'0810408813',
'0793802946',
'0814471048',
'B000GCWNPW',
'B0027DPF62',
'B00005T3EN',
'B00006I5V6',
'159116060X',
'B003FBG4SS',
'0321568095',
'B00006B8FZ',
'B0000AHOOM',
'B00000JQQH',
'0078034639',
]);
const arr = [];
for (let x = 0; x < 20; x += 1) {
arr.push(new Promise((resolve) => {
setTimeout(() => mws.getMarketplaces().then(res => resolve(res)), x * 200);
}));
}
await Promise.all(arr).catch(err => console.warn('* err', err));
return arr;
}

/* eslint-disable */
let x = 1;
while (x-- > 0) {
main().then(x => {
console.warn('* x=', x);
main().then(res => {
console.warn('* res length=', res.length);
});
}

0 comments on commit ea44846

Please sign in to comment.