Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(terser): Update WorkerPool to reuse Workers #1409

Merged
merged 6 commits into from
Jan 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/terser/src/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export const taskInfo = Symbol('taskInfo');
export const freeWorker = Symbol('freeWorker');
27 changes: 23 additions & 4 deletions packages/terser/src/module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,23 @@ import { WorkerPool } from './worker-pool';
export default function terser(input: Options = {}) {
const { maxWorkers, ...options } = input;

const workerPool = new WorkerPool({
filePath: fileURLToPath(import.meta.url),
maxWorkers
});
let workerPool: WorkerPool | null | undefined;
let numOfChunks = 0;
let numOfWorkersUsed = 0;

return {
name: 'terser',

async renderChunk(code: string, chunk: RenderedChunk, outputOptions: NormalizedOutputOptions) {
if (!workerPool) {
workerPool = new WorkerPool({
filePath: fileURLToPath(import.meta.url),
maxWorkers
});
}

numOfChunks += 1;

const defaultOptions: Options = {
sourceMap: outputOptions.sourcemap === true || typeof outputOptions.sourcemap === 'string'
};
Expand Down Expand Up @@ -80,7 +88,18 @@ export default function terser(input: Options = {}) {
return result;
} catch (e) {
return Promise.reject(e);
} finally {
numOfChunks -= 1;
if (numOfChunks === 0) {
numOfWorkersUsed = workerPool.numWorkers;
workerPool.close();
workerPool = null;
dasa marked this conversation as resolved.
Show resolved Hide resolved
}
}
},

get numOfWorkersUsed() {
return numOfWorkersUsed;
}
};
}
11 changes: 11 additions & 0 deletions packages/terser/src/type.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import type { AsyncResource } from 'async_hooks';
import type { Worker } from 'worker_threads';

import type { MinifyOptions } from 'terser';

import type { taskInfo } from './constants';

