Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker: allow retrieving elu from parent #45330

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lib/internal/main/worker_thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const {
messageTypes: {
// Messages that may be received by workers
LOAD_SCRIPT,
PARENT_LOOP_START,
// Messages that may be posted from workers
UP_AND_RUNNING,
ERROR_MESSAGE,
Expand Down Expand Up @@ -159,6 +160,8 @@ port.on('message', (message) => {
const CJSLoader = require('internal/modules/cjs/loader');
CJSLoader.Module.runMain(filename);
}
} else if (message.type === PARENT_LOOP_START) {
require('internal/worker').setParentEventLoopStartTime(message.value);
} else if (message.type === STDIO_PAYLOAD) {
const { stream, chunks } = message;
ArrayPrototypeForEach(chunks, ({ chunk, encoding }) => {
Expand Down
46 changes: 45 additions & 1 deletion lib/internal/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const {
const EventEmitter = require('events');
const assert = require('internal/assert');
const path = require('path');
const { setImmediate } = require('timers');
const {
internalEventLoopUtilization
} = require('internal/perf/event_loop_utilization');
Expand Down Expand Up @@ -60,6 +61,13 @@ const { fileURLToPath, isURLInstance, pathToFileURL } = require('internal/url');
const { kEmptyObject } = require('internal/util');
const { validateArray } = require('internal/validators');

const {
constants: {
NODE_PERFORMANCE_MILESTONE_LOOP_START,
},
milestones,
} = internalBinding('performance');

const {
ownsProcessState,
isMainThread,
Expand All @@ -70,7 +78,8 @@ const {
kMaxOldGenerationSizeMb,
kCodeRangeSizeMb,
kStackSizeMb,
kTotalResourceLimitCount
kTotalResourceLimitCount,
parentLoopIdleTime,
} = internalBinding('worker');

const kHandle = Symbol('kHandle');
Expand All @@ -83,6 +92,7 @@ const kOnErrorMessage = Symbol('kOnErrorMessage');
const kParentSideStdio = Symbol('kParentSideStdio');
const kLoopStartTime = Symbol('kLoopStartTime');
const kIsOnline = Symbol('kIsOnline');
const kSendLoopStart = Symbol('kSendLoopStart');

const SHARE_ENV = SymbolFor('nodejs.worker_threads.SHARE_ENV');
let debug = require('internal/util/debuglog').debuglog('worker', (fn) => {
Expand Down Expand Up @@ -265,6 +275,13 @@ class Worker extends EventEmitter {
this[kHandle].startThread();

process.nextTick(() => process.emit('worker', this));
// Send current thread loopStart to the worker. In case the loop has not yet
// started, send it after the poll phase of the loop has completed.
if (milestones[NODE_PERFORMANCE_MILESTONE_LOOP_START] === -1) {
setImmediate(() => this[kSendLoopStart]());
aduh95 marked this conversation as resolved.
Show resolved Hide resolved
} else {
this[kSendLoopStart]();
}
if (workerThreadsChannel.hasSubscribers) {
workerThreadsChannel.publish({
worker: this,
Expand Down Expand Up @@ -346,6 +363,13 @@ class Worker extends EventEmitter {
}
}

[kSendLoopStart]() {
this[kPort]?.postMessage({
type: messageTypes.PARENT_LOOP_START,
value: milestones[NODE_PERFORMANCE_MILESTONE_LOOP_START] / 1e6
});
}

postMessage(...args) {
if (this[kPublicPort] === null) return;

Expand Down Expand Up @@ -494,6 +518,24 @@ function eventLoopUtilization(util1, util2) {
);
}

let parentEventLoopStartTime = -1;
function setParentEventLoopStartTime(time) {
parentEventLoopStartTime = time;
}

function parentEventLoopUtilization(util1, util2) {
if (parentEventLoopStartTime === -1) {
return { idle: 0, active: 0, utilization: 0 };
}

return internalEventLoopUtilization(
parentEventLoopStartTime,
parentLoopIdleTime(),
util1,
util2,
);
}

module.exports = {
ownsProcessState,
isMainThread,
Expand All @@ -505,4 +547,6 @@ module.exports = {
assignEnvironmentData,
threadId,
Worker,
setParentEventLoopStartTime,
parentEventLoopUtilization,
};
3 changes: 2 additions & 1 deletion lib/internal/worker/io.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ const messageTypes = {
ERROR_MESSAGE: 'errorMessage',
STDIO_PAYLOAD: 'stdioPayload',
STDIO_WANTS_MORE_DATA: 'stdioWantsMoreData',
LOAD_SCRIPT: 'loadScript'
LOAD_SCRIPT: 'loadScript',
PARENT_LOOP_START: 'parentLoopStart',
};

// We have to mess with the MessagePort prototype a bit, so that a) we can make
Expand Down
8 changes: 7 additions & 1 deletion lib/worker_threads.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ const {
setEnvironmentData,
getEnvironmentData,
threadId,
Worker
Worker,
parentEventLoopUtilization,
} = require('internal/worker');

const {
Expand Down Expand Up @@ -38,4 +39,9 @@ module.exports = {
BroadcastChannel,
setEnvironmentData,
getEnvironmentData,
parent: isMainThread ? null : {
performance: {
eventLoopUtilization: parentEventLoopUtilization,
},
},
};
11 changes: 11 additions & 0 deletions src/node_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ Worker::Worker(Environment* env,
per_isolate_opts_(per_isolate_opts),
exec_argv_(exec_argv),
platform_(env->isolate_data()->platform()),
parent_loop_(env->event_loop()),
thread_id_(AllocateEnvironmentThreadId()),
env_vars_(env_vars),
snapshot_data_(snapshot_data) {
Expand Down Expand Up @@ -867,6 +868,14 @@ void Worker::LoopStartTime(const FunctionCallbackInfo<Value>& args) {
args.GetReturnValue().Set(loop_start_time / 1e6);
}

void Worker::ParentLoopIdleTime(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
CHECK(!env->is_main_thread());
Worker* w = env->worker_context();
uint64_t idle_time = uv_metrics_idle_time(w->parent_loop_);
args.GetReturnValue().Set(1.0 * idle_time / 1e6);
}

namespace {

// Return the MessagePort that is global for this Environment and communicates
Expand Down Expand Up @@ -923,6 +932,7 @@ void InitWorker(Local<Object> target,
}

SetMethod(context, target, "getEnvMessagePort", GetEnvMessagePort);
SetMethod(context, target, "parentLoopIdleTime", Worker::ParentLoopIdleTime);

target
->Set(env->context(),
Expand Down Expand Up @@ -969,6 +979,7 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
registry->Register(Worker::TakeHeapSnapshot);
registry->Register(Worker::LoopIdleTime);
registry->Register(Worker::LoopStartTime);
registry->Register(Worker::ParentLoopIdleTime);
}

} // anonymous namespace
Expand Down
3 changes: 3 additions & 0 deletions src/node_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class Worker : public AsyncWrap {
static void TakeHeapSnapshot(const v8::FunctionCallbackInfo<v8::Value>& args);
static void LoopIdleTime(const v8::FunctionCallbackInfo<v8::Value>& args);
static void LoopStartTime(const v8::FunctionCallbackInfo<v8::Value>& args);
static void ParentLoopIdleTime(const v8::FunctionCallbackInfo<v8::Value>&);

private:
bool CreateEnvMessagePort(Environment* env);
Expand All @@ -91,6 +92,8 @@ class Worker : public AsyncWrap {

std::unique_ptr<InspectorParentHandle> inspector_parent_handle_;

uv_loop_t* parent_loop_;

// This mutex protects access to all variables listed below it.
mutable Mutex mutex_;

Expand Down
54 changes: 54 additions & 0 deletions test/parallel/test-worker-parent-performance-eventlooputil.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
'use strict';

const { mustCall } = require('../common');

const TIMEOUT = 10;
const SPIN_DUR = 50;

const assert = require('assert');
const { Worker, parent, workerData } = require('worker_threads');

// Do not use isMainThread directly, otherwise the test would time out in case
// it's started inside of another worker thread.
if (!process.env.HAS_STARTED_WORKER) {
process.env.HAS_STARTED_WORKER = '1';
const i32arr = new Int32Array(new SharedArrayBuffer(4));
const w = new Worker(__filename, { workerData: i32arr });
w.on('online', mustCall(() => {
Atomics.wait(i32arr, 0, 0);

const t = Date.now();
while (Date.now() - t < SPIN_DUR);

Atomics.store(i32arr, 0, 0);
Atomics.notify(i32arr, 0);
Atomics.wait(i32arr, 0, 0);
}));
} else {
setTimeout(() => {
const { eventLoopUtilization } = parent.performance;
const i32arr = workerData;
const elu1 = eventLoopUtilization();

Atomics.store(i32arr, 0, 1);
Atomics.notify(i32arr, 0);
Atomics.wait(i32arr, 0, 1);

const elu2 = eventLoopUtilization(elu1);
const elu3 = eventLoopUtilization();
const elu4 = eventLoopUtilization(elu3, elu1);

assert.strictEqual(elu2.idle, 0);
assert.strictEqual(elu4.idle, 0);
assert.strictEqual(elu2.utilization, 1);
assert.strictEqual(elu4.utilization, 1);
assert.strictEqual(elu3.active - elu1.active, elu4.active);
assert.ok(elu2.active > SPIN_DUR - 10, `${elu2.active} <= ${SPIN_DUR - 10}`);
assert.ok(elu2.active < elu4.active, `${elu2.active} >= ${elu4.active}`);
assert.ok(elu3.active > elu2.active, `${elu3.active} <= ${elu2.active}`);
assert.ok(elu3.active > elu4.active, `${elu3.active} <= ${elu4.active}`);

Atomics.store(i32arr, 0, 1);
Atomics.notify(i32arr, 0);
}, TIMEOUT);
}