-
-
Notifications
You must be signed in to change notification settings - Fork 6.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix queue management in jest-worker #7934
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -9,6 +9,7 @@ import { | |||||
ChildMessage, | ||||||
FarmOptions, | ||||||
QueueChildMessage, | ||||||
QueueItem, | ||||||
WorkerInterface, | ||||||
OnStart, | ||||||
OnEnd, | ||||||
|
@@ -19,24 +20,25 @@ export default class Farm { | |||||
private _computeWorkerKey: FarmOptions['computeWorkerKey']; | ||||||
private _cacheKeys: {[key: string]: WorkerInterface}; | ||||||
private _callback: Function; | ||||||
private _last: Array<QueueChildMessage>; | ||||||
private _last: Array<QueueItem>; | ||||||
private _locks: Array<boolean>; | ||||||
private _numOfWorkers: number; | ||||||
private _offset: number; | ||||||
private _queue: Array<QueueChildMessage | null>; | ||||||
private _queue: Array<QueueItem | null>; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
right? there can never be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The queue can be empty, and that's signaled by a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The array index refers to each of the workers (I.e. |
||||||
|
||||||
constructor( | ||||||
numOfWorkers: number, | ||||||
callback: Function, | ||||||
computeWorkerKey?: FarmOptions['computeWorkerKey'], | ||||||
) { | ||||||
this._callback = callback; | ||||||
this._numOfWorkers = numOfWorkers; | ||||||
this._cacheKeys = Object.create(null); | ||||||
this._queue = []; | ||||||
this._callback = callback; | ||||||
this._last = []; | ||||||
this._locks = []; | ||||||
this._numOfWorkers = numOfWorkers; | ||||||
this._offset = 0; | ||||||
this._queue = []; | ||||||
|
||||||
if (computeWorkerKey) { | ||||||
this._computeWorkerKey = computeWorkerKey; | ||||||
} | ||||||
|
@@ -70,6 +72,7 @@ export default class Farm { | |||||
}; | ||||||
|
||||||
const task = {onEnd, onStart, request}; | ||||||
|
||||||
if (worker) { | ||||||
this._enqueue(task, worker.getWorkerId()); | ||||||
} else { | ||||||
|
@@ -78,80 +81,82 @@ export default class Farm { | |||||
}); | ||||||
} | ||||||
|
||||||
private _getNextJob(workerId: number): QueueChildMessage | null { | ||||||
private _getNextTask(workerId: number): QueueChildMessage | null { | ||||||
let queueHead = this._queue[workerId]; | ||||||
|
||||||
while (queueHead && queueHead.request[1]) { | ||||||
while (queueHead && queueHead.task.request[1]) { | ||||||
queueHead = queueHead.next || null; | ||||||
} | ||||||
|
||||||
this._queue[workerId] = queueHead; | ||||||
|
||||||
return queueHead; | ||||||
return queueHead && queueHead.task; | ||||||
} | ||||||
|
||||||
private _process(workerId: number): Farm { | ||||||
if (this.isLocked(workerId)) { | ||||||
if (this._isLocked(workerId)) { | ||||||
return this; | ||||||
} | ||||||
|
||||||
const job = this._getNextJob(workerId); | ||||||
const task = this._getNextTask(workerId); | ||||||
|
||||||
if (!job) { | ||||||
if (!task) { | ||||||
return this; | ||||||
} | ||||||
|
||||||
const onEnd = (error: Error | null, result: unknown) => { | ||||||
job.onEnd(error, result); | ||||||
this.unlock(workerId); | ||||||
task.onEnd(error, result); | ||||||
|
||||||
this._unlock(workerId); | ||||||
this._process(workerId); | ||||||
}; | ||||||
|
||||||
this.lock(workerId); | ||||||
task.request[1] = true; | ||||||
|
||||||
this._callback(workerId, job.request, job.onStart, onEnd); | ||||||
|
||||||
job.request[1] = true; | ||||||
this._lock(workerId); | ||||||
this._callback(workerId, task.request, task.onStart, onEnd); | ||||||
|
||||||
return this; | ||||||
} | ||||||
|
||||||
private _enqueue(task: QueueChildMessage, workerId: number): Farm { | ||||||
const item = {next: null, task}; | ||||||
|
||||||
if (task.request[1]) { | ||||||
return this; | ||||||
} | ||||||
|
||||||
if (this._queue[workerId]) { | ||||||
this._last[workerId].next = task; | ||||||
this._last[workerId].next = item; | ||||||
} else { | ||||||
this._queue[workerId] = task; | ||||||
this._queue[workerId] = item; | ||||||
} | ||||||
|
||||||
this._last[workerId] = task; | ||||||
this._last[workerId] = item; | ||||||
this._process(workerId); | ||||||
|
||||||
return this; | ||||||
} | ||||||
|
||||||
private _push(task: QueueChildMessage): Farm { | ||||||
for (let i = 0; i < this._numOfWorkers; i++) { | ||||||
const workerIdx = (this._offset + i) % this._numOfWorkers; | ||||||
this._enqueue(task, workerIdx); | ||||||
this._enqueue(task, (this._offset + i) % this._numOfWorkers); | ||||||
} | ||||||
|
||||||
this._offset++; | ||||||
|
||||||
return this; | ||||||
} | ||||||
|
||||||
lock(workerId: number): void { | ||||||
private _lock(workerId: number): void { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. technically breaking, but I guess these shouldn't be called anyways by consumers? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, playing with worker locks is a recipe for disaster :D |
||||||
this._locks[workerId] = true; | ||||||
} | ||||||
|
||||||
unlock(workerId: number): void { | ||||||
private _unlock(workerId: number): void { | ||||||
this._locks[workerId] = false; | ||||||
} | ||||||
|
||||||
isLocked(workerId: number): boolean { | ||||||
private _isLocked(workerId: number): boolean { | ||||||
return this._locks[workerId]; | ||||||
} | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
took me forever to spot the fixed missing paren :P