diff --git a/lib/web/websocket/sender.js b/lib/web/websocket/sender.js index b9fc7a72364..1b1468d4ab9 100644 --- a/lib/web/websocket/sender.js +++ b/lib/web/websocket/sender.js @@ -2,15 +2,30 @@ const { WebsocketFrameSend } = require('./frame') const { opcodes, sendHints } = require('./constants') +const FixedQueue = require('../../dispatcher/fixed-queue') -/** @type {Uint8Array} */ +/** @type {typeof Uint8Array} */ const FastBuffer = Buffer[Symbol.species] +/** + * @typedef {object} SendQueueNode + * @property {Promise | null} promise + * @property {((...args: any[]) => any)} callback + * @property {Buffer | null} frame + */ + class SendQueue { - #queued = new Set() - #size = 0 + /** + * @type {FixedQueue} + */ + #queue = new FixedQueue() + + /** + * @type {boolean} + */ + #running = false - /** @type {import('net').Socket} */ + /** @type {import('node:net').Socket} */ #socket constructor (socket) { @@ -19,58 +34,62 @@ class SendQueue { add (item, cb, hint) { if (hint !== sendHints.blob) { - const data = clone(item, hint) - - if (this.#size === 0) { - this.#dispatch(data, cb, hint) + const frame = createFrame(item, hint) + if (!this.#running) { + // fast-path + this.#socket.write(frame, cb) } else { - this.#queued.add([data, cb, true, hint]) - this.#size++ - - this.#run() + /** @type {SendQueueNode} */ + const node = { + promise: null, + callback: cb, + frame + } + this.#queue.push(node) } - return } - const promise = item.arrayBuffer() - const queue = [null, cb, false, hint] - promise.then((ab) => { - queue[0] = clone(ab, hint) - queue[2] = true + /** @type {SendQueueNode} */ + const node = { + promise: item.arrayBuffer().then((ab) => { + node.promise = null + node.frame = createFrame(ab, hint) + }), + callback: cb, + frame: null + } - this.#run() - }) + this.#queue.push(node) - this.#queued.add(queue) - this.#size++ + if (!this.#running) { + this.#run() + } } - #run () { - for (const queued of this.#queued) { - const [data, cb, done, hint] = queued - - if (!done) return - - this.#queued.delete(queued) - this.#size-- - - this.#dispatch(data, cb, hint) + async #run () { + this.#running = true + const queue = this.#queue + while (!queue.isEmpty()) { + const node = queue.shift() + // wait pending promise + if (node.promise !== null) { + await node.promise + } + // write + this.#socket.write(node.frame, node.callback) + // cleanup + node.callback = node.frame = null } + this.#running = false } +} - #dispatch (data, cb, hint) { - const frame = new WebsocketFrameSend() - const opcode = hint === sendHints.string ? opcodes.TEXT : opcodes.BINARY - - frame.frameData = data - const buffer = frame.createFrame(opcode) - - this.#socket.write(buffer, cb) - } +function createFrame (data, hint) { + return new WebsocketFrameSend(toBuffer(data, hint)).createFrame(hint === sendHints.string ? opcodes.TEXT : opcodes.BINARY) } -function clone (data, hint) { +function toBuffer (data, hint) { switch (hint) { case sendHints.string: return Buffer.from(data) @@ -78,7 +97,7 @@ function clone (data, hint) { case sendHints.blob: return new FastBuffer(data) case sendHints.typedArray: - return Buffer.copyBytesFrom(data) + return new FastBuffer(data.buffer, data.byteOffset, data.byteLength) } }