Skip to content

Commit

Permalink
process: add deferTick
Browse files Browse the repository at this point in the history
Adds a new scheduling primitive to resolve zaldo when mixing
traditional Node async programming with async/await and Promises.

We cannot "fix" nextTick without breaking the whole ecosystem.
nextTick usage should be discouraged and we should try to
incrementally move to this new primitive.

Refs: nodejs#51156
Refs: nodejs#51280
Refs: nodejs#51114
Refs: nodejs#51070
Refs: nodejs/undici#2497
PR-URL: nodejs#51471
  • Loading branch information
ronag committed Jan 15, 2024
1 parent 94f824a commit de7f0c1
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 27 deletions.
68 changes: 68 additions & 0 deletions doc/api/process.md
Original file line number Diff line number Diff line change
Expand Up @@ -1219,6 +1219,74 @@ const process = require('node:process');
process.debugPort = 5858;
```

## `process.deferTick(callback[, ...args])`

<!-- YAML
added: REPLACEME
-->

* `callback` {Function}
* `...args` {any} Additional arguments to pass when invoking the `callback`

`process.deferTick()` adds `callback` to the "defer tick queue". This queue is
fully drained after the current operation on the JavaScript stack runs to
completion and before the event loop is allowed to continue. It's possible to
create an infinite loop if one were to recursively call `process.deferTick()`.
See the [Event Loop][] guide for more background.

Unlike `process.nextTick`, `process.deferTick()` will run after the "next tick
queue" and the microtask queue has been fully drained as to avoid Zalgo when
combinding traditional node asynchronous code with Promises.

Consider the following example:

```js
// uncaughtException!
setImmediate(async () => {
const e = await new Promise(resolve => {
const e = new EventEmitter()
resolve(e)
process.nextTick(() => {
e.emit('error', new Error('process.nextTick'))
})
})
e.on('error', () => {})
})

// uncaughtException!
setImmediate(async () => {
const e = await new Promise(resolve => {
const e = new EventEmitter()
resolve(e)
queueMicrotask(() => {
e.emit('error', new Error('queueMicrotask'))
})
})
e.on('error', () => {})
})
```

In both of these cases the user will encounter an
`uncaughtException` error since the inner task
will execute before control is returned to the
caller of `await`. In order to fix this one should
use `process.deferTick` which will execute in the
expected order:

```js
// OK!
setImmediate(async () => {
const e = await new Promise(resolve => {
const e = new EventEmitter()
resolve(e)
process.deferTick(() => {
e.emit('error', new Error('process.deferTick'))
})
})
e.on('error', () => {})
})
```

## `process.disconnect()`

<!-- YAML
Expand Down
3 changes: 2 additions & 1 deletion lib/internal/bootstrap/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,9 @@ process.emitWarning = emitWarning;
// bootstrap to make sure that any operation done before this are synchronous.
// If any ticks or timers are scheduled before this they are unlikely to work.
{
const { nextTick, runNextTicks } = setupTaskQueue();
const { nextTick, runNextTicks, deferTick } = setupTaskQueue();
process.nextTick = nextTick;
process.deferTick = deferTick;
// Used to emulate a tick manually in the JS land.
// A better name for this function would be `runNextTicks` but
// it has been exposed to the process object so we keep this legacy name
Expand Down
63 changes: 37 additions & 26 deletions lib/internal/process/task_queues.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,14 @@ const { AsyncResource } = require('async_hooks');
// *Must* match Environment::TickInfo::Fields in src/env.h.
const kHasTickScheduled = 0;

function hasTickScheduled() {
return tickInfo[kHasTickScheduled] === 1;
}

function setHasTickScheduled(value) {
tickInfo[kHasTickScheduled] = value ? 1 : 0;
}

const queue = new FixedQueue();
let queue = new FixedQueue();
let deferQueue = new FixedQueue();

// Should be in sync with RunNextTicksNative in node_task_queue.cc
function runNextTicks() {
if (!hasTickScheduled() && !hasRejectionToWarn())
if (tickInfo[kHasTickScheduled] === 0 && !hasRejectionToWarn())
runMicrotasks();
if (!hasTickScheduled() && !hasRejectionToWarn())
if (tickInfo[kHasTickScheduled] === 0 && !hasRejectionToWarn())
return;

processTicksAndRejections();
Expand Down Expand Up @@ -93,46 +86,63 @@ function processTicksAndRejections() {
emitAfter(asyncId);
}
runMicrotasks();

const tmp = queue;
queue = deferQueue;
deferQueue = tmp;
} while (!queue.isEmpty() || processPromiseRejections());
setHasTickScheduled(false);
tickInfo[kHasTickScheduled] = 0;
setHasRejectionToWarn(false);
}

// `nextTick()` will not enqueue any callback when the process is about to
// exit since the callback would not have a chance to be executed.
function nextTick(callback) {
function nextTick(callback, ...args) {
validateFunction(callback, 'callback');

if (process._exiting)
return;

let args;
switch (arguments.length) {
case 1: break;
case 2: args = [arguments[1]]; break;
case 3: args = [arguments[1], arguments[2]]; break;
case 4: args = [arguments[1], arguments[2], arguments[3]]; break;
default:
args = new Array(arguments.length - 1);
for (let i = 1; i < arguments.length; i++)
args[i - 1] = arguments[i];
if (tickInfo[kHasTickScheduled] === 0) {
tickInfo[kHasTickScheduled] = 1;
}

if (queue.isEmpty())
setHasTickScheduled(true);
const asyncId = newAsyncId();
const triggerAsyncId = getDefaultTriggerAsyncId();
const tickObject = {
[async_id_symbol]: asyncId,
[trigger_async_id_symbol]: triggerAsyncId,
callback,
args,
args: args.length > 0 ? args : null,
};
if (initHooksExist())
emitInit(asyncId, 'TickObject', triggerAsyncId, tickObject);
queue.push(tickObject);
}

function deferTick(callback, ...args) {
validateFunction(callback, 'callback');

if (process._exiting)
return;

if (tickInfo[kHasTickScheduled] === 0) {
tickInfo[kHasTickScheduled] = 1;
}

const asyncId = newAsyncId();
const triggerAsyncId = getDefaultTriggerAsyncId();
const tickObject = {
[async_id_symbol]: asyncId,
[trigger_async_id_symbol]: triggerAsyncId,
callback,
args: args.length > 0 ? args : null,
};
if (initHooksExist())
emitInit(asyncId, 'TickObject', triggerAsyncId, tickObject);
deferQueue.push(tickObject);
}

function runMicrotask() {
this.runInAsyncScope(() => {
const callback = this.callback;
Expand Down Expand Up @@ -166,6 +176,7 @@ module.exports = {
setTickCallback(processTicksAndRejections);
return {
nextTick,
deferTick,
runNextTicks,
};
},
Expand Down
15 changes: 15 additions & 0 deletions test/async-hooks/test-defertick.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
'use strict';

const common = require('../common');
const { EventEmitter } = require('events');

setImmediate(async () => {
const e = await new Promise((resolve) => {
const e = new EventEmitter();
resolve(e);
process.deferTick(common.mustCall(() => {
e.emit('error', new Error('kaboom'));
}));
});
e.on('error', common.mustCall(() => {}));
});

0 comments on commit de7f0c1

Please sign in to comment.