Skip to content

Commit

Permalink
fix(middleware-websocket): move event listener functions outside of a…
Browse files Browse the repository at this point in the history
…sync iterator (#4816)
  • Loading branch information
kuhe committed Jun 9, 2023
1 parent bb6e4fc commit 9055419
Showing 1 changed file with 38 additions and 31 deletions.
69 changes: 38 additions & 31 deletions packages/middleware-websocket/src/websocket-fetch-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ export class WebSocketFetchHandler {
* Removes all closing/closed sockets from the socket pool for URL.
*/
private removeNotUsableSockets(url: string): void {
this.sockets[url] = this.sockets[url].filter(
this.sockets[url] = (this.sockets[url] ?? []).filter(
(socket) => ![WebSocket.CLOSING, WebSocket.CLOSED].includes(socket.readyState)
);
}
Expand Down Expand Up @@ -115,39 +115,46 @@ export class WebSocketFetchHandler {
// is returned while data keeps streaming.
let streamError: Error | undefined = undefined;

// To notify onclose event that error has occurred.
let socketErrorOccurred = false;

// initialize as no-op.
let reject: (err?: unknown) => void = () => {};
let resolve: ({ done, value }: { done: boolean; value: Uint8Array }) => void = () => {};

socket.onmessage = (event) => {
resolve({
done: false,
value: new Uint8Array(event.data),
});
};

socket.onerror = (error) => {
socketErrorOccurred = true;
socket.close();
reject(error);
};

socket.onclose = () => {
this.removeNotUsableSockets(socket.url);
if (socketErrorOccurred) return;

if (streamError) {
reject(streamError);
} else {
resolve({
done: true,
value: undefined as any, // unchecked because done=true.
});
}
};

const outputStream: AsyncIterable<Uint8Array> = {
[Symbol.asyncIterator]: () => ({
next: () => {
return new Promise((resolve, reject) => {
// To notify onclose event that error has occurred
let socketErrorOccurred = false;

socket.onerror = (error) => {
socketErrorOccurred = true;
socket.close();
reject(error);
};

socket.onclose = () => {
this.removeNotUsableSockets(socket.url);
if (socketErrorOccurred) return;

if (streamError) {
reject(streamError);
} else {
resolve({
done: true,
value: undefined,
});
}
};

socket.onmessage = (event) => {
resolve({
done: false,
value: new Uint8Array(event.data),
});
};
return new Promise((_resolve, _reject) => {
resolve = _resolve;
reject = _reject;
});
},
}),
Expand Down

0 comments on commit 9055419

Please sign in to comment.