Skip to content

Commit

Permalink
feat: allow channel to be async
Browse files Browse the repository at this point in the history
  • Loading branch information
Jack-Works committed Apr 26, 2023
1 parent 36de23c commit b9d8410
Show file tree
Hide file tree
Showing 12 changed files with 55 additions and 37 deletions.
5 changes: 5 additions & 0 deletions .changeset/light-ladybugs-love.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'async-call-rpc': minor
---

allow channel to be async
2 changes: 1 addition & 1 deletion __tests__/async-init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ it(
'launches with rejected implementation',
withSnapshotDefault('async-call-impl-rejected', async (f) => {
const server = f({ impl: Promise.reject(new TypeError('Import failed')) })
await expect((server as any).add(1, 2)).rejects.toThrowErrorMatchingInlineSnapshot(`"Import failed"`)
await expect((server as any).add(1, 2)).rejects.toThrowErrorMatchingInlineSnapshot('"Import failed"')
}),
)

Expand Down
2 changes: 1 addition & 1 deletion __tests__/bad-data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ it(
const server = _()
raw.client.send(Request('a', 'rpc.async-iterator.next', ['b', undefined]))
const iter = (server as any).not_found()
expect(iter.next()).rejects.toThrowErrorMatchingInlineSnapshot(`"not_found is not a function"`)
expect(iter.next()).rejects.toThrowErrorMatchingInlineSnapshot('"not_found is not a function"')
await delay(100)
}),
)
Expand Down
6 changes: 3 additions & 3 deletions __tests__/basic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ it(
await Promise.all([
expect(server.add(1, 2)).resolves.toMatchInlineSnapshot(`3`),
expect(server.echo('string')).resolves.toMatchInlineSnapshot(`"string"`),
expect(server.throws()).rejects.toThrowErrorMatchingInlineSnapshot(`"impl error"`),
expect(server.throwEcho('1')).rejects.toThrowErrorMatchingInlineSnapshot(`"1"`),
expect(server.throws()).rejects.toThrowErrorMatchingInlineSnapshot('"impl error"'),
expect(server.throwEcho('1')).rejects.toThrowErrorMatchingInlineSnapshot('"1"'),
// Unknown methods
expect((server as any).not_found()).rejects.toThrowErrorMatchingInlineSnapshot(`"Method not found"`),
expect((server as any).not_found()).rejects.toThrowErrorMatchingInlineSnapshot('"Method not found"'),
// Keep this reference
expect(server.withThisRef()).resolves.toMatchInlineSnapshot(`3`),
])
Expand Down
4 changes: 2 additions & 2 deletions __tests__/non-strict.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ it(
withSnapshotDefault('async-call-strict', async (f, _, _log, raw) => {
const server: any = f()
raw.client.send('unknown message')
await expect(server.not_defined_method()).rejects.toThrowErrorMatchingInlineSnapshot(`"Method not found"`)
await expect(server.not_defined_method()).rejects.toThrowErrorMatchingInlineSnapshot('"Method not found"')
}),
)

Expand All @@ -33,6 +33,6 @@ it(
withSnapshotDefault('async-call-strict-partial', async (f, _, _log, raw) => {
const server: any = f({ opts: { strict: { methodNotFound: true } } })
raw.client.send('unknown message')
await expect(server.not_defined_method()).rejects.toThrowErrorMatchingInlineSnapshot(`"Method not found"`)
await expect(server.not_defined_method()).rejects.toThrowErrorMatchingInlineSnapshot('"Method not found"')
}),
)
2 changes: 1 addition & 1 deletion __tests__/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ it(
const server = f({
opts: { mapError: () => ({ code: -233, message: 'Oh my message', data: { custom_data: true } }) },
})
await expect(server.throws()).rejects.toThrowErrorMatchingInlineSnapshot(`"Oh my message"`)
await expect(server.throws()).rejects.toThrowErrorMatchingInlineSnapshot('"Oh my message"')
}),
)

