Skip to content

Commit

Permalink
worker: allow retrieving elu from parent
Browse files Browse the repository at this point in the history
Usually, when extracting the ELU from a specific JS thread is better to
do it from a different thread as the event loop we're observing might
already be blocked. The `Worker.performance.eventLoopUtilization()`
method allows us to do this for worker threads, but there's not a way
to do this for the main thread. This new API, which allows us to
retrieve the ELU of the parent thread from a specific worker, is going
to enable this.

For the moment, I have defined this new API in
```
require('worker_threads').parent.performance.eventLoopUtilization()
```
though I haven't added documentation yet as
a) I want to know first whether this approach is acceptable, and in case
it is,
b) I'm not really sure whether that's the place the API should live in.
Would love receiving feedback on this.
  • Loading branch information
santigimeno committed Nov 6, 2022
1 parent 28bf031 commit c47419e
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 3 deletions.
3 changes: 3 additions & 0 deletions lib/internal/main/worker_thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const {
messageTypes: {
// Messages that may be received by workers
LOAD_SCRIPT,
PARENT_LOOP_START,
// Messages that may be posted from workers
UP_AND_RUNNING,
ERROR_MESSAGE,
Expand Down Expand Up @@ -159,6 +160,8 @@ port.on('message', (message) => {
const CJSLoader = require('internal/modules/cjs/loader');
CJSLoader.Module.runMain(filename);
}
} else if (message.type === PARENT_LOOP_START) {
require('internal/worker').setParentEventLoopStartTime(message.value);
} else if (message.type === STDIO_PAYLOAD) {
const { stream, chunks } = message;
ArrayPrototypeForEach(chunks, ({ chunk, encoding }) => {
Expand Down
46 changes: 45 additions & 1 deletion lib/internal/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const {
const EventEmitter = require('events');
const assert = require('internal/assert');
const path = require('path');
const { setImmediate } = require('timers');
const {
internalEventLoopUtilization
} = require('internal/perf/event_loop_utilization');
Expand Down Expand Up @@ -60,6 +61,13 @@ const { fileURLToPath, isURLInstance, pathToFileURL } = require('internal/url');
const { kEmptyObject } = require('internal/util');
const { validateArray } = require('internal/validators');

const {
constants: {
NODE_PERFORMANCE_MILESTONE_LOOP_START,
},
milestones,
} = internalBinding('performance');

const {
ownsProcessState,
isMainThread,
Expand All @@ -70,7 +78,8 @@ const {
kMaxOldGenerationSizeMb,
kCodeRangeSizeMb,
kStackSizeMb,
kTotalResourceLimitCount
kTotalResourceLimitCount,
parentLoopIdleTime,
} = internalBinding('worker');

const kHandle = Symbol('kHandle');
Expand All @@ -83,6 +92,7 @@ const kOnErrorMessage = Symbol('kOnErrorMessage');
const kParentSideStdio = Symbol('kParentSideStdio');
const kLoopStartTime = Symbol('kLoopStartTime');
const kIsOnline = Symbol('kIsOnline');
const kSendLoopStart = Symbol('kSendLoopStart');

const SHARE_ENV = SymbolFor('nodejs.worker_threads.SHARE_ENV');
let debug = require('internal/util/debuglog').debuglog('worker', (fn) => {
Expand Down Expand Up @@ -265,6 +275,13 @@ class Worker extends EventEmitter {
this[kHandle].startThread();

process.nextTick(() => process.emit('worker', this));
// Send current thread loopStart to the worker. In case the loop has not yet
// started, send it after the poll phase of the loop has completed.
if (milestones[NODE_PERFORMANCE_MILESTONE_LOOP_START] === -1) {
setImmediate(() => this[kSendLoopStart]());
} else {
this[kSendLoopStart]();
}
if (workerThreadsChannel.hasSubscribers) {
workerThreadsChannel.publish({
worker: this,
Expand Down Expand Up @@ -346,6 +363,13 @@ class Worker extends EventEmitter {
}
}

[kSendLoopStart]() {
this[kPort]?.postMessage({
type: messageTypes.PARENT_LOOP_START,
value: milestones[NODE_PERFORMANCE_MILESTONE_LOOP_START] / 1e6
});
}

postMessage(...args) {
if (this[kPublicPort] === null) return;

Expand Down Expand Up @@ -490,6 +514,24 @@ function eventLoopUtilization(util1, util2) {
);
}

let parentEventLoopStartTime = -1;
function setParentEventLoopStartTime(time) {
parentEventLoopStartTime = time;
}

function parentEventLoopUtilization(util1, util2) {
if (parentEventLoopStartTime === -1) {
return { idle: 0, active: 0, utilization: 0 };
}

return internalEventLoopUtilization(
parentEventLoopStartTime,
parentLoopIdleTime(),
util1,
util2
);
}

module.exports = {
ownsProcessState,
isMainThread,
Expand All @@ -501,4 +543,6 @@ module.exports = {
assignEnvironmentData,
threadId,
Worker,
setParentEventLoopStartTime,
parentEventLoopUtilization,
};
3 changes: 2 additions & 1 deletion lib/internal/worker/io.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ const messageTypes = {
ERROR_MESSAGE: 'errorMessage',
STDIO_PAYLOAD: 'stdioPayload',
STDIO_WANTS_MORE_DATA: 'stdioWantsMoreData',
LOAD_SCRIPT: 'loadScript'
LOAD_SCRIPT: 'loadScript',
PARENT_LOOP_START: 'parentLoopStart',
};

// We have to mess with the MessagePort prototype a bit, so that a) we can make
Expand Down
8 changes: 7 additions & 1 deletion lib/worker_threads.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ const {
setEnvironmentData,
getEnvironmentData,
threadId,
Worker
Worker,
parentEventLoopUtilization,
} = require('internal/worker');

const {
Expand Down Expand Up @@ -38,4 +39,9 @@ module.exports = {
BroadcastChannel,
setEnvironmentData,
getEnvironmentData,
parent: isMainThread ? null : {
performance: {
eventLoopUtilization: parentEventLoopUtilization,
}
},
};
11 changes: 11 additions & 0 deletions src/node_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ Worker::Worker(Environment* env,
per_isolate_opts_(per_isolate_opts),
exec_argv_(exec_argv),
platform_(env->isolate_data()->platform()),
parent_loop_(env->event_loop()),
thread_id_(AllocateEnvironmentThreadId()),
env_vars_(env_vars),
snapshot_data_(snapshot_data) {
Expand Down Expand Up @@ -865,6 +866,14 @@ void Worker::LoopStartTime(const FunctionCallbackInfo<Value>& args) {
args.GetReturnValue().Set(loop_start_time / 1e6);
}

void Worker::ParentLoopIdleTime(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
CHECK(!env->is_main_thread());
Worker* w = env->worker_context();
uint64_t idle_time = uv_metrics_idle_time(w->parent_loop_);
args.GetReturnValue().Set(1.0 * idle_time / 1e6);
}

namespace {

// Return the MessagePort that is global for this Environment and communicates
Expand Down Expand Up @@ -921,6 +930,7 @@ void InitWorker(Local<Object> target,
}

SetMethod(context, target, "getEnvMessagePort", GetEnvMessagePort);
SetMethod(context, target, "parentLoopIdleTime", Worker::ParentLoopIdleTime);

target
->Set(env->context(),
Expand Down Expand Up @@ -967,6 +977,7 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
registry->Register(Worker::TakeHeapSnapshot);
registry->Register(Worker::LoopIdleTime);
registry->Register(Worker::LoopStartTime);
registry->Register(Worker::ParentLoopIdleTime);
}

} // anonymous namespace
Expand Down
3 changes: 3 additions & 0 deletions src/node_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class Worker : public AsyncWrap {
static void TakeHeapSnapshot(const v8::FunctionCallbackInfo<v8::Value>& args);
static void LoopIdleTime(const v8::FunctionCallbackInfo<v8::Value>& args);
static void LoopStartTime(const v8::FunctionCallbackInfo<v8::Value>& args);
static void ParentLoopIdleTime(const v8::FunctionCallbackInfo<v8::Value>&);

private:
bool CreateEnvMessagePort(Environment* env);
Expand All @@ -91,6 +92,8 @@ class Worker : public AsyncWrap {

std::unique_ptr<InspectorParentHandle> inspector_parent_handle_;

uv_loop_t* parent_loop_;

// This mutex protects access to all variables listed below it.
mutable Mutex mutex_;

Expand Down
54 changes: 54 additions & 0 deletions test/parallel/test-worker-parent-performance-eventlooputil.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
'use strict';

const { mustCall } = require('../common');

const TIMEOUT = 10;
const SPIN_DUR = 50;

const assert = require('assert');
const { Worker, parent, workerData } = require('worker_threads');

// Do not use isMainThread directly, otherwise the test would time out in case
// it's started inside of another worker thread.
if (!process.env.HAS_STARTED_WORKER) {
process.env.HAS_STARTED_WORKER = '1';
const i32arr = new Int32Array(new SharedArrayBuffer(4));
const w = new Worker(__filename, { workerData: i32arr });
w.on('online', mustCall(() => {
Atomics.wait(i32arr, 0, 0);

const t = Date.now();
while (Date.now() - t < SPIN_DUR);

Atomics.store(i32arr, 0, 0);
Atomics.notify(i32arr, 0);
Atomics.wait(i32arr, 0, 0);
}));
} else {
setTimeout(() => {
const { eventLoopUtilization } = parent.performance;
const i32arr = workerData;
const elu1 = eventLoopUtilization();

Atomics.store(i32arr, 0, 1);
Atomics.notify(i32arr, 0);
Atomics.wait(i32arr, 0, 1);

const elu2 = eventLoopUtilization(elu1);
const elu3 = eventLoopUtilization();
const elu4 = eventLoopUtilization(elu3, elu1);

assert.strictEqual(elu2.idle, 0);
assert.strictEqual(elu4.idle, 0);
assert.strictEqual(elu2.utilization, 1);
assert.strictEqual(elu4.utilization, 1);
assert.strictEqual(elu3.active - elu1.active, elu4.active);
assert.ok(elu2.active > SPIN_DUR - 10, `${elu2.active} <= ${SPIN_DUR - 10}`);
assert.ok(elu2.active < elu4.active, `${elu2.active} >= ${elu4.active}`);
assert.ok(elu3.active > elu2.active, `${elu3.active} <= ${elu2.active}`);
assert.ok(elu3.active > elu4.active, `${elu3.active} <= ${elu4.active}`);

Atomics.store(i32arr, 0, 1);
Atomics.notify(i32arr, 0);
}, TIMEOUT);
}

0 comments on commit c47419e

Please sign in to comment.