diff --git a/LICENSE b/LICENSE index a2a5437f5d007f..a3b78c514e524c 100644 --- a/LICENSE +++ b/LICENSE @@ -984,8 +984,7 @@ The externally maintained libraries used by Node.js are: - Strongtalk assembler, the basis of the files assembler-arm-inl.h, assembler-arm.cc, assembler-arm.h, assembler-ia32-inl.h, assembler-ia32.cc, assembler-ia32.h, assembler-x64-inl.h, - assembler-x64.cc, assembler-x64.h, assembler-mips-inl.h, - assembler-mips.cc, assembler-mips.h, assembler.cc and assembler.h. + assembler-x64.cc, assembler-x64.h, assembler.cc and assembler.h. This code is copyrighted by Sun Microsystems Inc. and released under a 3-clause BSD license. @@ -1871,3 +1870,28 @@ The externally maintained libraries used by Node.js are: NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ + +- synchronous-worker, located at lib/worker_threads.js, is licensed as follows: + """ + The MIT License (MIT) + + Copyright (c) 2020 Anna Henningsen + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE. + """ diff --git a/doc/api/worker_threads.md b/doc/api/worker_threads.md index 99772189522e84..85edc52b6a465f 100644 --- a/doc/api/worker_threads.md +++ b/doc/api/worker_threads.md @@ -1329,6 +1329,228 @@ from the running process and will preload the same preload scripts as the main thread. If the preload script unconditionally launches a worker thread, every thread spawned will spawn another until the application crashes. +## Synchronous Workers + +> Stability: 1 - Experimental + +### Class: `SynchronousWorker` + + + +* Extends: {EventEmitter} + +A `SynchronousWorker` is effectively a Node.js environment that runs within the +same thread. + +```cjs +const { SynchronousWorker } = require('node:worker_threads'); +const w = new SynchronousWorker(); +const myAsyncFunction = w.createRequire(__filename)('my-module'); +const response = w.runLoopUntilPromiseResolved(myAsyncFunction('http://example.org')); +const text = w.runLoopUntilPromiseResolved(response.text()); +console.log(text); +``` + +#### `new SynchronousWorker([options])` + + + +* `options` {Object} + * `sharedEventLoop` {boolean} When `true`, use the same event loop as the + outer Node.js instance. If this is passed, the `runLoop()` and + `runLoopUntilPromiseResolved()` methods become unavailable. + **Default:** `false`. + * `sharedMicrotaskQueue` {boolean} When true, use the same microtask queue as + the outer Node.js instance. This is used for resolving promises created in + the inner context, including those implicitly generated by `async/await`. + If this is passed, the `runLoopUntilPromiseResolved()` method becomes + unavailable. **Default:** `false`. + +While setting `sharedEventLoop` to `false` and `sharedMicrotaskQueue` to `true` +is accepted, they typically do not make sense together. + +#### `synchronousWorker.runLoop([mode])` + + + +* `mode` {string} One of either `'default'`, `'once'`, or `'nowait'`. + +Spin the event loop of the inner Node.js instance. `mode` can be either +`default`, `once` or `nowait`. See the [libuv documentation for `uv_run()`][] +for details on these modes. + +#### `synchronousWorker.runLoopUntilPromiseResolved(promise)` + + + +* `promise` {Promise} + +Spin the event loop of the innsert Node.js instance until a specific `Promise` +is resolved. + +#### `synchronousWorker.runInWorkerScope(fn)` + + + +* `fn` {Function} + +Wrap `fn` and run it as if it were run on the event loop of the inner Node.js +instance. In particular, this ensures that Promises created by the function +itself are resolved correctly. You should generally use this to run any code +inside the innert Node.js instance that performs asynchronous activity and that +is not already running in an asynchronous context (you can compare this to +the code that runs synchronously from the main file of a Node.js application). + +#### `synchronousWorker.loopAlive` + + + +* Type: {boolean} + +This is a read-only boolean property indicating whether there are currently any +items on the event loop of the inner Node.js instance. + +#### `synchronousWorker.stop()` + + + +Interrupt any execution of code inside the inner Node.js instance, i.e. +return directly from a `.runLoop()`, `.runLoopUntilPromiseResolved()` or +`.runInWorkerScope()` call. This will render the Node.js instance unusable +and is generally comparable to running `process.exit()`. + +This method returns a `Promise` that will be resolved when all resources +associated with this Node.js instance are released. This `Promise` resolves on +the event loop of the _outer_ Node.js instance. + +#### `synchronousWorker.createRequire(filename)` + + + +* `filename` {string} + +Create a `require()` function that can be used for loading code inside the +inner Node.js instance. + +#### `synchronousWorker.globalThis` + + + +* Type: {Object} + +Returns a reference to the global object of the inner Node.js instance. + +#### `synchronousWorker.process` + + + +* Type: {Object} + +Returns a reference to the `process` object of the inner Node.js instance. + +### FAQ + +#### What does a SynchronousWorker do? + +Creates a new Node.js instance, using the same thread and the same JS heap. +You can create Node.js API objects, like network sockets, inside the new +Node.js instance, and spin the underlying event loop manually. + +#### Where did SynchronousWorker come from? + +`SynchronousWorker` was originally developer by Node.js core contributor +Anna Henningsen and published as a separate module [`synchronous-worker`][] on +npm under the MIT license. It was integrated into Node.js core with Anna's +permission. The majority of the code, documentation, and tests were adopted +almost verbatim from the original module. + +#### Why would I use a SynchronousWorker? + +The most common use case is probably running asynchronous code synchronously, +in situations where doing so cannot be avoided (even though one should try +really hard to avoid it). Another popular npm package that does this is +[`deasync`][], but `deasync` + +* solves this problem by starting the event loop while it is already running + (which is explicitly _not_ supported by libuv and may lead to crashes) +* doesn’t allow specifying _which_ resources or callbacks should be waited for, + and instead allows everything inside the current thread to progress. + +#### How can I avoid using SynchronousWorker? + +If you do not need to directly interact with the objects inside the inner +Node.js instance, a lot of the time Worker threads together with +[`Atomics.wait()`][] will give you what you need. + +#### My async functions/Promises/… don’t work + +If you run a `SynchronousWorker` with its own microtask queue (i.e. in default +mode), code like this will not work as expected: + +```cjs +const { SynchronousWorker } = require('node:worker_threads'); +const w = new SynchronousWorker(); +let promise; +w.runInWorkerScope(() => { + promise = (async () => { + return w.createRequire(__filename)('node-fetch')('...'); + })(); +}); +w.runLoopUntilPromiseResolved(promise); +``` + +The reason for this is that `async` functions (and Promise `.then()` handlers) +add their microtasks to the microtask queue for the Context in which the +async function (or `.then()` callback) was defined, and not the Context in which +the original `Promise` was created. Put in other words, it is possible for a +`Promise` chain to be run on different microtask queues. + +While this behavior may be counterintuitive, it is what the V8 engine does, +and is not under the control of Node.js. + +What this means is that you will need to make sure that the functions are +compiled in the Context in which they are supposed to be run; the two main +ways to achieve that are to: + +* Put them in a separate file that is loaded through `w.createRequire()` +* Use `w.createRequire(__filename)('vm').runInThisContext()` to manually compile + the code for the function in the Context of the target Node.js instance. + +For example: + +```cjs +const { SynchronousWorker } = require('node:worker_threads'); +const w = new SynchronousWorker(); +const req = w.createRequire(__filename); +let promise; +w.runInWorkerScope(() => { + promise = req('vm').runInThisContext(`(async(req) => { + return await req('node-fetch')('...'); + })`)(req); +}); +w.runLoopUntilPromiseResolved(promise); +``` + [Addons worker support]: addons.md#worker-support [ECMAScript module loader]: esm.md#data-imports [HTML structured clone algorithm]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm @@ -1341,6 +1563,7 @@ thread spawned will spawn another until the application crashes. [`--max-semi-space-size`]: cli.md#--max-semi-space-sizesize-in-megabytes [`ArrayBuffer`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/ArrayBuffer [`AsyncResource`]: async_hooks.md#class-asyncresource +[`Atomics.wait()`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Atomics/wait [`Buffer.allocUnsafe()`]: buffer.md#static-method-bufferallocunsafesize [`Buffer`]: buffer.md [`ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST`]: errors.md#err_missing_message_port_in_transfer_list @@ -1354,6 +1577,7 @@ thread spawned will spawn another until the application crashes. [`Worker constructor options`]: #new-workerfilename-options [`Worker`]: #class-worker [`data:` URL]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/Data_URIs +[`deasync`]: https://www.npmjs.com/package/deasync [`fs.close()`]: fs.md#fsclosefd-callback [`fs.open()`]: fs.md#fsopenpath-flags-mode-callback [`markAsUntransferable()`]: #workermarkasuntransferableobject @@ -1378,6 +1602,7 @@ thread spawned will spawn another until the application crashes. [`require('node:worker_threads').parentPort`]: #workerparentport [`require('node:worker_threads').threadId`]: #workerthreadid [`require('node:worker_threads').workerData`]: #workerworkerdata +[`synchronous-worker`]: https://github.com/addaleax/synchronous-worker [`trace_events`]: tracing.md [`v8.getHeapSnapshot()`]: v8.md#v8getheapsnapshot [`vm`]: vm.md @@ -1390,4 +1615,5 @@ thread spawned will spawn another until the application crashes. [browser `MessagePort`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort [child processes]: child_process.md [contextified]: vm.md#what-does-it-mean-to-contextify-an-object +[libuv documentation for `uv_run()`]: http://docs.libuv.org/en/v1.x/loop.html#c.uv_run [v8.serdes]: v8.md#serialization-api diff --git a/lib/internal/worker/synchronous.js b/lib/internal/worker/synchronous.js new file mode 100644 index 00000000000000..596e719b9f0c1a --- /dev/null +++ b/lib/internal/worker/synchronous.js @@ -0,0 +1,168 @@ +'use strict'; + +// SynchronousWorker was originally a separate module developed by +// Anna Henningsen and published separately on npm as the +// synchronous-worker module under the MIT license. It has been +// incorporated into Node.js with Anna's permission. +// See the LICENSE file for LICENSE and copyright attribution. + +const { + Promise, +} = primordials; + +const { + SynchronousWorker: SynchronousWorkerImpl, + UV_RUN_DEFAULT, + UV_RUN_ONCE, + UV_RUN_NOWAIT, +} = internalBinding('worker'); + +const { setImmediate } = require('timers'); + +const EventEmitter = require('events'); + +const { + codes: { + ERR_INVALID_STATE, + }, +} = require('internal/errors'); + +class SynchronousWorker extends EventEmitter { + #handle = undefined; + #process = undefined; + #global = undefined; + #module = undefined; + #hasOwnEventLoop = false; + #hasOwnMicrotaskQueue = false; + #promiseInspector = undefined; + #stoppedPromise = undefined; + + /** + * @typedef {{ + * sharedEventLoop?: boolean, + * sharedMicrotaskQueue?: boolean, + * }} SynchronousWorkerOptions + * @param {SynchronousWorkerOptions} [options] + */ + constructor(options) { + super(); + this.#hasOwnEventLoop = !(options?.sharedEventLoop); + this.#hasOwnMicrotaskQueue = !(options?.sharedMicrotaskQueue); + + this.#handle = new SynchronousWorkerImpl(); + this.#handle.onexit = (code) => { + this.stop(); + this.emit('exit', code); + }; + try { + this.#handle.start(this.#hasOwnEventLoop, this.#hasOwnMicrotaskQueue); + this.#handle.load((process, nativeRequire, globalThis) => { + const origExit = process.reallyExit; + process.reallyExit = (...args) => { + const ret = origExit.call(process, ...args); + // Make a dummy call to make sure the termination exception is + // propagated. For some reason, this isn't necessarily the case + // otherwise. + process.memoryUsage(); + return ret; + }; + this.#process = process; + this.#module = nativeRequire('module'); + this.#global = globalThis; + process.on('uncaughtException', (err) => { + if (process.listenerCount('uncaughtException') === 1) { + this.emit('error', err); + process.exit(1); + } + }); + }); + } catch (err) { + this.#handle.stop(); + throw err; + } + } + + /** + * @param {'default'|'once'|'nowait'} [mode] = 'default' + */ + runLoop(mode = 'default') { + if (!this.#hasOwnEventLoop) { + throw new ERR_INVALID_STATE('Can only use .runLoop() when using a separate event loop'); + } + let uvMode = UV_RUN_DEFAULT; + if (mode === 'once') uvMode = UV_RUN_ONCE; + if (mode === 'nowait') uvMode = UV_RUN_NOWAIT; + this.#handle.runLoop(uvMode); + } + + /** + * @param {Promise} promise + */ + runLoopUntilPromiseResolved(promise) { + if (!this.#hasOwnEventLoop || !this.#hasOwnMicrotaskQueue) { + throw new ERR_INVALID_STATE( + 'Can only use .runLoopUntilPromiseResolved() when using a separate event loop ' + + 'and microtask queue'); + } + this.#promiseInspector ??= this.createRequire('/worker.js')('vm').runInThisContext( + `(promise => { + const obj = { state: 'pending', value: null }; + promise.then((v) => { obj.state = 'fulfilled'; obj.value = v; }, + (v) => { obj.state = 'rejected'; obj.value = v; }); + return obj; + })`); + const inspected = this.#promiseInspector(promise); + this.runInWorkerScope(() => {}); // Flush the microtask queue + while (inspected.state === 'pending') { + this.runLoop('once'); + } + if (inspected.state === 'rejected') { + throw inspected.value; + } + return inspected.value; + } + + /** + * @type {boolean} + */ + get loopAlive() { + if (!this.#hasOwnEventLoop) { + throw new ERR_INVALID_STATE('Can only use .loopAlive when using a separate event loop'); + } + return this.#handle.isLoopAlive(); + } + + /** + * @returns {Promise} + */ + async stop() { + return this.#stoppedPromise ??= new Promise((resolve) => { + this.#handle.signalStop(); + setImmediate(() => { + this.#handle.stop(); + resolve(); + }); + }); + } + + get process() { + return this.#process; + } + + get globalThis() { + return this.#global; + } + + createRequire(...args) { + return this.#module.createRequire(...args); + } + + /** + * @param {() => any} method + */ + runInWorkerScope(method) { + return this.#handle.runInCallbackScope(method); + } +} + +module.exports = SynchronousWorker; diff --git a/lib/worker_threads.js b/lib/worker_threads.js index 9d702fa2883447..33cc580db4d7c9 100644 --- a/lib/worker_threads.js +++ b/lib/worker_threads.js @@ -22,6 +22,8 @@ const { markAsUntransferable, } = require('internal/buffer'); +const SynchronousWorker = require('internal/worker/synchronous'); + module.exports = { isMainThread, MessagePort, @@ -38,4 +40,5 @@ module.exports = { BroadcastChannel, setEnvironmentData, getEnvironmentData, + SynchronousWorker, }; diff --git a/src/env_properties.h b/src/env_properties.h index 76c52f1ea57385..afb4f8c0953144 100644 --- a/src/env_properties.h +++ b/src/env_properties.h @@ -357,6 +357,7 @@ V(shutdown_wrap_template, v8::ObjectTemplate) \ V(socketaddress_constructor_template, v8::FunctionTemplate) \ V(streambaseoutputstream_constructor_template, v8::ObjectTemplate) \ + V(synchronousworker_constructor_template, v8::FunctionTemplate) \ V(qlogoutputstream_constructor_template, v8::ObjectTemplate) \ V(tcp_constructor_template, v8::FunctionTemplate) \ V(tty_constructor_template, v8::FunctionTemplate) \ diff --git a/src/node_errors.h b/src/node_errors.h index 68a95835812e50..8fccdef5e18a3b 100644 --- a/src/node_errors.h +++ b/src/node_errors.h @@ -63,6 +63,7 @@ void OOMErrorHandler(const char* location, const v8::OOMDetails& details); V(ERR_EXECUTION_ENVIRONMENT_NOT_AVAILABLE, Error) \ V(ERR_INVALID_ADDRESS, Error) \ V(ERR_INVALID_ARG_VALUE, TypeError) \ + V(ERR_INVALID_STATE, Error) \ V(ERR_OSSL_EVP_INVALID_DIGEST, Error) \ V(ERR_INVALID_ARG_TYPE, TypeError) \ V(ERR_INVALID_OBJECT_DEFINE_PROPERTY, TypeError) \ @@ -154,6 +155,7 @@ ERRORS_WITH_CODE(V) "Context not associated with Node.js environment") \ V(ERR_INVALID_ADDRESS, "Invalid socket address") \ V(ERR_INVALID_MODULE, "No such module") \ + V(ERR_INVALID_STATE, "Invalid state") \ V(ERR_INVALID_THIS, "Value of \"this\" is the wrong type") \ V(ERR_INVALID_TRANSFER_OBJECT, "Found invalid object in transferList") \ V(ERR_MEMORY_ALLOCATION_FAILED, "Failed to allocate memory") \ diff --git a/src/node_worker.cc b/src/node_worker.cc index 378fa3d16c4b45..9c5c0b91f13e54 100644 --- a/src/node_worker.cc +++ b/src/node_worker.cc @@ -1,7 +1,9 @@ #include "node_worker.h" +#include "base_object.h" #include "debug_utils-inl.h" #include "histogram-inl.h" #include "memory_tracker-inl.h" +#include "node.h" #include "node_errors.h" #include "node_external_reference.h" #include "node_buffer.h" @@ -10,6 +12,9 @@ #include "node_snapshot_builder.h" #include "util-inl.h" #include "async_wrap-inl.h" +#include "v8-local-handle.h" +#include "v8-microtask-queue.h" +#include "v8-snapshot.h" #include #include @@ -21,9 +26,13 @@ using v8::Array; using v8::ArrayBuffer; using v8::Boolean; using v8::Context; +using v8::DeserializeInternalFieldsCallback; +using v8::EscapableHandleScope; using v8::Float64Array; +using v8::Function; using v8::FunctionCallbackInfo; using v8::FunctionTemplate; +using v8::Global; using v8::HandleScope; using v8::Integer; using v8::Isolate; @@ -31,9 +40,11 @@ using v8::Local; using v8::Locker; using v8::Maybe; using v8::MaybeLocal; +using v8::MicrotaskQueue; using v8::Null; using v8::Number; using v8::Object; +using v8::ObjectTemplate; using v8::ResourceConstraints; using v8::SealHandleScope; using v8::String; @@ -866,6 +877,437 @@ void Worker::LoopStartTime(const FunctionCallbackInfo& args) { args.GetReturnValue().Set(loop_start_time / 1e6); } +// ============================================================================ + +class SynchronousWorker final : public MemoryRetainer { + public: + static bool HasInstance(Environment* env, v8::Local value); + static v8::Local GetConstructorTemplate( + Environment* env); + static void Initialize( + Environment* env, + v8::Local target); + static void RegisterExternalReferences( + ExternalReferenceRegistry* registry); + + SynchronousWorker(Environment* env, v8::Local obj); + + static void New(const v8::FunctionCallbackInfo& args); + static void Start(const v8::FunctionCallbackInfo& args); + static void Load(const v8::FunctionCallbackInfo& args); + static void RunLoop(const v8::FunctionCallbackInfo& args); + static void IsLoopAlive(const v8::FunctionCallbackInfo& args); + static void SignalStop(const v8::FunctionCallbackInfo& args); + static void Stop(const v8::FunctionCallbackInfo& args); + static void RunInCallbackScope( + const v8::FunctionCallbackInfo& args); + + struct SynchronousWorkerScope : public v8::EscapableHandleScope, + public v8::Context::Scope, + public v8::Isolate::SafeForTerminationScope { + public: + explicit SynchronousWorkerScope(SynchronousWorker* w); + ~SynchronousWorkerScope(); + + private: + SynchronousWorker* w_; + bool orig_can_be_terminated_; + }; + + v8::Local context() const; + + void MemoryInfo(MemoryTracker* tracker) const override; + SET_MEMORY_INFO_NAME(SynchronousWorker) + SET_SELF_SIZE(SynchronousWorker) + + private: + static SynchronousWorker* Unwrap(const FunctionCallbackInfo& arg); + static void CleanupHook(void* arg); + void OnExit(int code); + + void Start(bool own_loop, bool own_microtaskqueue); + v8::MaybeLocal Load(v8::Local callback); + v8::MaybeLocal RunInCallbackScope( + v8::Local callback); + void RunLoop(uv_run_mode mode); + bool IsLoopAlive(); + void SignalStop(); + void Stop(bool may_throw); + + Isolate* isolate_; + Global wrap_; + + uv_loop_t loop_; + std::unique_ptr microtask_queue_; + v8::Global outer_context_; + v8::Global context_; + IsolateData* isolate_data_ = nullptr; + Environment* env_ = nullptr; + bool signaled_stop_ = false; + bool can_be_terminated_ = false; + bool loop_is_running_ = false; +}; + +bool SynchronousWorker::HasInstance(Environment* env, Local value) { + return GetConstructorTemplate(env)->HasInstance(value); +} + +Local SynchronousWorker::GetConstructorTemplate( + Environment* env) { + Local tmpl = env->synchronousworker_constructor_template(); + if (tmpl.IsEmpty()) { + Isolate* isolate = env->isolate(); + tmpl = NewFunctionTemplate(isolate, New); + tmpl->Inherit(AsyncWrap::GetConstructorTemplate(env)); + tmpl->InstanceTemplate()->SetInternalFieldCount(1); + + SetProtoMethod(isolate, tmpl, "start", Start); + SetProtoMethod(isolate, tmpl, "load", Load); + SetProtoMethod(isolate, tmpl, "stop", Stop); + SetProtoMethod(isolate, tmpl, "signalStop", SignalStop); + SetProtoMethod(isolate, tmpl, "runLoop", RunLoop); + SetProtoMethod(isolate, tmpl, "isLoopAlive", IsLoopAlive); + SetProtoMethod(isolate, tmpl, "runInCallbackScope", RunInCallbackScope); + + env->set_synchronousworker_constructor_template(tmpl); + } + return tmpl; +} + +void SynchronousWorker::Initialize( + Environment* env, + v8::Local target) { + SetConstructorFunction( + env->context(), + target, + "SynchronousWorker", + GetConstructorTemplate(env), + SetConstructorFunctionFlag::NONE); + + NODE_DEFINE_CONSTANT(target, UV_RUN_DEFAULT); + NODE_DEFINE_CONSTANT(target, UV_RUN_ONCE); + NODE_DEFINE_CONSTANT(target, UV_RUN_NOWAIT); +} + +void SynchronousWorker::RegisterExternalReferences( + ExternalReferenceRegistry* registry) { + registry->Register(New); + registry->Register(Start); + registry->Register(Load); + registry->Register(RunLoop); + registry->Register(IsLoopAlive); + registry->Register(SignalStop); + registry->Register(Stop); + registry->Register(RunInCallbackScope); +} + +SynchronousWorker::SynchronousWorkerScope::SynchronousWorkerScope( + SynchronousWorker* w) + : EscapableHandleScope(w->isolate_), + Scope(w->context()), + Isolate::SafeForTerminationScope(w->isolate_), + w_(w), + orig_can_be_terminated_(w->can_be_terminated_) { + w_->can_be_terminated_ = true; +} + +SynchronousWorker::SynchronousWorkerScope::~SynchronousWorkerScope() { + w_->can_be_terminated_ = orig_can_be_terminated_; +} + +Local SynchronousWorker::context() const { + return context_.Get(isolate_); +} + +SynchronousWorker::SynchronousWorker(Environment* env, Local object) + : isolate_(env->isolate()), wrap_(env->isolate(), object) { + AddEnvironmentCleanupHook(env->isolate(), CleanupHook, this); + loop_.data = nullptr; + object->SetAlignedPointerInInternalField(0, this); + + Local outer_context = env->context(); + outer_context_.Reset(env->isolate(), outer_context); +} + +SynchronousWorker* SynchronousWorker::Unwrap( + const FunctionCallbackInfo& args) { + Local value = args.This(); + if (!value->IsObject() || value.As()->InternalFieldCount() < 1) { + THROW_ERR_INVALID_THIS(Environment::GetCurrent(args.GetIsolate())); + return nullptr; + } + return static_cast( + value.As()->GetAlignedPointerFromInternalField(0)); +} + +void SynchronousWorker::New(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + new SynchronousWorker(env, args.This()); +} + +void SynchronousWorker::Start(const FunctionCallbackInfo& args) { + SynchronousWorker* self = Unwrap(args); + if (self == nullptr) return; + self->Start( + args[0]->BooleanValue(args.GetIsolate()), + args[1]->BooleanValue(args.GetIsolate())); +} + +void SynchronousWorker::Stop(const FunctionCallbackInfo& args) { + SynchronousWorker* self = Unwrap(args); + if (self == nullptr) return; + self->Stop(true); +} + +void SynchronousWorker::SignalStop(const FunctionCallbackInfo& args) { + SynchronousWorker* self = Unwrap(args); + if (self == nullptr) return; + self->SignalStop(); + args.GetIsolate()->CancelTerminateExecution(); +} + +void SynchronousWorker::Load(const FunctionCallbackInfo& args) { + SynchronousWorker* self = Unwrap(args); + if (self == nullptr) return; + if (!args[0]->IsFunction()) { + return THROW_ERR_INVALID_ARG_TYPE( + Environment::GetCurrent(args), + "The load() argument must be a function."); + } + Local result; + if (self->Load(args[0].As()).ToLocal(&result)) { + args.GetReturnValue().Set(result); + } +} + +void SynchronousWorker::RunLoop(const FunctionCallbackInfo& args) { + SynchronousWorker* self = Unwrap(args); + if (self == nullptr) return; + int64_t mode; + if (!args[0]->IntegerValue(args.GetIsolate()->GetCurrentContext()).To(&mode)) + return; + self->RunLoop(static_cast(mode)); +} + +void SynchronousWorker::IsLoopAlive(const FunctionCallbackInfo& args) { + SynchronousWorker* self = Unwrap(args); + if (self == nullptr) return; + args.GetReturnValue().Set(self->IsLoopAlive()); +} + +void SynchronousWorker::RunInCallbackScope( + const FunctionCallbackInfo& args) { + SynchronousWorker* self = Unwrap(args); + if (self == nullptr) return; + if (!args[0]->IsFunction()) { + return THROW_ERR_INVALID_ARG_TYPE( + Environment::GetCurrent(args), + "The runInCallbackScope() argument must be a function"); + } + Local result; + if (self->RunInCallbackScope(args[0].As()).ToLocal(&result)) { + args.GetReturnValue().Set(result); + } +} + +MaybeLocal SynchronousWorker::RunInCallbackScope(Local fn) { + if (context_.IsEmpty() || signaled_stop_) { + Environment* env = Environment::GetCurrent(isolate_); + THROW_ERR_INVALID_STATE(env, "Worker has been stopped"); + return MaybeLocal(); + } + SynchronousWorkerScope worker_scope(this); + v8::Isolate* isolate = isolate_; + CallbackScope callback_scope(isolate, wrap_.Get(isolate_), { 1, 0 }); + MaybeLocal ret = fn->Call(context(), Null(isolate), 0, nullptr); + if (signaled_stop_) { + isolate->CancelTerminateExecution(); + } + return worker_scope.EscapeMaybe(ret); +} + +void SynchronousWorker::Start(bool own_loop, bool own_microtaskqueue) { + signaled_stop_ = false; + Local outer_context = outer_context_.Get(isolate_); + Environment* outer_env = GetCurrentEnvironment(outer_context); + assert(outer_env != nullptr); + uv_loop_t* outer_loop = GetCurrentEventLoop(isolate_); + assert(outer_loop != nullptr); + USE(outer_loop); // Exists only to silence a compiler warning. + + if (own_loop) { + int ret = uv_loop_init(&loop_); + if (ret != 0) { + isolate_->ThrowException(UVException(isolate_, ret, "uv_loop_init")); + return; + } + loop_.data = this; + } + + MicrotaskQueue* microtask_queue = + own_microtaskqueue ? + (microtask_queue_ = v8::MicrotaskQueue::New( + isolate_, v8::MicrotasksPolicy::kExplicit)).get() : + outer_context_.Get(isolate_)->GetMicrotaskQueue(); + uv_loop_t* loop = own_loop ? &loop_ : GetCurrentEventLoop(isolate_); + + Local context = Context::New( + isolate_, + nullptr /* extensions */, + MaybeLocal() /* global_template */, + MaybeLocal() /* global_value */, + DeserializeInternalFieldsCallback() /* internal_fields_deserializer */, + microtask_queue); + context->SetSecurityToken(outer_context->GetSecurityToken()); + if (context.IsEmpty() || !InitializeContext(context).FromMaybe(false)) { + return; + } + + context_.Reset(isolate_, context); + Context::Scope context_scope(context); + isolate_data_ = CreateIsolateData( + isolate_, + loop, + GetMultiIsolatePlatform(outer_env), + GetArrayBufferAllocator(GetEnvironmentIsolateData(outer_env))); + assert(isolate_data_ != nullptr); + ThreadId thread_id = AllocateEnvironmentThreadId(); + auto inspector_parent_handle = GetInspectorParentHandle( + outer_env, thread_id, "file:///synchronous-worker.js"); + env_ = CreateEnvironment( + isolate_data_, + context, + {}, + {}, + static_cast( + EnvironmentFlags::kTrackUnmanagedFds | + EnvironmentFlags::kNoRegisterESMLoader), + thread_id, + std::move(inspector_parent_handle)); + assert(env_ != nullptr); + SetProcessExitHandler(env_, [this](Environment* env, int code) { + OnExit(code); + }); +} + +void SynchronousWorker::OnExit(int code) { + HandleScope handle_scope(isolate_); + Local self = wrap_.Get(isolate_); + Local outer_context = outer_context_.Get(isolate_); + Context::Scope context_scope(outer_context); + Isolate::SafeForTerminationScope termination_scope(isolate_); + Local onexit_v; + if (!self->Get(outer_context, String::NewFromUtf8Literal(isolate_, "onexit")) + .ToLocal(&onexit_v) || !onexit_v->IsFunction()) { + return; + } + Local args[] = { Integer::New(isolate_, code) }; + USE(onexit_v.As()->Call(outer_context, self, 1, args)); + SignalStop(); +} + +void SynchronousWorker::SignalStop() { + signaled_stop_ = true; + if (env_ != nullptr && can_be_terminated_) { + node::Stop(env_); + } +} + +void SynchronousWorker::Stop(bool may_throw) { + if (loop_.data == nullptr) { + // If running in shared-event-loop mode, spin the outer event loop + // until all currently pending items have been addressed, so that + // FreeEnvironment() does not run the outer loop's handles. + TryCatch try_catch(isolate_); + try_catch.SetVerbose(true); + SealHandleScope seal_handle_scope(isolate_); + uv_run(GetCurrentEventLoop(isolate_), UV_RUN_NOWAIT); + } + if (env_ != nullptr) { + if (!signaled_stop_) { + SignalStop(); + isolate_->CancelTerminateExecution(); + } + FreeEnvironment(env_); + env_ = nullptr; + } + if (isolate_data_ != nullptr) { + FreeIsolateData(isolate_data_); + isolate_data_ = nullptr; + } + context_.Reset(); + outer_context_.Reset(); + if (loop_.data != nullptr) { + loop_.data = nullptr; + int ret = uv_loop_close(&loop_); + if (ret != 0 && may_throw) { + isolate_->ThrowException(UVException(isolate_, ret, "uv_loop_close")); + } + } + microtask_queue_.reset(); + + RemoveEnvironmentCleanupHook(isolate_, CleanupHook, this); + if (!wrap_.IsEmpty()) { + HandleScope handle_scope(isolate_); + wrap_.Get(isolate_)->SetAlignedPointerInInternalField(0, nullptr); + } + wrap_.Reset(); + delete this; +} + +MaybeLocal SynchronousWorker::Load(Local callback) { + if (env_ == nullptr || signaled_stop_) { + Environment* env = Environment::GetCurrent(isolate_); + THROW_ERR_INVALID_STATE(env, "Worker not initialized"); + return MaybeLocal(); + } + + SynchronousWorkerScope worker_scope(this); + return worker_scope.EscapeMaybe( + LoadEnvironment(env_, [&](const StartExecutionCallbackInfo& info) { + Local argv[] = { + info.process_object, + info.native_require, + context()->Global() + }; + return callback->Call(context(), Null(isolate_), 3, argv); + })); +} + +void SynchronousWorker::CleanupHook(void* arg) { + static_cast(arg)->Stop(false); +} + +void SynchronousWorker::RunLoop(uv_run_mode mode) { + Environment* env = Environment::GetCurrent(isolate_); + if (loop_.data == nullptr || context_.IsEmpty() || signaled_stop_) { + return THROW_ERR_INVALID_STATE(env, "Worker has been stopped"); + } + if (loop_is_running_) { + return THROW_ERR_INVALID_STATE(env, "Cannot nest calls to runLoop"); + } + SynchronousWorkerScope worker_scope(this); + TryCatch try_catch(isolate_); + try_catch.SetVerbose(true); + SealHandleScope seal_handle_scope(isolate_); + loop_is_running_ = true; + uv_run(&loop_, mode); + loop_is_running_ = false; + if (signaled_stop_) { + isolate_->CancelTerminateExecution(); + } +} + +bool SynchronousWorker::IsLoopAlive() { + if (loop_.data == nullptr || signaled_stop_) return false; + return uv_loop_alive(&loop_); +} + +void SynchronousWorker::MemoryInfo(MemoryTracker* tracker) const { + // TODO(@jasnell): Implement +} + +// ============================================================================ namespace { // Return the MessagePort that is global for this Environment and communicates @@ -921,6 +1363,8 @@ void InitWorker(Local target, env->set_worker_heap_snapshot_taker_template(wst->InstanceTemplate()); } + SynchronousWorker::Initialize(env, target); + SetMethod(context, target, "getEnvMessagePort", GetEnvMessagePort); target @@ -968,6 +1412,8 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) { registry->Register(Worker::TakeHeapSnapshot); registry->Register(Worker::LoopIdleTime); registry->Register(Worker::LoopStartTime); + + SynchronousWorker::RegisterExternalReferences(registry); } } // anonymous namespace diff --git a/test/parallel/test-synchronousworker.js b/test/parallel/test-synchronousworker.js new file mode 100644 index 00000000000000..a8b7428c29ea5e --- /dev/null +++ b/test/parallel/test-synchronousworker.js @@ -0,0 +1,264 @@ +'use strict'; + +const common = require('../common'); + +const { + strictEqual, + throws, +} = require('node:assert'); + +const { + SynchronousWorker, +} = require('worker_threads'); + +function deferred() { + let res; + const promise = new Promise((resolve) => res = resolve); + return { res, promise }; +} + +{ + const w = new SynchronousWorker(); + + w.runInWorkerScope(() => { + const req = w.createRequire(__filename); + const http = req('http'); + const vm = req('vm'); + const httpServer = http.createServer((req, res) => { + if (req.url === '/stop') { + w.stop(); + } + res.writeHead(200); + res.end('Ok\n'); + }); + + // TODO(@jasnell): The original version of this test uses fetch() from node-fetch. + // Trying to use the built-in fetch here, however, causes the test to hang because + // the initial fetch() promise never resolves. Need to determine why. + + // httpServer.listen(0, req('vm').runInThisContext(`({fetch, httpServer }) => (async () => { + // const res = await fetch('http://localhost:' + httpServer.address().port + '/'); + // globalThis.responseText = await res.text(); + // await fetch('http://localhost:' + httpServer.address().port + '/stop'); + // })`)({ fetch: w.globalThis.fetch, httpServer })); + + httpServer.listen(0, vm.runInThisContext(`({fetch, httpServer, http }) => (async () => { + const url = 'http://localhost:' + httpServer.address().port + '/'; + let resolver; + const done = new Promise((resolve) => resolver = resolve); + globalThis.responseText = ''; + http.get(url, (res) => { + res.setEncoding('utf8'); + res.on('data', (chunk) => globalThis.responseText += chunk); + res.on('end', () => { + http.get(url + 'stop', () => { + console.log('!!'); + resolver(); + }); + }); + }); + await done; + })`)({ fetch, httpServer, http })); + }); + + strictEqual(w.loopAlive, true); + w.runLoop('default'); + strictEqual(w.loopAlive, false); + + strictEqual(w.globalThis.responseText, 'Ok\n'); +} + +// With its own µtask queue' +{ + const w = new SynchronousWorker({ sharedEventLoop: true }); + let ran = false; + w.runInWorkerScope(() => { + w.globalThis.queueMicrotask(() => ran = true); + }); + strictEqual(ran, true); +} + +// With its own µtask queue but shared event loop +(function() { + const w = new SynchronousWorker({ sharedEventLoop: true }); + let ran = false; + w.runInWorkerScope(() => { + w.globalThis.setImmediate(() => { + w.globalThis.queueMicrotask(() => ran = true); + }); + }); + strictEqual(ran, false); + + const { res, promise } = deferred(); + + setImmediate(() => { + strictEqual(ran, true); + res(); + }); + + return promise; +})().then(common.mustCall()); + +// With its own loop but shared µtask queue +{ + const w = new SynchronousWorker({ sharedMicrotaskQueue: true }); + let ran = false; + w.runInWorkerScope(() => { + w.globalThis.setImmediate(() => { + w.globalThis.queueMicrotask(() => ran = true); + }); + }); + + strictEqual(ran, false); + w.runLoop('default'); + strictEqual(ran, true); +} + +// With its own loop but shared µtask queue (no callback scope) +(async function() { + const w = new SynchronousWorker({ sharedMicrotaskQueue: true }); + let ran = false; + w.globalThis.queueMicrotask(() => ran = true); + strictEqual(ran, false); + const { res, promise } = deferred(); + queueMicrotask(() => { + strictEqual(ran, true); + res(); + }); + return promise; +})().then(common.mustCall()); + +// Allows waiting for a specific promise to be resolved +{ + const w = new SynchronousWorker(); + const req = w.createRequire(__filename); + let srv; + let serverUpPromise; + let fetchPromise; + w.runInWorkerScope(() => { + srv = req('http').createServer((req, res) => res.end('contents')).listen(0); + serverUpPromise = req('events').once(srv, 'listening'); + }); + w.runLoopUntilPromiseResolved(serverUpPromise); + w.runInWorkerScope(() => { + const http = req('http'); + const { res, promise } = deferred(); + // TODO(@jasnell): The original version here used node-fetch. Using built in fetch + // does not work for some reason... + http.get('http://localhost:' + srv.address().port, (response) => { + res({ + ok: true, + status: response.statusCode, + }); + }); + fetchPromise = promise; + }); + const fetched = w.runLoopUntilPromiseResolved(fetchPromise); + strictEqual(fetched.ok, true); + strictEqual(fetched.status, 200); +} + +// process.exit interrupts runInWorkerScope +{ + const w = new SynchronousWorker(); + let ranBefore = false; + let ranAfter = false; + let observedCode = -1; + w.on('exit', (code) => observedCode = code); + w.runInWorkerScope(() => { + ranBefore = true; + w.process.exit(1); + ranAfter = true; + }); + strictEqual(ranBefore, true); + strictEqual(ranAfter, false); + strictEqual(observedCode, 1); +} + +// process.exit interrupts runLoop +{ + const w = new SynchronousWorker(); + let ranBefore = false; + let ranAfter = false; + let observedCode = -1; + w.on('exit', (code) => observedCode = code); + w.runInWorkerScope(() => { + w.globalThis.setImmediate(() => { + ranBefore = true; + w.process.exit(1); + ranAfter = true; + }); + }); + w.runLoop('default'); + strictEqual(ranBefore, true); + strictEqual(ranAfter, false); + strictEqual(observedCode, 1); +} + +// process.exit does not kill the process outside of any scopes +{ + const w = new SynchronousWorker(); + let observedCode = -1; + + w.on('exit', (code) => observedCode = code); + w.process.exit(1); + + strictEqual(observedCode, 1); + + throws(() => { + w.runLoop('default'); + }, /Worker has been stopped/); +} + +// Allows adding uncaught exception listeners +{ + const w = new SynchronousWorker(); + let uncaughtException; + let erroredOrExited = false; + w.on('exit', () => erroredOrExited = true); + w.on('errored', () => erroredOrExited = true); + w.process.on('uncaughtException', (err) => uncaughtException = err); + w.globalThis.setImmediate(() => { throw new Error('foobar'); }); + w.runLoop('default'); + strictEqual(erroredOrExited, false); + strictEqual(uncaughtException.message, 'foobar'); +} + +// Handles entirely uncaught exceptions inside the loop well +{ + const w = new SynchronousWorker(); + let observedCode; + let observedError; + w.on('exit', (code) => observedCode = code); + w.on('error', (error) => observedError = error); + w.globalThis.setImmediate(() => { throw new Error('foobar'); }); + w.runLoop('default'); + strictEqual(observedCode, 1); + strictEqual(observedError.message, 'foobar'); +} + +// Forbids nesting .runLoop() calls +{ + const w = new SynchronousWorker(); + let uncaughtException; + w.process.on('uncaughtException', (err) => uncaughtException = err); + w.globalThis.setImmediate(() => w.runLoop('default')); + w.runLoop('default'); + strictEqual(uncaughtException.message, 'Cannot nest calls to runLoop'); +} + +// Properly handles timers that are about to expire when FreeEnvironment() is called on +// a shared event loop +(async function() { + const w = new SynchronousWorker({ + sharedEventLoop: true, + sharedMicrotaskQueue: true + }); + + setImmediate(() => { + setTimeout(() => {}, 20); + const now = Date.now(); + while (Date.now() - now < 30); + }); + await w.stop(); +})().then(common.mustCall()); diff --git a/tools/license-builder.sh b/tools/license-builder.sh index b50732e0c14e57..ad17447f099e07 100755 --- a/tools/license-builder.sh +++ b/tools/license-builder.sh @@ -138,4 +138,7 @@ addlicense "node-fs-extra" "lib/internal/fs/cp" "$licenseText" addlicense "base64" "deps/base64/base64/" "$(cat "${rootdir}"/deps/base64/base64/LICENSE)" +licenseText="$(curl -sL https://github.com/raw/addaleax/synchronous-worker/HEAD/LICENSE)" +addlicense "synchronous-worker" "lib/worker_threads.js" "$licenseText" + mv "$tmplicense" "$licensefile"