Expand Down
2 changes: 1 addition & 1 deletion api/base.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export interface AsyncCallLogLevel {

// @public
export interface AsyncCallOptions {
channel: CallbackBasedChannel | EventBasedChannel;
channel: CallbackBasedChannel | EventBasedChannel | Promise<CallbackBasedChannel | EventBasedChannel>;
idGenerator?(): string | number;
key?: string;
log?: AsyncCallLogLevel | boolean | 'all';
Expand Down
2 changes: 1 addition & 1 deletion api/full.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export interface AsyncCallLogLevel {

// @public
export interface AsyncCallOptions {
channel: CallbackBasedChannel | EventBasedChannel;
channel: CallbackBasedChannel | EventBasedChannel | Promise<CallbackBasedChannel | EventBasedChannel>;
idGenerator?(): string | number;
key?: string;
log?: AsyncCallLogLevel | boolean | 'all';
Expand Down
2 changes: 1 addition & 1 deletion docs/async-call-rpc.asynccalloptions.channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ The message channel to exchange messages between server and client
**Signature:**

```typescript
channel: CallbackBasedChannel | EventBasedChannel;
channel: CallbackBasedChannel | EventBasedChannel | Promise<CallbackBasedChannel | EventBasedChannel>;
```

## Example
Expand Down
2 changes: 1 addition & 1 deletion docs/async-call-rpc.asynccalloptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export interface AsyncCallOptions

| Property | Modifiers | Type | Description |
| --- | --- | --- | --- |
| [channel](./async-call-rpc.asynccalloptions.channel.md) | | [CallbackBasedChannel](./async-call-rpc.callbackbasedchannel.md) \| [EventBasedChannel](./async-call-rpc.eventbasedchannel.md) | The message channel to exchange messages between server and client |
| [channel](./async-call-rpc.asynccalloptions.channel.md) | | [CallbackBasedChannel](./async-call-rpc.callbackbasedchannel.md) \| [EventBasedChannel](./async-call-rpc.eventbasedchannel.md) \| Promise&lt;[CallbackBasedChannel](./async-call-rpc.callbackbasedchannel.md) \| [EventBasedChannel](./async-call-rpc.eventbasedchannel.md)<!-- -->&gt; | The message channel to exchange messages between server and client |
| [key?](./async-call-rpc.asynccalloptions.key.md) | | string | _(Optional)_ This option is used for better log print. |
| [log?](./async-call-rpc.asynccalloptions.log.md) | | [AsyncCallLogLevel](./async-call-rpc.asynccallloglevel.md) \| boolean \| 'all' | _(Optional)_ Choose log level. |
| [logger?](./async-call-rpc.asynccalloptions.logger.md) | | [ConsoleInterface](./async-call-rpc.consoleinterface.md) | _(Optional)_ Provide the logger of AsyncCall |
Expand Down
61 changes: 37 additions & 24 deletions src/Async-Call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,11 @@ export function AsyncCall<OtherSideImplementedFunctions = {}>(
options: AsyncCallOptions,
): AsyncVersionOf<OtherSideImplementedFunctions> {
let isThisSideImplementationPending = true
let resolvedThisSideImplementationValue: unknown = undefined
let rejectedThisSideImplementation: unknown = undefined
let resolvedThisSideImplementationValue: unknown
let rejectedThisSideImplementation: unknown

let resolvedChannel: EventBasedChannel | CallbackBasedChannel | undefined
let channelPromise: Promise<EventBasedChannel | CallbackBasedChannel> | undefined
// This promise should never fail
const awaitThisSideImplementation = async () => {
try {
Expand All @@ -73,6 +76,29 @@ export function AsyncCall<OtherSideImplementedFunctions = {}>(
isThisSideImplementationPending = false
}
}
const onChannelResolved = (channel: EventBasedChannel | CallbackBasedChannel) => {
resolvedChannel = channel
if (isCallbackBasedChannel(channel)) {
channel.setup(
(data) => rawMessageReceiver(data).then(rawMessageSender),
(data) => {
const _ = deserialization(data)
if (isJSONRPCObject(_)) return true
return Promise_resolve(_).then(isJSONRPCObject)
},
)
}
if (isEventBasedChannel(channel)) {
const m = channel
m.on &&
m.on((_) =>
rawMessageReceiver(_)
.then(rawMessageSender)
.then((x) => x && m.send!(x)),
)
}
return channel
}

const {
serializer,
Expand Down Expand Up @@ -243,28 +269,10 @@ export function AsyncCall<OtherSideImplementedFunctions = {}>(
}
const serialization = serializer ? (x: unknown) => serializer.serialization(x) : Object
const deserialization = serializer ? (x: unknown) => serializer.deserialization(x) : Object
const isEventBasedChannel = (x: typeof channel): x is EventBasedChannel => 'send' in x && isFunction(x.send)
const isCallbackBasedChannel = (x: typeof channel): x is CallbackBasedChannel => 'setup' in x && isFunction(x.setup)

if (isCallbackBasedChannel(channel)) {
channel.setup(
(data) => rawMessageReceiver(data).then(rawMessageSender),
(data) => {
const _ = deserialization(data)
if (isJSONRPCObject(_)) return true
return Promise_resolve(_).then(isJSONRPCObject)
},
)
}
if (isEventBasedChannel(channel)) {
const m = channel as EventBasedChannel | CallbackBasedChannel
m.on &&
m.on((_) =>
rawMessageReceiver(_)
.then(rawMessageSender)
.then((x) => x && m.send!(x)),
)
}
if (channel instanceof Promise) channelPromise = channel.then(onChannelResolved)
else onChannelResolved(channel)

const makeErrorObject = (e: any, frameworkStack: string, data: Request) => {
if (isObject(e) && 'stack' in e)
e.stack = frameworkStack
Expand All @@ -277,7 +285,7 @@ export function AsyncCall<OtherSideImplementedFunctions = {}>(
const sendPayload = async (payload: unknown, removeQueueR = false) => {
if (removeQueueR) payload = [...(payload as BatchQueue)]
const data = await serialization(payload)
return channel.send!(data)
return (resolvedChannel || (await channelPromise))!.send!(data)
}
const rejectsQueue = (queue: BatchQueue, error: unknown) => {
for (const x of queue) {
Expand Down Expand Up @@ -379,3 +387,8 @@ export function AsyncCall<OtherSideImplementedFunctions = {}>(
}
// Assume a console object in global if there is no custom logger provided
declare const console: ConsoleInterface

const isEventBasedChannel = (x: EventBasedChannel | CallbackBasedChannel): x is EventBasedChannel =>
'send' in x && isFunction(x.send)
const isCallbackBasedChannel = (x: EventBasedChannel | CallbackBasedChannel): x is CallbackBasedChannel =>
'setup' in x && isFunction(x.setup)
2 changes: 1 addition & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ export interface AsyncCallOptions {
* @example
* {@link https://github.com/Jack-Works/async-call-rpc/blob/main/utils-src/web/websocket.client.ts | Example for CallbackBasedChannel} or {@link https://github.com/Jack-Works/async-call-rpc/blob/main/utils-src/node/websocket.server.ts | Example for EventBasedChannel}
*/
channel: CallbackBasedChannel | EventBasedChannel
channel: CallbackBasedChannel | EventBasedChannel | Promise<CallbackBasedChannel | EventBasedChannel>
/**
* Choose log level.
* @remarks
Expand Down

0 comments on commit b9d8410

Please sign in to comment.