Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add filter option to getOneMessage() #1076

Merged
merged 1 commit into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ This requires the [`ipc`](#optionsipc) option to be `true`. The [type](ipc.md#me

[More info.](ipc.md#exchanging-messages)

### getOneMessage()
### getOneMessage(getOneMessageOptions?)

_getOneMessageOptions_: [`GetOneMessageOptions`](#getonemessageoptions)\
_Returns_: [`Promise<Message>`](ipc.md#message-type)

Receive a single `message` from the parent process.
Expand All @@ -113,6 +114,18 @@ This requires the [`ipc`](#optionsipc) option to be `true`. The [type](ipc.md#me

[More info.](ipc.md#exchanging-messages)

#### getOneMessageOptions

_Type_: `object`

#### getOneMessageOptions.filter

_Type_: [`(Message) => boolean`](ipc.md#message-type)

Ignore any `message` that returns `false`.

[More info.](ipc.md#filter-messages)

### getEachMessage()

_Returns_: [`AsyncIterable<Message>`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols)
Expand Down Expand Up @@ -259,8 +272,9 @@ This requires the [`ipc`](#optionsipc) option to be `true`. The [type](ipc.md#me

[More info.](ipc.md#exchanging-messages)

### subprocess.getOneMessage()
### subprocess.getOneMessage(getOneMessageOptions?)

_getOneMessageOptions_: [`GetOneMessageOptions`](#getonemessageoptions)\
_Returns_: [`Promise<Message>`](ipc.md#message-type)

Receive a single `message` from the subprocess.
Expand Down Expand Up @@ -900,7 +914,7 @@ By default, this applies to both `stdout` and `stderr`, but [different values ca
_Type:_ `boolean`\
_Default:_ `true` if either the [`node`](#optionsnode) option or the [`ipcInput`](#optionsipcinput) is set, `false` otherwise

Enables exchanging messages with the subprocess using [`subprocess.sendMessage(message)`](#subprocesssendmessagemessage), [`subprocess.getOneMessage()`](#subprocessgetonemessage) and [`subprocess.getEachMessage()`](#subprocessgeteachmessage).
Enables exchanging messages with the subprocess using [`subprocess.sendMessage(message)`](#subprocesssendmessagemessage), [`subprocess.getOneMessage()`](#subprocessgetonemessagegetonemessageoptions) and [`subprocess.getEachMessage()`](#subprocessgeteachmessage).

The subprocess must be a Node.js file.

Expand Down
2 changes: 1 addition & 1 deletion docs/execution.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ Synchronous execution is generally discouraged as it holds the CPU and prevents
- Signal termination: [`subprocess.kill()`](api.md#subprocesskillerror), [`subprocess.pid`](api.md#subprocesspid), [`cleanup`](api.md#optionscleanup) option, [`cancelSignal`](api.md#optionscancelsignal) option, [`forceKillAfterDelay`](api.md#optionsforcekillafterdelay) option.
- Piping multiple subprocesses: [`subprocess.pipe()`](api.md#subprocesspipefile-arguments-options).
- [`subprocess.iterable()`](lines.md#progressive-splitting).
- [IPC](ipc.md): [`sendMessage()`](api.md#sendmessagemessage), [`getOneMessage()`](api.md#getonemessage), [`getEachMessage()`](api.md#geteachmessage), [`result.ipcOutput`](output.md#any-output-type), [`ipc`](api.md#optionsipc) option, [`serialization`](api.md#optionsserialization) option, [`ipcInput`](input.md#any-input-type) option.
- [IPC](ipc.md): [`sendMessage()`](api.md#sendmessagemessage), [`getOneMessage()`](api.md#getonemessagegetonemessageoptions), [`getEachMessage()`](api.md#geteachmessage), [`result.ipcOutput`](output.md#any-output-type), [`ipc`](api.md#optionsipc) option, [`serialization`](api.md#optionsserialization) option, [`ipcInput`](input.md#any-input-type) option.
- [`result.all`](api.md#resultall) is not interleaved.
- [`detached`](api.md#optionsdetached) option.
- The [`maxBuffer`](api.md#optionsmaxbuffer) option is always measured in bytes, not in characters, [lines](api.md#optionslines) nor [objects](transform.md#object-mode). Also, it ignores transforms and the [`encoding`](api.md#optionsencoding) option.
Expand Down
2 changes: 1 addition & 1 deletion docs/input.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ await execa({stdin: 'inherit'})`npm run scaffold`;

## Any input type

If the subprocess [uses Node.js](node.md), [almost any type](ipc.md#message-type) can be passed to the subprocess using the [`ipcInput`](ipc.md#send-an-initial-message) option. The subprocess retrieves that input using [`getOneMessage()`](api.md#getonemessage).
If the subprocess [uses Node.js](node.md), [almost any type](ipc.md#message-type) can be passed to the subprocess using the [`ipcInput`](ipc.md#send-an-initial-message) option. The subprocess retrieves that input using [`getOneMessage()`](api.md#getonemessagegetonemessageoptions).

```js
// main.js
Expand Down
28 changes: 25 additions & 3 deletions docs/ipc.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ When the [`ipc`](api.md#optionsipc) option is `true`, the current process and su

The `ipc` option defaults to `true` when using [`execaNode()`](node.md#run-nodejs-files) or the [`node`](node.md#run-nodejs-files) option.

The current process sends messages with [`subprocess.sendMessage(message)`](api.md#subprocesssendmessagemessage) and receives them with [`subprocess.getOneMessage()`](api.md#subprocessgetonemessage). The subprocess uses [`sendMessage(message)`](api.md#sendmessagemessage) and [`getOneMessage()`](api.md#getonemessage) instead.
The current process sends messages with [`subprocess.sendMessage(message)`](api.md#subprocesssendmessagemessage) and receives them with [`subprocess.getOneMessage()`](api.md#subprocessgetonemessagegetonemessageoptions). The subprocess uses [`sendMessage(message)`](api.md#sendmessagemessage) and [`getOneMessage()`](api.md#getonemessagegetonemessageoptions) instead.

```js
// parent.js
Expand All @@ -33,7 +33,7 @@ console.log(await getOneMessage()); // 'Hello from parent'

## Listening to messages

[`subprocess.getOneMessage()`](api.md#subprocessgetonemessage) and [`getOneMessage()`](api.md#getonemessage) read a single message. To listen to multiple messages in a row, [`subprocess.getEachMessage()`](api.md#subprocessgeteachmessage) and [`getEachMessage()`](api.md#geteachmessage) should be used instead.
[`subprocess.getOneMessage()`](api.md#subprocessgetonemessagegetonemessageoptions) and [`getOneMessage()`](api.md#getonemessagegetonemessageoptions) read a single message. To listen to multiple messages in a row, [`subprocess.getEachMessage()`](api.md#subprocessgeteachmessage) and [`getEachMessage()`](api.md#geteachmessage) should be used instead.

[`subprocess.getEachMessage()`](api.md#subprocessgeteachmessage) waits for the subprocess to end (even when using [`break`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/break) or [`return`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/return)). It throws if the subprocess [fails](api.md#result). This means you do not need to `await` the subprocess' [promise](execution.md#result).

Expand Down Expand Up @@ -67,9 +67,29 @@ for await (const message of getEachMessage()) {
}
```

## Filter messages

```js
import {getOneMessage} from 'execa';

const startMessage = await getOneMessage({
filter: message => message.type === 'start',
});
```

```js
import {getEachMessage} from 'execa';

for await (const message of getEachMessage()) {
if (message.type === 'start') {
// ...
}
}
```

## Retrieve all messages

The [`result.ipcOutput`](api.md#resultipcoutput) array contains all the messages sent by the subprocess. In many situations, this is simpler than using [`subprocess.getOneMessage()`](api.md#subprocessgetonemessage) and [`subprocess.getEachMessage()`](api.md#subprocessgeteachmessage).
The [`result.ipcOutput`](api.md#resultipcoutput) array contains all the messages sent by the subprocess. In many situations, this is simpler than using [`subprocess.getOneMessage()`](api.md#subprocessgetonemessagegetonemessageoptions) and [`subprocess.getEachMessage()`](api.md#subprocessgeteachmessage).

```js
// main.js
Expand Down Expand Up @@ -118,6 +138,8 @@ By default, messages are serialized using [`structuredClone()`](https://develope
To limit messages to JSON instead, the [`serialization`](api.md#optionsserialization) option can be set to `'json'`.

```js
import {execaNode} from 'execa';

const subprocess = execaNode({serialization: 'json'})`child.js`;
```

Expand Down
31 changes: 24 additions & 7 deletions lib/ipc/get-one.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {once} from 'node:events';
import {once, on} from 'node:events';
import {
validateIpcOption,
validateConnection,
Expand All @@ -7,27 +7,31 @@ import {
} from './validation.js';

// Like `[sub]process.once('message')` but promise-based
export const getOneMessage = ({anyProcess, isSubprocess, ipc}) => {
export const getOneMessage = ({anyProcess, isSubprocess, ipc}, {filter} = {}) => {
const methodName = 'getOneMessage';
validateIpcOption(methodName, isSubprocess, ipc);
validateConnection(methodName, isSubprocess, anyProcess.channel !== null);

return onceMessage(anyProcess, isSubprocess, methodName);
return onceMessage({
anyProcess,
isSubprocess,
methodName,
filter,
});
};

const onceMessage = async (anyProcess, isSubprocess, methodName) => {
const onceMessage = async ({anyProcess, isSubprocess, methodName, filter}) => {
const controller = new AbortController();
try {
const [message] = await Promise.race([
once(anyProcess, 'message', {signal: controller.signal}),
return await Promise.race([
getMessage(anyProcess, filter, controller),
throwOnDisconnect({
anyProcess,
isSubprocess,
methodName,
controller,
}),
]);
return message;
} catch (error) {
disconnect(anyProcess);
throw error;
Expand All @@ -36,6 +40,19 @@ const onceMessage = async (anyProcess, isSubprocess, methodName) => {
}
};

const getMessage = async (anyProcess, filter, {signal}) => {
if (filter === undefined) {
const [message] = await once(anyProcess, 'message', {signal});
return message;
}

for await (const [message] of on(anyProcess, 'message', {signal})) {
if (filter(message)) {
return message;
}
}
};

const throwOnDisconnect = async ({anyProcess, isSubprocess, methodName, controller: {signal}}) => {
await once(anyProcess, 'disconnect', {signal});
throwOnEarlyDisconnect(methodName, isSubprocess);
Expand Down
11 changes: 5 additions & 6 deletions test-d/ipc/get-each.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,6 @@ import {
type Options,
} from '../../index.js';

for await (const message of getEachMessage()) {
expectType<Message>(message);
}

expectError(getEachMessage(''));

const subprocess = execa('test', {ipc: true});

for await (const message of subprocess.getEachMessage()) {
Expand All @@ -22,7 +16,12 @@ for await (const message of execa('test', {ipc: true, serialization: 'json'}).ge
expectType<Message<'json'>>(message);
}

for await (const message of getEachMessage()) {
expectType<Message>(message);
}

expectError(subprocess.getEachMessage(''));
expectError(getEachMessage(''));

execa('test', {ipcInput: ''}).getEachMessage();
execa('test', {ipcInput: '' as Message}).getEachMessage();
Expand Down
37 changes: 32 additions & 5 deletions test-d/ipc/get-one.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import {
type Options,
} from '../../index.js';

expectType<Promise<Message>>(getOneMessage());
expectError(await getOneMessage(''));

const subprocess = execa('test', {ipc: true});
expectType<Message<'advanced'>>(await subprocess.getOneMessage());
expectType<Message<'json'>>(await execa('test', {ipc: true, serialization: 'json'}).getOneMessage());
expectType<Promise<Message<'advanced'>>>(subprocess.getOneMessage());
const jsonSubprocess = execa('test', {ipc: true, serialization: 'json'});
expectType<Promise<Message<'json'>>>(jsonSubprocess.getOneMessage());
expectType<Promise<Message>>(getOneMessage());

expectError(await subprocess.getOneMessage(''));
expectError(await getOneMessage(''));

await execa('test', {ipcInput: ''}).getOneMessage();
await execa('test', {ipcInput: '' as Message}).getOneMessage();
Expand All @@ -26,3 +26,30 @@ expectType<undefined>(execa('test', {}).getOneMessage);
expectType<undefined>(execa('test', {ipc: false}).getOneMessage);
expectType<undefined>(execa('test', {ipcInput: undefined}).getOneMessage);
expectType<undefined>(execa('test', {ipc: false, ipcInput: ''}).getOneMessage);

await subprocess.getOneMessage({filter: undefined} as const);
await subprocess.getOneMessage({filter: (message: Message<'advanced'>) => true} as const);
await jsonSubprocess.getOneMessage({filter: (message: Message<'json'>) => true} as const);
await jsonSubprocess.getOneMessage({filter: (message: Message<'advanced'>) => true} as const);
await subprocess.getOneMessage({filter: (message: Message<'advanced'> | bigint) => true} as const);
await subprocess.getOneMessage({filter: () => true} as const);
expectError(await subprocess.getOneMessage({filter: (message: Message<'advanced'>) => ''} as const));
// eslint-disable-next-line @typescript-eslint/no-empty-function
expectError(await subprocess.getOneMessage({filter(message: Message<'advanced'>) {}} as const));
expectError(await subprocess.getOneMessage({filter: (message: Message<'json'>) => true} as const));
expectError(await subprocess.getOneMessage({filter: (message: '') => true} as const));
expectError(await subprocess.getOneMessage({filter: true} as const));
expectError(await subprocess.getOneMessage({unknownOption: true} as const));

await getOneMessage({filter: undefined} as const);
await getOneMessage({filter: (message: Message) => true} as const);
await getOneMessage({filter: (message: Message<'advanced'>) => true} as const);
await getOneMessage({filter: (message: Message | bigint) => true} as const);
await getOneMessage({filter: () => true} as const);
expectError(await getOneMessage({filter: (message: Message) => ''} as const));
// eslint-disable-next-line @typescript-eslint/no-empty-function
expectError(await getOneMessage({filter(message: Message) {}} as const));
expectError(await getOneMessage({filter: (message: Message<'json'>) => true} as const));
expectError(await getOneMessage({filter: (message: '') => true} as const));
expectError(await getOneMessage({filter: true} as const));
expectError(await getOneMessage({unknownOption: true} as const));
11 changes: 6 additions & 5 deletions test-d/ipc/send.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@ import {
type Options,
} from '../../index.js';

const subprocess = execa('test', {ipc: true});
expectType<void>(await subprocess.sendMessage(''));
expectType<Promise<void>>(sendMessage(''));

expectError(await subprocess.sendMessage());
expectError(await sendMessage());
expectError(await subprocess.sendMessage(undefined));
expectError(await sendMessage(undefined));
expectError(await subprocess.sendMessage(0n));
expectError(await sendMessage(0n));
expectError(await subprocess.sendMessage(Symbol('test')));
expectError(await sendMessage(Symbol('test')));

const subprocess = execa('test', {ipc: true});
expectType<void>(await subprocess.sendMessage(''));

expectError(await subprocess.sendMessage());

await execa('test', {ipcInput: ''}).sendMessage('');
await execa('test', {ipcInput: '' as Message}).sendMessage('');
await execa('test', {} as Options).sendMessage?.('');
Expand Down
5 changes: 5 additions & 0 deletions test/fixtures/ipc-echo-filter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/usr/bin/env node
import {sendMessage, getOneMessage} from '../../index.js';
import {foobarArray} from '../helpers/input.js';

await sendMessage(await getOneMessage(({filter: message => message === foobarArray[1]})));
8 changes: 8 additions & 0 deletions test/fixtures/ipc-echo-twice-filter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/usr/bin/env node
import {sendMessage, getOneMessage} from '../../index.js';
import {alwaysPass} from '../helpers/ipc.js';

const message = await getOneMessage({filter: alwaysPass});
const secondMessagePromise = getOneMessage({filter: alwaysPass});
await sendMessage(message);
await sendMessage(await secondMessagePromise);
11 changes: 11 additions & 0 deletions test/fixtures/ipc-process-error-filter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/usr/bin/env node
import process from 'node:process';
import {getOneMessage} from '../../index.js';
import {foobarString} from '../helpers/input.js';
import {alwaysPass} from '../helpers/ipc.js';

const cause = new Error(foobarString);
await Promise.all([
getOneMessage({filter: alwaysPass}),
process.emit('error', cause),
]);
2 changes: 2 additions & 0 deletions test/helpers/ipc.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ export const iterateAllMessages = async subprocess => {

return messages;
};

export const alwaysPass = () => true;
11 changes: 11 additions & 0 deletions test/ipc/buffer-messages.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,14 @@ test('Sets empty error.ipcOutput, sync', t => {
const {ipcOutput} = t.throws(() => execaSync('fail.js'));
t.deepEqual(ipcOutput, []);
});

test('"error" event interrupts result.ipcOutput', async t => {
const subprocess = execa('ipc-echo-twice.js', {ipcInput: foobarString});
t.is(await subprocess.getOneMessage(), foobarString);

const cause = new Error(foobarString);
subprocess.emit('error', cause);
const error = await t.throwsAsync(subprocess);
t.is(error.cause, cause);
t.deepEqual(error.ipcOutput, [foobarString]);
});
Loading