From 8a38d8be9f9db036e62ee5632a13795333aae487 Mon Sep 17 00:00:00 2001 From: Jack Works Date: Mon, 12 Feb 2024 15:20:08 +0800 Subject: [PATCH] feat: add signal and forceSignal --- .changeset/short-eels-cover.md | 5 + .../__file_snapshots__/async-call-batch.md | 2 +- .../async-call-client-abort-signal.md | 125 ++++++++ .../async-call-multiple-batch.md | 273 ++++++++++++++++++ .../async-call-server-abort-signal.md | 199 +++++++++++++ __tests__/abort-signal.ts | 80 +++++ __tests__/batch.ts | 53 +++- __tests__/utils/test.ts | 6 +- src/Async-Call.ts | 31 +- src/core/batch.ts | 1 + src/types.ts | 31 +- src/utils/error.ts | 12 +- 12 files changed, 808 insertions(+), 10 deletions(-) create mode 100644 .changeset/short-eels-cover.md create mode 100644 __tests__/__file_snapshots__/async-call-client-abort-signal.md create mode 100644 __tests__/__file_snapshots__/async-call-multiple-batch.md create mode 100644 __tests__/__file_snapshots__/async-call-server-abort-signal.md create mode 100644 __tests__/abort-signal.ts diff --git a/.changeset/short-eels-cover.md b/.changeset/short-eels-cover.md new file mode 100644 index 0000000..c05d00b --- /dev/null +++ b/.changeset/short-eels-cover.md @@ -0,0 +1,5 @@ +--- +"async-call-rpc": minor +--- + +add signal and forceSignal diff --git a/__tests__/__file_snapshots__/async-call-batch.md b/__tests__/__file_snapshots__/async-call-batch.md index 8fe0e23..e0763d0 100644 --- a/__tests__/__file_snapshots__/async-call-batch.md +++ b/__tests__/__file_snapshots__/async-call-batch.md @@ -108,7 +108,7 @@ Array [ ```php Array [ - "In this part it should be no log", + "In this part it should has no log", ] ``` diff --git a/__tests__/__file_snapshots__/async-call-client-abort-signal.md b/__tests__/__file_snapshots__/async-call-client-abort-signal.md new file mode 100644 index 0000000..661d24c --- /dev/null +++ b/__tests__/__file_snapshots__/async-call-client-abort-signal.md @@ -0,0 +1,125 @@ +# Timeline + +## T=0 Message: client sent + +```php +Object { + "id": 0, + "jsonrpc": "2.0", + "method": "delay", + "params": Array [ + 200, + "first", + ], +} +``` + +## T=1 Message: client sent + +```php +Object { + "id": 1, + "jsonrpc": "2.0", + "method": "delay", + "params": Array [ + 400, + "second", + ], +} +``` + +## T=2 Message: server received + +```php +Object { + "id": 0, + "jsonrpc": "2.0", + "method": "delay", + "params": Array [ + 200, + "first", + ], +} +``` + +## T=3 Log: server/log + +```php +Array [ + "rpc.%cdelay%c(%o, %o%c) +%o %c@0", + "color:#d2c057", + "", + 200, + "first", + "", + Promise {}, + "color:gray;font-style:italic;", +] +``` + +## T=4 Message: server received + +```php +Object { + "id": 1, + "jsonrpc": "2.0", + "method": "delay", + "params": Array [ + 400, + "second", + ], +} +``` + +## T=5 Log: server/log + +```php +Array [ + "rpc.%cdelay%c(%o, %o%c) +%o %c@1", + "color:#d2c057", + "", + 400, + "second", + "", + Promise {}, + "color:gray;font-style:italic;", +] +``` + +## T=6 Log: testRunner/log + +```php +Array [ + "soft abort", +] +``` + +## T=7 Message: server => client + +```php +Object { + "id": 0, + "jsonrpc": "2.0", + "result": undefined, +} +``` + +## T=8 Log: testRunner/log + +```php +Array [ + "hard abort", +] +``` + +## T=9 Message: server => client + +```php +Object { + "id": 1, + "jsonrpc": "2.0", + "result": undefined, +} +``` diff --git a/__tests__/__file_snapshots__/async-call-multiple-batch.md b/__tests__/__file_snapshots__/async-call-multiple-batch.md new file mode 100644 index 0000000..829a93b --- /dev/null +++ b/__tests__/__file_snapshots__/async-call-multiple-batch.md @@ -0,0 +1,273 @@ +# Timeline + +## T=0 Log: testRunner/log + +```php +Array [ + "Before this line no request from batch 1 was sent", +] +``` + +## T=1 Message: client => server + +```php +Array [ + Object { + "id": 0, + "jsonrpc": "2.0", + "method": "add", + "params": Array [ + 2, + 3, + ], + }, + Object { + "id": 1, + "jsonrpc": "2.0", + "method": "echo", + "params": Array [ + 1, + ], + }, +] +``` + +## T=2 Log: server/log + +```php +Array [ + "rpc.%cadd%c(%o, %o%c) +%o %c@0", + "color:#d2c057", + "", + 2, + 3, + "", + Promise {}, + "color:gray;font-style:italic;", +] +``` + +## T=3 Log: server/log + +```php +Array [ + "rpc.%cecho%c(%o%c) +%o %c@1", + "color:#d2c057", + "", + 1, + "", + Promise {}, + "color:gray;font-style:italic;", +] +``` + +## T=4 Message: server => client + +```php +Array [ + Object { + "id": 0, + "jsonrpc": "2.0", + "result": 5, + }, + Object { + "id": 1, + "jsonrpc": "2.0", + "result": 1, + }, +] +``` + +## T=5 Log: testRunner/log + +```php +Array [ + "After this line no request from batch 1 was sent", +] +``` + +## T=6 Log: testRunner/log + +```php +Array [ + "Before this line no request from batch 2 was sent", +] +``` + +## T=7 Message: client => server + +```php +Array [ + Object { + "id": 2, + "jsonrpc": "2.0", + "method": "add", + "params": Array [ + 4, + 5, + ], + }, + Object { + "id": 3, + "jsonrpc": "2.0", + "method": "echo", + "params": Array [ + 2, + ], + }, +] +``` + +## T=8 Log: server/log + +```php +Array [ + "rpc.%cadd%c(%o, %o%c) +%o %c@2", + "color:#d2c057", + "", + 4, + 5, + "", + Promise {}, + "color:gray;font-style:italic;", +] +``` + +## T=9 Log: server/log + +```php +Array [ + "rpc.%cecho%c(%o%c) +%o %c@3", + "color:#d2c057", + "", + 2, + "", + Promise {}, + "color:gray;font-style:italic;", +] +``` + +## T=10 Message: server => client + +```php +Array [ + Object { + "id": 2, + "jsonrpc": "2.0", + "result": 9, + }, + Object { + "id": 3, + "jsonrpc": "2.0", + "result": 2, + }, +] +``` + +## T=11 Log: testRunner/log + +```php +Array [ + "After this line no request from batch 2 was sent", +] +``` + +## T=12 Log: testRunner/log + +```php +Array [ + "Part 1 end", +] +``` + +## T=13 Log: testRunner/log + +```php +Array [ + "Part 2 start", +] +``` + +## T=14 Message: client => server + +```php +Array [ + Object { + "id": 6, + "jsonrpc": "2.0", + "method": "add", + "params": Array [ + 4, + 5, + ], + }, + Object { + "id": 7, + "jsonrpc": "2.0", + "method": "echo", + "params": Array [ + 2, + ], + }, +] +``` + +## T=15 Log: server/log + +```php +Array [ + "rpc.%cadd%c(%o, %o%c) +%o %c@6", + "color:#d2c057", + "", + 4, + 5, + "", + Promise {}, + "color:gray;font-style:italic;", +] +``` + +## T=16 Log: server/log + +```php +Array [ + "rpc.%cecho%c(%o%c) +%o %c@7", + "color:#d2c057", + "", + 2, + "", + Promise {}, + "color:gray;font-style:italic;", +] +``` + +## T=17 Message: server => client + +```php +Array [ + Object { + "id": 6, + "jsonrpc": "2.0", + "result": 9, + }, + Object { + "id": 7, + "jsonrpc": "2.0", + "result": 2, + }, +] +``` + +## T=18 Log: testRunner/log + +```php +Array [ + "Part 2 end", +] +``` diff --git a/__tests__/__file_snapshots__/async-call-server-abort-signal.md b/__tests__/__file_snapshots__/async-call-server-abort-signal.md new file mode 100644 index 0000000..75cbea8 --- /dev/null +++ b/__tests__/__file_snapshots__/async-call-server-abort-signal.md @@ -0,0 +1,199 @@ +# Timeline + +## T=0 Message: client sent + +```php +Object { + "id": 0, + "jsonrpc": "2.0", + "method": "delay", + "params": Array [ + 200, + "first", + ], +} +``` + +## T=1 Message: client sent + +```php +Object { + "id": 1, + "jsonrpc": "2.0", + "method": "delay", + "params": Array [ + 400, + "second", + ], +} +``` + +## T=2 Message: server received + +```php +Object { + "id": 0, + "jsonrpc": "2.0", + "method": "delay", + "params": Array [ + 200, + "first", + ], +} +``` + +## T=3 Log: server/log + +```php +Array [ + "rpc.%cdelay%c(%o, %o%c) +%o %c@0", + "color:#d2c057", + "", + 200, + "first", + "", + Promise {}, + "color:gray;font-style:italic;", +] +``` + +## T=4 Message: server received + +```php +Object { + "id": 1, + "jsonrpc": "2.0", + "method": "delay", + "params": Array [ + 400, + "second", + ], +} +``` + +## T=5 Log: server/log + +```php +Array [ + "rpc.%cdelay%c(%o, %o%c) +%o %c@1", + "color:#d2c057", + "", + 400, + "second", + "", + Promise {}, + "color:gray;font-style:italic;", +] +``` + +## T=6 Log: testRunner/log + +```php +Array [ + "soft abort", +] +``` + +## T=7 Message: client => server + +```php +Object { + "id": 2, + "jsonrpc": "2.0", + "method": "delay", + "params": Array [ + 0, + "post abort", + ], +} +``` + +## T=8 Log: server/log + +```php +Array [ + [AbortError: This operation was aborted], +] +``` + +## T=9 Message: server => client + +```php +Object { + "error": Object { + "code": -1, + "data": Object { + "type": "DOMException:AbortError", + }, + "message": "This operation was aborted", + }, + "id": 2, + "jsonrpc": "2.0", +} +``` + +## T=10 Log: client/log + +```php +Array [ + "DOMException:AbortError: This operation was aborted(-1) %c@2 +%c", + "color: gray", + "", +] +``` + +## T=11 Message: server => client + +```php +Object { + "id": 0, + "jsonrpc": "2.0", + "result": undefined, +} +``` + +## T=12 Log: testRunner/log + +```php +Array [ + "hard abort", +] +``` + +## T=13 Log: server/log + +```php +Array [ + [AbortError: This operation was aborted], +] +``` + +## T=14 Message: server => client + +```php +Object { + "error": Object { + "code": -1, + "data": Object { + "type": "DOMException:AbortError", + }, + "message": "This operation was aborted", + }, + "id": 1, + "jsonrpc": "2.0", +} +``` + +## T=15 Log: client/log + +```php +Array [ + "DOMException:AbortError: This operation was aborted(-1) %c@1 +%c", + "color: gray", + "", +] +``` diff --git a/__tests__/abort-signal.ts b/__tests__/abort-signal.ts new file mode 100644 index 0000000..48de638 --- /dev/null +++ b/__tests__/abort-signal.ts @@ -0,0 +1,80 @@ +import { expect, it } from 'vitest' +import { delay, withSnapshotDefault } from './utils/test.js' + +it( + 'works with AbortSignal on the client side', + withSnapshotDefault('async-call-client-abort-signal', async ({ init, log }) => { + const clientSignal = new AbortController() + const clientForceSignal = new AbortController() + const server = init<{ delay(ms: number, comment: string): Promise }>({ + client: { signal: clientSignal.signal, forceSignal: clientForceSignal.signal }, + impl: { delay }, + }) + server.delay(200, 'first').then(() => (promiseResolved = true)) + const promise2 = server.delay(400, 'second').then(() => (promise2Resolved = true)) + + let promiseResolved = false + let promise2Resolved = false + + await delay(100) // T=0 + log('soft abort') + clientSignal.abort() + + // new requests rejected + const p = expect(server.delay(0, 'post abort')).rejects.toThrowErrorMatchingInlineSnapshot( + `[AbortError: This operation was aborted]`, + ) + await delay(100) // T=100 + expect(promiseResolved || promise2Resolved, 'pending promises should not be rejected').toBe(false) + + await delay(200) // T=300 + expect(promiseResolved, 'current promise should be able to resolve').toBe(true) + + log('hard abort') + clientForceSignal.abort() + // pending requests should be rejected + await p + await expect(promise2).rejects.toThrowErrorMatchingInlineSnapshot(`[AbortError: This operation was aborted]`) + + await delay(200) + }), +) + +it( + 'works with AbortSignal on the server side', + withSnapshotDefault('async-call-server-abort-signal', async ({ init, log }) => { + const serverSignal = new AbortController() + const serverForceSignal = new AbortController() + const server = init<{ delay(ms: number, comment: string): Promise }>({ + server: { signal: serverSignal.signal, forceSignal: serverForceSignal.signal }, + impl: { delay }, + }) + server.delay(200, 'first').then(() => (promiseResolved = true)) + const promise2 = server.delay(400, 'second').then(() => (promise2Resolved = true)) + + let promiseResolved = false + let promise2Resolved = false + + // wait request to be send to the client + await delay(100) // T=0 + log('soft abort') + serverSignal.abort() + + // new requests rejected + const p = expect(server.delay(0, 'post abort')).rejects.toThrowErrorMatchingInlineSnapshot( + `[AbortError: This operation was aborted]`, + ) + + await delay(100) // T=100 + expect(promiseResolved || promise2Resolved, 'pending promises should not be rejected').toBe(false) + + await delay(200) // T=300 + expect(promiseResolved, 'current promise should be able to resolve').toBe(true) + + log('hard abort') + serverForceSignal.abort() + // pending requests should be rejected + await p + await expect(promise2).rejects.toThrowErrorMatchingInlineSnapshot(`[AbortError: This operation was aborted]`) + }), +) diff --git a/__tests__/batch.ts b/__tests__/batch.ts index daf35f9..46a8a51 100644 --- a/__tests__/batch.ts +++ b/__tests__/batch.ts @@ -30,7 +30,7 @@ it( log('Part 2 start') const a = server.add(2, 3) const b = server.echo(1) - log('In this part it should be no log') + log('In this part it should has no log') drop() expect(a).rejects.toThrowErrorMatchingInlineSnapshot(`[Error: Aborted]`) expect(b).rejects.toThrowErrorMatchingInlineSnapshot(`[Error: Aborted]`) @@ -40,3 +40,54 @@ it( expect(server.add).toBe(server.add) }), ) + +it( + 'can create multiple batch', + withSnapshotDefault('async-call-multiple-batch', async ({ init, log }) => { + const _server = init() + const [server, send, drop] = batch(_server) + const [server2, send2] = batch(_server) + + // should not send anything + send() + { + const a = server.add(2, 3) + const b = server.echo(1) + const a2 = server2.add(4, 5) + const b2 = server2.echo(2) + + log('Before this line no request from batch 1 was sent') + send() + await a + await b + log('After this line no request from batch 1 was sent') + send() + + log('Before this line no request from batch 2 was sent') + send2() + await a2 + await b2 + log('After this line no request from batch 2 was sent') + send2() + + log('Part 1 end') + } + { + log('Part 2 start') + const a = server.add(2, 3) + const b = server.echo(1) + const a2 = server2.add(4, 5) + const b2 = server2.echo(2) + + drop() + await expect(a).rejects.toThrowErrorMatchingInlineSnapshot(`[Error: Aborted]`) + await expect(b).rejects.toThrowErrorMatchingInlineSnapshot(`[Error: Aborted]`) + + send2() + await a2 + await b2 + + log('Part 2 end') + } + }), +) diff --git a/__tests__/utils/test.ts b/__tests__/utils/test.ts index 7483939..9b8ce4a 100644 --- a/__tests__/utils/test.ts +++ b/__tests__/utils/test.ts @@ -147,13 +147,13 @@ withSnapshotDefault.debugger = (...[name, runner, options]: Parameters(x: Promise, timeoutTime: number): Promise { +export async function race(x: Promise, timeoutTime: number): Promise { return Promise.race([x, timeout(timeoutTime)]) } async function timeout(x: number): Promise { await delay(x) - throw new Error('Timeout') + throw new Error('Test timed out: ' + x + 'ms') } export function delay(x: number) { - return new Promise((r) => setTimeout(r, x)) + return new Promise((r) => setTimeout(r, x)) } diff --git a/src/Async-Call.ts b/src/Async-Call.ts index 8d2403d..81c61a4 100644 --- a/src/Async-Call.ts +++ b/src/Async-Call.ts @@ -21,6 +21,7 @@ import { makeHostedMessage, Err_Cannot_call_method_starts_with_rpc_dot_directly, Err_Then_is_accessed_on_local_implementation_Please_explicitly_mark_if_it_is_thenable_in_the_options, + onAbort, } from './utils/error.js' import { generateRandomID } from './utils/generateRandomID.js' import { normalizeStrictOptions, normalizeLogOptions } from './utils/normalizeOptions.js' @@ -124,6 +125,8 @@ export function AsyncCall( logger, channel, thenable, + signal, + forceSignal, } = options // Note: we're not shorten this error message because it will be removed in the next major version. @@ -133,6 +136,11 @@ export function AsyncCall( const paramStyle = deprecatedParameterStructures || parameterStructure || 'by-position' const logKey = name || deprecatedName || 'rpc' + const throwIfAborted = () => { + signal && signal.throwIfAborted() + forceSignal && forceSignal.throwIfAborted() + } + const { encode: encodeFromOption, encodeRequest: encodeRequestFromOption, @@ -184,7 +192,15 @@ export function AsyncCall( } = (logger || console) as ConsoleInterface type PromiseParam = [resolve: (value?: any) => void, reject: (reason?: any) => void, stack?: string] const requestContext = new Map() + + onAbort(forceSignal, () => { + requestContext.forEach((x) => x[1](forceSignal!.reason)) + requestContext.clear() + }) + const onRequest = async (data: Request): Promise => { + if ((signal && signal.aborted) || (forceSignal && forceSignal.aborted)) + return makeErrorObject((signal && signal.reason) || (forceSignal && forceSignal.reason), '', data) if (isThisSideImplementationPending) await awaitThisSideImplementation() // TODO: in next major version we should not send this message since it might contain sensitive info. else if (rejectedThisSideImplementation) return makeErrorObject(rejectedThisSideImplementation, '', data) @@ -356,15 +372,24 @@ export function AsyncCall( data: SuccessResponse | ErrorResponse | Request, ): Promise => { if ('method' in data) { - const r = onRequest(data) - if ('id' in data) return r - Promise_resolve(r).catch(() => {}) + if ('id' in data) { + if (!forceSignal) return onRequest(data) + return new Promise((resolve, reject) => { + const handleForceAbort = () => resolve(makeErrorObject(forceSignal.reason, '', data)) + onRequest(data) + .then(resolve, reject) + .finally(() => forceSignal.removeEventListener('abort', handleForceAbort)) + onAbort(forceSignal, handleForceAbort) + }) + } + onRequest(data).catch(() => {}) return // Skip response for notifications } return onResponse(data) as Promise } const call = (method: string | symbol, args: unknown[], stack: string | undefined, notify = false) => { return new Promise((resolve, reject) => { + throwIfAborted() let queue: BatchQueue | undefined = undefined if (method === AsyncCallBatch) { queue = args.shift() as any diff --git a/src/core/batch.ts b/src/core/batch.ts index dd0f5ca..fdfbba4 100644 --- a/src/core/batch.ts +++ b/src/core/batch.ts @@ -19,6 +19,7 @@ import type { Request } from '../types.js' * The third item is a function to drop and reject all pending requests. * @public */ +// TODO: use private field in the future. export function batch(asyncCallInstance: T): [T, () => void, (error?: unknown) => void] { const queue: BatchQueue = [] const getTrap = new Proxy( diff --git a/src/types.ts b/src/types.ts index c4e4aeb..ae12f4d 100644 --- a/src/types.ts +++ b/src/types.ts @@ -114,7 +114,7 @@ export interface AsyncCallStrictOptions { * @defaultValue true */ unknownMessage?: boolean - // TODO: implement this if there is needed + // TODO: implement this if there is need /** * Controls if redundant arguments on the client triggers a warning or error. * @see {@link https://www.jsonrpc.org/specification#parameter_structures} @@ -278,6 +278,35 @@ export interface AsyncCallOptions void, options: { once: boolean }): void + removeEventListener(type: 'abort', listener: () => void): void + throwIfAborted(): void + reason: any } /** diff --git a/src/utils/error.ts b/src/utils/error.ts index 7712116..0e3545b 100644 --- a/src/utils/error.ts +++ b/src/utils/error.ts @@ -1,6 +1,13 @@ +import type { AbortSignalLike } from '../types.js' + class CustomError extends Error { // TODO: support cause - constructor(public name: string, message: string, public code: number, public stack: string) { + constructor( + public name: string, + message: string, + public code: number, + public stack: string, + ) { super(message) } } @@ -66,3 +73,6 @@ export const globalDOMException = (() => { } catch {} }) as () => DOMException | undefined type DOMException = { new (message: string, name: string): any } +export function onAbort(signal: AbortSignalLike | undefined, callback: () => void) { + signal && signal.addEventListener('abort', callback, { once: true }) +}