From c6c9b4c6490f3e5d28014c0cce9f191fd7f37018 Mon Sep 17 00:00:00 2001 From: Dasa Paddock Date: Thu, 19 Jan 2023 14:50:01 -0800 Subject: [PATCH 1/6] Update WorkerPool to reuse Workers --- packages/terser/src/module.ts | 21 +++++- packages/terser/src/worker-pool.ts | 114 +++++++++++++++-------------- packages/terser/src/worker.ts | 29 ++++---- packages/terser/test/test.js | 4 +- 4 files changed, 93 insertions(+), 75 deletions(-) diff --git a/packages/terser/src/module.ts b/packages/terser/src/module.ts index a74cc7f3c..199811e3e 100644 --- a/packages/terser/src/module.ts +++ b/packages/terser/src/module.ts @@ -9,15 +9,22 @@ import { WorkerPool } from './worker-pool'; export default function terser(input: Options = {}) { const { maxWorkers, ...options } = input; - const workerPool = new WorkerPool({ - filePath: fileURLToPath(import.meta.url), - maxWorkers - }); + let workerPool: WorkerPool | null; + let numOfChunks = 0; return { name: 'terser', async renderChunk(code: string, chunk: RenderedChunk, outputOptions: NormalizedOutputOptions) { + if (!workerPool) { + workerPool = new WorkerPool({ + filePath: fileURLToPath(import.meta.url), + maxWorkers + }); + } + + numOfChunks += 1; + const defaultOptions: Options = { sourceMap: outputOptions.sourcemap === true || typeof outputOptions.sourcemap === 'string' }; @@ -80,6 +87,12 @@ export default function terser(input: Options = {}) { return result; } catch (e) { return Promise.reject(e); + } finally { + numOfChunks -= 1; + if (numOfChunks === 0) { + workerPool.close(); + workerPool = null; + } } } }; diff --git a/packages/terser/src/worker-pool.ts b/packages/terser/src/worker-pool.ts index 5154d4510..9941058cd 100644 --- a/packages/terser/src/worker-pool.ts +++ b/packages/terser/src/worker-pool.ts @@ -1,3 +1,4 @@ +import { AsyncResource } from 'async_hooks'; import { Worker } from 'worker_threads'; import { cpus } from 'os'; import { EventEmitter } from 'events'; @@ -12,7 +13,21 @@ import type { WorkerPoolTask } from './type'; -const symbol = Symbol.for('FreeWoker'); +const taskInfo = Symbol('taskInfo'); +const freeWorker = Symbol('freeWorker'); + +type WorkerWithTaskInfo = Worker & { [taskInfo]?: null | WorkerPoolTaskInfo }; + +class WorkerPoolTaskInfo extends AsyncResource { + constructor(private callback: WorkerCallback) { + super('WorkerPoolTaskInfo'); + } + + done(err: Error | null, result: any) { + this.runInAsyncScope(this.callback, null, err, result); + this.emitDestroy(); + } +} export class WorkerPool extends EventEmitter { protected maxInstances: number; @@ -21,7 +36,8 @@ export class WorkerPool extends EventEmitter { protected tasks: WorkerPoolTask[] = []; - protected workers = 0; + protected workers: WorkerWithTaskInfo[] = []; + protected freeWorkers: WorkerWithTaskInfo[] = []; constructor(options: WorkerPoolOptions) { super(); @@ -29,29 +45,17 @@ export class WorkerPool extends EventEmitter { this.maxInstances = options.maxWorkers || cpus().length; this.filePath = options.filePath; - this.on(symbol, () => { + this.on(freeWorker, () => { if (this.tasks.length > 0) { - this.run(); + const { context, cb } = this.tasks.shift()!; + this.runTask(context, cb); } }); } - add(context: WorkerContext, cb: WorkerCallback) { - this.tasks.push({ - context, - cb - }); - - if (this.workers >= this.maxInstances) { - return; - } - - this.run(); - } - - async addAsync(context: WorkerContext): Promise { + addAsync(context: WorkerContext): Promise { return new Promise((resolve, reject) => { - this.add(context, (err, output) => { + this.runTask(context, (err, output) => { if (err) { reject(err); return; @@ -67,51 +71,51 @@ export class WorkerPool extends EventEmitter { }); } - private run() { - if (this.tasks.length === 0) { - return; - } - - const task = this.tasks.shift(); - - if (typeof task === 'undefined') { - return; + close() { + for (const worker of this.workers) { + worker.terminate(); } + } - this.workers += 1; - - let called = false; - const callCallback = (err: Error | null, output?: WorkerOutput) => { - if (called) { - return; - } - called = true; - - this.workers -= 1; - - task.cb(err, output); - this.emit(symbol); - }; - - const worker = new Worker(this.filePath, { - workerData: { - code: task.context.code, - options: serializeJavascript(task.context.options) - } - }); + private addNewWorker() { + const worker: WorkerWithTaskInfo = new Worker(this.filePath); - worker.on('message', (data) => { - callCallback(null, data); + worker.on('message', (result) => { + worker[taskInfo]!.done(null, result); + worker[taskInfo] = null; + this.freeWorkers.push(worker); + this.emit(freeWorker); }); worker.on('error', (err) => { - callCallback(err); + if (worker[taskInfo]) { + worker[taskInfo].done(err, null); + } else { + this.emit('error', err); + } + this.workers.splice(this.workers.indexOf(worker), 1); + this.addNewWorker(); }); - worker.on('exit', (code) => { - if (code !== 0) { - callCallback(new Error(`Minify worker stopped with exit code ${code}`)); + this.workers.push(worker); + this.freeWorkers.push(worker); + this.emit(freeWorker); + } + + private runTask(context: WorkerContext, cb: WorkerCallback) { + if (this.freeWorkers.length === 0) { + this.tasks.push({ context, cb }); + if (this.workers.length < this.maxInstances) { + this.addNewWorker(); } + return; + } + + const worker = this.freeWorkers.pop()!; + worker[taskInfo] = new WorkerPoolTaskInfo(cb); + worker.postMessage({ + code: context.code, + options: serializeJavascript(context.options) }); } } diff --git a/packages/terser/src/worker.ts b/packages/terser/src/worker.ts index 1c3ea03b0..118456488 100644 --- a/packages/terser/src/worker.ts +++ b/packages/terser/src/worker.ts @@ -1,5 +1,4 @@ -import process from 'process'; -import { isMainThread, parentPort, workerData } from 'worker_threads'; +import { isMainThread, parentPort } from 'worker_threads'; import { hasOwnProperty, isObject } from 'smob'; @@ -22,21 +21,25 @@ function isWorkerContextSerialized(input: unknown): input is WorkerContextSerial ); } -export async function runWorker() { - if (isMainThread || !parentPort || !isWorkerContextSerialized(workerData)) { +export function runWorker() { + if (isMainThread || !parentPort) { return; } - try { - // eslint-disable-next-line no-eval - const eval2 = eval; + // eslint-disable-next-line no-eval + const eval2 = eval; - const options = eval2(`(${workerData.options})`); + parentPort.on('message', async (data: WorkerContextSerialized) => { + if (!isWorkerContextSerialized(data)) { + return; + } + + const options = eval2(`(${data.options})`); - const result = await minify(workerData.code, options); + const result = await minify(data.code, options); const output: WorkerOutput = { - code: result.code || workerData.code, + code: result.code || data.code, nameCache: options.nameCache }; @@ -48,8 +51,6 @@ export async function runWorker() { output.sourceMap = result.map; } - parentPort.postMessage(output); - } catch (e) { - process.exit(1); - } + parentPort!.postMessage(output); + }); } diff --git a/packages/terser/test/test.js b/packages/terser/test/test.js index 67d1eb6df..a22f7e04d 100644 --- a/packages/terser/test/test.js +++ b/packages/terser/test/test.js @@ -122,7 +122,7 @@ test.serial('throw error on terser fail', async (t) => { await bundle.generate({ format: 'esm' }); t.falsy(true); } catch (error) { - t.is(error.toString(), 'Error: Minify worker stopped with exit code 1'); + t.is(error.toString(), 'SyntaxError: Name expected'); } }); @@ -142,7 +142,7 @@ test.serial('throw error on terser fail with multiple outputs', async (t) => { await Promise.all([bundle.generate({ format: 'cjs' }), bundle.generate({ format: 'esm' })]); t.falsy(true); } catch (error) { - t.is(error.toString(), 'Error: Minify worker stopped with exit code 1'); + t.is(error.toString(), 'SyntaxError: Name expected'); } }); From 8b14a3038776268143c5c5cf1dd47e031aa21bc0 Mon Sep 17 00:00:00 2001 From: Dasa Paddock Date: Sat, 21 Jan 2023 10:04:44 -0800 Subject: [PATCH 2/6] test number of workers used --- packages/terser/src/module.ts | 6 ++++++ packages/terser/src/worker-pool.ts | 6 +++++- packages/terser/test/test.js | 18 +++++++++++++++++- 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/packages/terser/src/module.ts b/packages/terser/src/module.ts index 199811e3e..da1aa6887 100644 --- a/packages/terser/src/module.ts +++ b/packages/terser/src/module.ts @@ -11,6 +11,7 @@ export default function terser(input: Options = {}) { let workerPool: WorkerPool | null; let numOfChunks = 0; + let numOfWorkersUsed = 0; return { name: 'terser', @@ -90,10 +91,15 @@ export default function terser(input: Options = {}) { } finally { numOfChunks -= 1; if (numOfChunks === 0) { + numOfWorkersUsed = workerPool.numWorkers; workerPool.close(); workerPool = null; } } + }, + + get numOfWorkersUsed() { + return numOfWorkersUsed; } }; } diff --git a/packages/terser/src/worker-pool.ts b/packages/terser/src/worker-pool.ts index 9941058cd..034928779 100644 --- a/packages/terser/src/worker-pool.ts +++ b/packages/terser/src/worker-pool.ts @@ -53,6 +53,10 @@ export class WorkerPool extends EventEmitter { }); } + get numWorkers(): number { + return this.workers.length; + } + addAsync(context: WorkerContext): Promise { return new Promise((resolve, reject) => { this.runTask(context, (err, output) => { @@ -105,7 +109,7 @@ export class WorkerPool extends EventEmitter { private runTask(context: WorkerContext, cb: WorkerCallback) { if (this.freeWorkers.length === 0) { this.tasks.push({ context, cb }); - if (this.workers.length < this.maxInstances) { + if (this.numWorkers < this.maxInstances) { this.addNewWorker(); } return; diff --git a/packages/terser/test/test.js b/packages/terser/test/test.js index a22f7e04d..316fb2288 100644 --- a/packages/terser/test/test.js +++ b/packages/terser/test/test.js @@ -46,9 +46,11 @@ test.serial('minify via terser options', async (t) => { }); test.serial('minify multiple outputs', async (t) => { + let plugin; + const bundle = await rollup({ input: 'test/fixtures/unminified.js', - plugins: [terser()] + plugins: [(plugin = terser({ maxWorkers: 2 }))] }); const [bundle1, bundle2] = await Promise.all([ @@ -60,6 +62,20 @@ test.serial('minify multiple outputs', async (t) => { t.is(output1.code, '"use strict";window.a=5,window.a<3&&console.log(4);\n'); t.is(output2.code, 'window.a=5,window.a<3&&console.log(4);\n'); + t.is(plugin.numOfWorkersUsed, 2, 'used 2 workers'); +}); + +test.serial('minify multiple outputs with only 1 worker', async (t) => { + let plugin; + + const bundle = await rollup({ + input: 'test/fixtures/unminified.js', + plugins: [(plugin = terser({ maxWorkers: 1 }))] + }); + + await Promise.all([bundle.generate({ format: 'cjs' }), bundle.generate({ format: 'es' })]); + + t.is(plugin.numOfWorkersUsed, 1, 'used 1 worker'); }); test.serial('minify esm module', async (t) => { From 228c139c10ea243208428bcdfcdc2eb662f50d92 Mon Sep 17 00:00:00 2001 From: Dasa Paddock Date: Sun, 22 Jan 2023 09:26:10 -0800 Subject: [PATCH 3/6] Address feedback --- packages/terser/src/module.ts | 4 ++-- packages/terser/src/worker-pool.ts | 20 +++++++++++--------- packages/terser/src/worker.ts | 2 +- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/packages/terser/src/module.ts b/packages/terser/src/module.ts index da1aa6887..9d3504d52 100644 --- a/packages/terser/src/module.ts +++ b/packages/terser/src/module.ts @@ -9,7 +9,7 @@ import { WorkerPool } from './worker-pool'; export default function terser(input: Options = {}) { const { maxWorkers, ...options } = input; - let workerPool: WorkerPool | null; + let workerPool: WorkerPool | undefined; let numOfChunks = 0; let numOfWorkersUsed = 0; @@ -93,7 +93,7 @@ export default function terser(input: Options = {}) { if (numOfChunks === 0) { numOfWorkersUsed = workerPool.numWorkers; workerPool.close(); - workerPool = null; + workerPool = undefined; } } }, diff --git a/packages/terser/src/worker-pool.ts b/packages/terser/src/worker-pool.ts index 034928779..d87dff6cb 100644 --- a/packages/terser/src/worker-pool.ts +++ b/packages/terser/src/worker-pool.ts @@ -16,7 +16,7 @@ import type { const taskInfo = Symbol('taskInfo'); const freeWorker = Symbol('freeWorker'); -type WorkerWithTaskInfo = Worker & { [taskInfo]?: null | WorkerPoolTaskInfo }; +type WorkerWithTaskInfo = Worker & { [taskInfo]?: undefined | WorkerPoolTaskInfo }; class WorkerPoolTaskInfo extends AsyncResource { constructor(private callback: WorkerCallback) { @@ -85,8 +85,8 @@ export class WorkerPool extends EventEmitter { const worker: WorkerWithTaskInfo = new Worker(this.filePath); worker.on('message', (result) => { - worker[taskInfo]!.done(null, result); - worker[taskInfo] = null; + worker[taskInfo]?.done(null, result); + worker[taskInfo] = undefined; this.freeWorkers.push(worker); this.emit(freeWorker); }); @@ -115,11 +115,13 @@ export class WorkerPool extends EventEmitter { return; } - const worker = this.freeWorkers.pop()!; - worker[taskInfo] = new WorkerPoolTaskInfo(cb); - worker.postMessage({ - code: context.code, - options: serializeJavascript(context.options) - }); + const worker = this.freeWorkers.pop(); + if (worker) { + worker[taskInfo] = new WorkerPoolTaskInfo(cb); + worker.postMessage({ + code: context.code, + options: serializeJavascript(context.options) + }); + } } } diff --git a/packages/terser/src/worker.ts b/packages/terser/src/worker.ts index 118456488..d2c528c1e 100644 --- a/packages/terser/src/worker.ts +++ b/packages/terser/src/worker.ts @@ -51,6 +51,6 @@ export function runWorker() { output.sourceMap = result.map; } - parentPort!.postMessage(output); + parentPort?.postMessage(output); }); } From 62d0726fc65088de73a4933f3491b9b7276cd604 Mon Sep 17 00:00:00 2001 From: Dasa Paddock Date: Sun, 22 Jan 2023 09:45:05 -0800 Subject: [PATCH 4/6] Fix ESLint warnings --- packages/terser/src/module.ts | 4 ++-- packages/terser/src/worker-pool.ts | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/terser/src/module.ts b/packages/terser/src/module.ts index 9d3504d52..2877a6e49 100644 --- a/packages/terser/src/module.ts +++ b/packages/terser/src/module.ts @@ -9,7 +9,7 @@ import { WorkerPool } from './worker-pool'; export default function terser(input: Options = {}) { const { maxWorkers, ...options } = input; - let workerPool: WorkerPool | undefined; + let workerPool: WorkerPool | null | undefined; let numOfChunks = 0; let numOfWorkersUsed = 0; @@ -93,7 +93,7 @@ export default function terser(input: Options = {}) { if (numOfChunks === 0) { numOfWorkersUsed = workerPool.numWorkers; workerPool.close(); - workerPool = undefined; + workerPool = null; } } }, diff --git a/packages/terser/src/worker-pool.ts b/packages/terser/src/worker-pool.ts index d87dff6cb..e3b12dae4 100644 --- a/packages/terser/src/worker-pool.ts +++ b/packages/terser/src/worker-pool.ts @@ -16,7 +16,7 @@ import type { const taskInfo = Symbol('taskInfo'); const freeWorker = Symbol('freeWorker'); -type WorkerWithTaskInfo = Worker & { [taskInfo]?: undefined | WorkerPoolTaskInfo }; +type WorkerWithTaskInfo = Worker & { [taskInfo]?: WorkerPoolTaskInfo | null }; class WorkerPoolTaskInfo extends AsyncResource { constructor(private callback: WorkerCallback) { @@ -86,7 +86,7 @@ export class WorkerPool extends EventEmitter { worker.on('message', (result) => { worker[taskInfo]?.done(null, result); - worker[taskInfo] = undefined; + worker[taskInfo] = null; this.freeWorkers.push(worker); this.emit(freeWorker); }); From 7334a26cda86c9ebf10a570e2b512ddd57337226 Mon Sep 17 00:00:00 2001 From: Dasa Paddock Date: Mon, 23 Jan 2023 09:08:10 -0800 Subject: [PATCH 5/6] Use regular `for` loop --- packages/terser/src/worker-pool.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/terser/src/worker-pool.ts b/packages/terser/src/worker-pool.ts index e3b12dae4..c1c92620e 100644 --- a/packages/terser/src/worker-pool.ts +++ b/packages/terser/src/worker-pool.ts @@ -76,7 +76,8 @@ export class WorkerPool extends EventEmitter { } close() { - for (const worker of this.workers) { + for (let i = 0; i < this.workers.length; i++) { + const worker = this.workers[i]; worker.terminate(); } } From ab26a8a4e6f5c475d414911bae9f1994196d1710 Mon Sep 17 00:00:00 2001 From: Dasa Paddock Date: Mon, 23 Jan 2023 09:53:16 -0800 Subject: [PATCH 6/6] Address feedback --- packages/terser/src/constants.ts | 2 ++ packages/terser/src/type.ts | 11 +++++++++++ packages/terser/src/worker-pool.ts | 10 ++++------ 3 files changed, 17 insertions(+), 6 deletions(-) create mode 100644 packages/terser/src/constants.ts diff --git a/packages/terser/src/constants.ts b/packages/terser/src/constants.ts new file mode 100644 index 000000000..36a95c890 --- /dev/null +++ b/packages/terser/src/constants.ts @@ -0,0 +1,2 @@ +export const taskInfo = Symbol('taskInfo'); +export const freeWorker = Symbol('freeWorker'); diff --git a/packages/terser/src/type.ts b/packages/terser/src/type.ts index daaf66651..0866af55f 100644 --- a/packages/terser/src/type.ts +++ b/packages/terser/src/type.ts @@ -1,5 +1,10 @@ +import type { AsyncResource } from 'async_hooks'; +import type { Worker } from 'worker_threads'; + import type { MinifyOptions } from 'terser'; +import type { taskInfo } from './constants'; + export interface Options extends MinifyOptions { nameCache?: Record; maxWorkers?: number; @@ -12,6 +17,12 @@ export interface WorkerContext { export type WorkerCallback = (err: Error | null, output?: WorkerOutput) => void; +interface WorkerPoolTaskInfo extends AsyncResource { + done(err: Error | null, result: any): void; +} + +export type WorkerWithTaskInfo = Worker & { [taskInfo]?: WorkerPoolTaskInfo | null }; + export interface WorkerContextSerialized { code: string; options: string; diff --git a/packages/terser/src/worker-pool.ts b/packages/terser/src/worker-pool.ts index c1c92620e..5630fd963 100644 --- a/packages/terser/src/worker-pool.ts +++ b/packages/terser/src/worker-pool.ts @@ -5,19 +5,17 @@ import { EventEmitter } from 'events'; import serializeJavascript from 'serialize-javascript'; +import { freeWorker, taskInfo } from './constants'; + import type { WorkerCallback, WorkerContext, WorkerOutput, WorkerPoolOptions, - WorkerPoolTask + WorkerPoolTask, + WorkerWithTaskInfo } from './type'; -const taskInfo = Symbol('taskInfo'); -const freeWorker = Symbol('freeWorker'); - -type WorkerWithTaskInfo = Worker & { [taskInfo]?: WorkerPoolTaskInfo | null }; - class WorkerPoolTaskInfo extends AsyncResource { constructor(private callback: WorkerCallback) { super('WorkerPoolTaskInfo');