From d911989b376e78ffaec04a08df743fbfe987ceb5 Mon Sep 17 00:00:00 2001 From: Santiago Gimeno Date: Thu, 3 Nov 2022 18:26:58 +0100 Subject: [PATCH] worker: allow retrieving elu from parent Usually, when extracting the ELU from a specific JS thread is better to do it from a different thread as the event loop we're observing might already be blocked. The `Worker.performance.eventLoopUtilization()` method allows us to do this for worker threads, but there's not a way to do this for the main thread. This new API, which allows us to retrieve the ELU of the parent thread from a specific worker, is going to enable this. For the moment, I have defined this new API in ``` require('worker_threads').parent.performance.eventLoopUtilization() ``` though I haven't added documentation yet as a) I want to know first whether this approach is acceptable, and in case it is, b) I'm not really sure whether that's the place the API should live in. Would love receiving feedback on this. --- lib/internal/main/worker_thread.js | 3 ++ lib/internal/worker.js | 46 +++++++++++++++- lib/internal/worker/io.js | 3 +- lib/worker_threads.js | 8 ++- src/node_worker.cc | 11 ++++ src/node_worker.h | 3 ++ ...worker-parent-performance-eventlooputil.js | 54 +++++++++++++++++++ 7 files changed, 125 insertions(+), 3 deletions(-) create mode 100644 test/parallel/test-worker-parent-performance-eventlooputil.js diff --git a/lib/internal/main/worker_thread.js b/lib/internal/main/worker_thread.js index 9ae04e288fc70c..c256f10aef9573 100644 --- a/lib/internal/main/worker_thread.js +++ b/lib/internal/main/worker_thread.js @@ -29,6 +29,7 @@ const { messageTypes: { // Messages that may be received by workers LOAD_SCRIPT, + PARENT_LOOP_START, // Messages that may be posted from workers UP_AND_RUNNING, ERROR_MESSAGE, @@ -159,6 +160,8 @@ port.on('message', (message) => { const CJSLoader = require('internal/modules/cjs/loader'); CJSLoader.Module.runMain(filename); } + } else if (message.type === PARENT_LOOP_START) { + require('internal/worker').setParentEventLoopStartTime(message.value); } else if (message.type === STDIO_PAYLOAD) { const { stream, chunks } = message; ArrayPrototypeForEach(chunks, ({ chunk, encoding }) => { diff --git a/lib/internal/worker.js b/lib/internal/worker.js index 3cc589c996703c..151eb530c044f7 100644 --- a/lib/internal/worker.js +++ b/lib/internal/worker.js @@ -27,6 +27,7 @@ const { const EventEmitter = require('events'); const assert = require('internal/assert'); const path = require('path'); +const { setImmediate } = require('timers'); const { internalEventLoopUtilization } = require('internal/perf/event_loop_utilization'); @@ -60,6 +61,13 @@ const { fileURLToPath, isURLInstance, pathToFileURL } = require('internal/url'); const { kEmptyObject } = require('internal/util'); const { validateArray } = require('internal/validators'); +const { + constants: { + NODE_PERFORMANCE_MILESTONE_LOOP_START, + }, + milestones, +} = internalBinding('performance'); + const { ownsProcessState, isMainThread, @@ -70,7 +78,8 @@ const { kMaxOldGenerationSizeMb, kCodeRangeSizeMb, kStackSizeMb, - kTotalResourceLimitCount + kTotalResourceLimitCount, + parentLoopIdleTime, } = internalBinding('worker'); const kHandle = Symbol('kHandle'); @@ -83,6 +92,7 @@ const kOnErrorMessage = Symbol('kOnErrorMessage'); const kParentSideStdio = Symbol('kParentSideStdio'); const kLoopStartTime = Symbol('kLoopStartTime'); const kIsOnline = Symbol('kIsOnline'); +const kSendLoopStart = Symbol('kSendLoopStart'); const SHARE_ENV = SymbolFor('nodejs.worker_threads.SHARE_ENV'); let debug = require('internal/util/debuglog').debuglog('worker', (fn) => { @@ -265,6 +275,13 @@ class Worker extends EventEmitter { this[kHandle].startThread(); process.nextTick(() => process.emit('worker', this)); + // Send current thread loopStart to the worker. In case the loop has not yet + // started, send it after the poll phase of the loop has completed. + if (milestones[NODE_PERFORMANCE_MILESTONE_LOOP_START] === -1) { + setImmediate(() => this[kSendLoopStart]()); + } else { + this[kSendLoopStart](); + } if (workerThreadsChannel.hasSubscribers) { workerThreadsChannel.publish({ worker: this, @@ -346,6 +363,13 @@ class Worker extends EventEmitter { } } + [kSendLoopStart]() { + this[kPort]?.postMessage({ + type: messageTypes.PARENT_LOOP_START, + value: milestones[NODE_PERFORMANCE_MILESTONE_LOOP_START] / 1e6 + }); + } + postMessage(...args) { if (this[kPublicPort] === null) return; @@ -494,6 +518,24 @@ function eventLoopUtilization(util1, util2) { ); } +let parentEventLoopStartTime = -1; +function setParentEventLoopStartTime(time) { + parentEventLoopStartTime = time; +} + +function parentEventLoopUtilization(util1, util2) { + if (parentEventLoopStartTime === -1) { + return { idle: 0, active: 0, utilization: 0 }; + } + + return internalEventLoopUtilization( + parentEventLoopStartTime, + parentLoopIdleTime(), + util1, + util2, + ); +} + module.exports = { ownsProcessState, isMainThread, @@ -505,4 +547,6 @@ module.exports = { assignEnvironmentData, threadId, Worker, + setParentEventLoopStartTime, + parentEventLoopUtilization, }; diff --git a/lib/internal/worker/io.js b/lib/internal/worker/io.js index 61f9a5363716a8..9373e32e3d4739 100644 --- a/lib/internal/worker/io.js +++ b/lib/internal/worker/io.js @@ -89,7 +89,8 @@ const messageTypes = { ERROR_MESSAGE: 'errorMessage', STDIO_PAYLOAD: 'stdioPayload', STDIO_WANTS_MORE_DATA: 'stdioWantsMoreData', - LOAD_SCRIPT: 'loadScript' + LOAD_SCRIPT: 'loadScript', + PARENT_LOOP_START: 'parentLoopStart', }; // We have to mess with the MessagePort prototype a bit, so that a) we can make diff --git a/lib/worker_threads.js b/lib/worker_threads.js index 9d702fa2883447..19e27dbb1444ac 100644 --- a/lib/worker_threads.js +++ b/lib/worker_threads.js @@ -7,7 +7,8 @@ const { setEnvironmentData, getEnvironmentData, threadId, - Worker + Worker, + parentEventLoopUtilization, } = require('internal/worker'); const { @@ -38,4 +39,9 @@ module.exports = { BroadcastChannel, setEnvironmentData, getEnvironmentData, + parent: isMainThread ? null : { + performance: { + eventLoopUtilization: parentEventLoopUtilization, + }, + }, }; diff --git a/src/node_worker.cc b/src/node_worker.cc index 6b0ca484ace83a..ff00f74aebcfed 100644 --- a/src/node_worker.cc +++ b/src/node_worker.cc @@ -56,6 +56,7 @@ Worker::Worker(Environment* env, per_isolate_opts_(per_isolate_opts), exec_argv_(exec_argv), platform_(env->isolate_data()->platform()), + parent_loop_(env->event_loop()), thread_id_(AllocateEnvironmentThreadId()), env_vars_(env_vars), snapshot_data_(snapshot_data) { @@ -867,6 +868,14 @@ void Worker::LoopStartTime(const FunctionCallbackInfo& args) { args.GetReturnValue().Set(loop_start_time / 1e6); } +void Worker::ParentLoopIdleTime(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + CHECK(!env->is_main_thread()); + Worker* w = env->worker_context(); + uint64_t idle_time = uv_metrics_idle_time(w->parent_loop_); + args.GetReturnValue().Set(1.0 * idle_time / 1e6); +} + namespace { // Return the MessagePort that is global for this Environment and communicates @@ -923,6 +932,7 @@ void InitWorker(Local target, } SetMethod(context, target, "getEnvMessagePort", GetEnvMessagePort); + SetMethod(context, target, "parentLoopIdleTime", Worker::ParentLoopIdleTime); target ->Set(env->context(), @@ -969,6 +979,7 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) { registry->Register(Worker::TakeHeapSnapshot); registry->Register(Worker::LoopIdleTime); registry->Register(Worker::LoopStartTime); + registry->Register(Worker::ParentLoopIdleTime); } } // anonymous namespace diff --git a/src/node_worker.h b/src/node_worker.h index dcb58d13e0e6f9..9a5defad41137f 100644 --- a/src/node_worker.h +++ b/src/node_worker.h @@ -75,6 +75,7 @@ class Worker : public AsyncWrap { static void TakeHeapSnapshot(const v8::FunctionCallbackInfo& args); static void LoopIdleTime(const v8::FunctionCallbackInfo& args); static void LoopStartTime(const v8::FunctionCallbackInfo& args); + static void ParentLoopIdleTime(const v8::FunctionCallbackInfo&); private: bool CreateEnvMessagePort(Environment* env); @@ -91,6 +92,8 @@ class Worker : public AsyncWrap { std::unique_ptr inspector_parent_handle_; + uv_loop_t* parent_loop_; + // This mutex protects access to all variables listed below it. mutable Mutex mutex_; diff --git a/test/parallel/test-worker-parent-performance-eventlooputil.js b/test/parallel/test-worker-parent-performance-eventlooputil.js new file mode 100644 index 00000000000000..69fe61550c5d3b --- /dev/null +++ b/test/parallel/test-worker-parent-performance-eventlooputil.js @@ -0,0 +1,54 @@ +'use strict'; + +const { mustCall } = require('../common'); + +const TIMEOUT = 10; +const SPIN_DUR = 50; + +const assert = require('assert'); +const { Worker, parent, workerData } = require('worker_threads'); + +// Do not use isMainThread directly, otherwise the test would time out in case +// it's started inside of another worker thread. +if (!process.env.HAS_STARTED_WORKER) { + process.env.HAS_STARTED_WORKER = '1'; + const i32arr = new Int32Array(new SharedArrayBuffer(4)); + const w = new Worker(__filename, { workerData: i32arr }); + w.on('online', mustCall(() => { + Atomics.wait(i32arr, 0, 0); + + const t = Date.now(); + while (Date.now() - t < SPIN_DUR); + + Atomics.store(i32arr, 0, 0); + Atomics.notify(i32arr, 0); + Atomics.wait(i32arr, 0, 0); + })); +} else { + setTimeout(() => { + const { eventLoopUtilization } = parent.performance; + const i32arr = workerData; + const elu1 = eventLoopUtilization(); + + Atomics.store(i32arr, 0, 1); + Atomics.notify(i32arr, 0); + Atomics.wait(i32arr, 0, 1); + + const elu2 = eventLoopUtilization(elu1); + const elu3 = eventLoopUtilization(); + const elu4 = eventLoopUtilization(elu3, elu1); + + assert.strictEqual(elu2.idle, 0); + assert.strictEqual(elu4.idle, 0); + assert.strictEqual(elu2.utilization, 1); + assert.strictEqual(elu4.utilization, 1); + assert.strictEqual(elu3.active - elu1.active, elu4.active); + assert.ok(elu2.active > SPIN_DUR - 10, `${elu2.active} <= ${SPIN_DUR - 10}`); + assert.ok(elu2.active < elu4.active, `${elu2.active} >= ${elu4.active}`); + assert.ok(elu3.active > elu2.active, `${elu3.active} <= ${elu2.active}`); + assert.ok(elu3.active > elu4.active, `${elu3.active} <= ${elu4.active}`); + + Atomics.store(i32arr, 0, 1); + Atomics.notify(i32arr, 0); + }, TIMEOUT); +}