export interface Options extends MinifyOptions {
nameCache?: Record<string, any>;
maxWorkers?: number;
Expand All @@ -12,6 +17,12 @@ export interface WorkerContext {

export type WorkerCallback = (err: Error | null, output?: WorkerOutput) => void;

interface WorkerPoolTaskInfo extends AsyncResource {
done(err: Error | null, result: any): void;
}

export type WorkerWithTaskInfo = Worker & { [taskInfo]?: WorkerPoolTaskInfo | null };

export interface WorkerContextSerialized {
code: string;
options: string;
Expand Down
119 changes: 64 additions & 55 deletions packages/terser/src/worker-pool.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,31 @@
import { AsyncResource } from 'async_hooks';
import { Worker } from 'worker_threads';
import { cpus } from 'os';
import { EventEmitter } from 'events';

import serializeJavascript from 'serialize-javascript';

import { freeWorker, taskInfo } from './constants';

import type {
WorkerCallback,
WorkerContext,
WorkerOutput,
WorkerPoolOptions,
WorkerPoolTask
WorkerPoolTask,
WorkerWithTaskInfo
} from './type';

const symbol = Symbol.for('FreeWoker');
class WorkerPoolTaskInfo extends AsyncResource {
constructor(private callback: WorkerCallback) {
super('WorkerPoolTaskInfo');
}

done(err: Error | null, result: any) {
this.runInAsyncScope(this.callback, null, err, result);
this.emitDestroy();
}
}

export class WorkerPool extends EventEmitter {
protected maxInstances: number;
Expand All @@ -21,37 +34,30 @@ export class WorkerPool extends EventEmitter {

protected tasks: WorkerPoolTask[] = [];

protected workers = 0;
protected workers: WorkerWithTaskInfo[] = [];
protected freeWorkers: WorkerWithTaskInfo[] = [];

constructor(options: WorkerPoolOptions) {
super();

this.maxInstances = options.maxWorkers || cpus().length;
this.filePath = options.filePath;

this.on(symbol, () => {
this.on(freeWorker, () => {
if (this.tasks.length > 0) {
this.run();
const { context, cb } = this.tasks.shift()!;
this.runTask(context, cb);
}
});
}

add(context: WorkerContext, cb: WorkerCallback) {
this.tasks.push({
context,
cb
});

if (this.workers >= this.maxInstances) {
return;
}

this.run();
get numWorkers(): number {
return this.workers.length;
}

async addAsync(context: WorkerContext): Promise<WorkerOutput> {
addAsync(context: WorkerContext): Promise<WorkerOutput> {
return new Promise((resolve, reject) => {
this.add(context, (err, output) => {
this.runTask(context, (err, output) => {
if (err) {
reject(err);
return;
Expand All @@ -67,51 +73,54 @@ export class WorkerPool extends EventEmitter {
});
}

private run() {
if (this.tasks.length === 0) {
return;
}

const task = this.tasks.shift();

if (typeof task === 'undefined') {
return;
close() {
for (let i = 0; i < this.workers.length; i++) {
const worker = this.workers[i];
worker.terminate();
}
}

this.workers += 1;

let called = false;
const callCallback = (err: Error | null, output?: WorkerOutput) => {
if (called) {
return;
}
called = true;

this.workers -= 1;

task.cb(err, output);
this.emit(symbol);
};

const worker = new Worker(this.filePath, {
workerData: {
code: task.context.code,
options: serializeJavascript(task.context.options)
}
});
private addNewWorker() {
const worker: WorkerWithTaskInfo = new Worker(this.filePath);

worker.on('message', (data) => {
callCallback(null, data);
worker.on('message', (result) => {
worker[taskInfo]?.done(null, result);
worker[taskInfo] = null;
this.freeWorkers.push(worker);
this.emit(freeWorker);
});

worker.on('error', (err) => {
callCallback(err);
if (worker[taskInfo]) {
worker[taskInfo].done(err, null);
} else {
this.emit('error', err);
}
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
});

worker.on('exit', (code) => {
if (code !== 0) {
callCallback(new Error(`Minify worker stopped with exit code ${code}`));
this.workers.push(worker);
this.freeWorkers.push(worker);
this.emit(freeWorker);
}

private runTask(context: WorkerContext, cb: WorkerCallback) {
if (this.freeWorkers.length === 0) {
this.tasks.push({ context, cb });
if (this.numWorkers < this.maxInstances) {
this.addNewWorker();
}
});
return;
}

const worker = this.freeWorkers.pop();
if (worker) {
worker[taskInfo] = new WorkerPoolTaskInfo(cb);
worker.postMessage({
code: context.code,
options: serializeJavascript(context.options)
});
}
}
}
29 changes: 15 additions & 14 deletions packages/terser/src/worker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import process from 'process';
import { isMainThread, parentPort, workerData } from 'worker_threads';
import { isMainThread, parentPort } from 'worker_threads';

import { hasOwnProperty, isObject } from 'smob';

Expand All @@ -22,21 +21,25 @@ function isWorkerContextSerialized(input: unknown): input is WorkerContextSerial
);
}

export async function runWorker() {
if (isMainThread || !parentPort || !isWorkerContextSerialized(workerData)) {
export function runWorker() {
if (isMainThread || !parentPort) {
return;
}

try {
// eslint-disable-next-line no-eval
const eval2 = eval;
// eslint-disable-next-line no-eval
const eval2 = eval;

const options = eval2(`(${workerData.options})`);
parentPort.on('message', async (data: WorkerContextSerialized) => {
if (!isWorkerContextSerialized(data)) {
return;
}

const options = eval2(`(${data.options})`);

const result = await minify(workerData.code, options);
const result = await minify(data.code, options);

const output: WorkerOutput = {
code: result.code || workerData.code,
code: result.code || data.code,
nameCache: options.nameCache
};

Expand All @@ -48,8 +51,6 @@ export async function runWorker() {
output.sourceMap = result.map;
}

parentPort.postMessage(output);
} catch (e) {
process.exit(1);
}
parentPort?.postMessage(output);
});
}
22 changes: 19 additions & 3 deletions packages/terser/test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ test.serial('minify via terser options', async (t) => {
});

test.serial('minify multiple outputs', async (t) => {
let plugin;

const bundle = await rollup({
input: 'test/fixtures/unminified.js',
plugins: [terser()]
plugins: [(plugin = terser({ maxWorkers: 2 }))]
});

const [bundle1, bundle2] = await Promise.all([
Expand All @@ -60,6 +62,20 @@ test.serial('minify multiple outputs', async (t) => {

t.is(output1.code, '"use strict";window.a=5,window.a<3&&console.log(4);\n');
t.is(output2.code, 'window.a=5,window.a<3&&console.log(4);\n');
t.is(plugin.numOfWorkersUsed, 2, 'used 2 workers');
});

test.serial('minify multiple outputs with only 1 worker', async (t) => {
let plugin;

const bundle = await rollup({
input: 'test/fixtures/unminified.js',
plugins: [(plugin = terser({ maxWorkers: 1 }))]
});

await Promise.all([bundle.generate({ format: 'cjs' }), bundle.generate({ format: 'es' })]);

t.is(plugin.numOfWorkersUsed, 1, 'used 1 worker');
});

test.serial('minify esm module', async (t) => {
Expand Down Expand Up @@ -122,7 +138,7 @@ test.serial('throw error on terser fail', async (t) => {
await bundle.generate({ format: 'esm' });
t.falsy(true);
} catch (error) {
t.is(error.toString(), 'Error: Minify worker stopped with exit code 1');
t.is(error.toString(), 'SyntaxError: Name expected');
}
});

Expand All @@ -142,7 +158,7 @@ test.serial('throw error on terser fail with multiple outputs', async (t) => {
await Promise.all([bundle.generate({ format: 'cjs' }), bundle.generate({ format: 'esm' })]);
t.falsy(true);
} catch (error) {
t.is(error.toString(), 'Error: Minify worker stopped with exit code 1');
t.is(error.toString(), 'SyntaxError: Name expected');
}
});

Expand Down