From ec0b6c4a53438736c2d83ed1b7d975d48fd10b55 Mon Sep 17 00:00:00 2001 From: tsctx <91457664+tsctx@users.noreply.github.com> Date: Tue, 21 May 2024 18:02:32 +0900 Subject: [PATCH 1/8] websocket: use linkedlist instead of Set --- lib/web/websocket/sender.js | 126 +++++++++++++++++++++++------------- 1 file changed, 82 insertions(+), 44 deletions(-) diff --git a/lib/web/websocket/sender.js b/lib/web/websocket/sender.js index b9fc7a72364..9dd8ee50569 100644 --- a/lib/web/websocket/sender.js +++ b/lib/web/websocket/sender.js @@ -3,14 +3,33 @@ const { WebsocketFrameSend } = require('./frame') const { opcodes, sendHints } = require('./constants') -/** @type {Uint8Array} */ +/** @type {typeof Uint8Array} */ const FastBuffer = Buffer[Symbol.species] -class SendQueue { - #queued = new Set() - #size = 0 +/** + * @typedef {object} SendQueueNode + * @property {SendQueueNode | null} next + * @property {Promise | null} promise + * @property {((...args: any[]) => any)} callback + * @property {Buffer | null} frame + */ - /** @type {import('net').Socket} */ +class SendQueue { + /** + * @type {SendQueueNode | null} + */ + #head = null + /** + * @type {SendQueueNode | null} + */ + #tail = null + + /** + * @type {boolean} + */ + #running = false + + /** @type {import('node:net').Socket} */ #socket constructor (socket) { @@ -19,58 +38,77 @@ class SendQueue { add (item, cb, hint) { if (hint !== sendHints.blob) { - const data = clone(item, hint) - - if (this.#size === 0) { - this.#dispatch(data, cb, hint) + const frame = createFrame(item, hint) + if (!this.#running) { + // fast-path + this.#socket.write(frame, cb) } else { - this.#queued.add([data, cb, true, hint]) - this.#size++ - - this.#run() + /** @type {SendQueueNode} */ + const node = { + next: null, + promise: null, + callback: cb, + frame + } + if (this.#tail !== null) { + this.#tail.next = node + } + this.#tail = node } - return } - const promise = item.arrayBuffer() - const queue = [null, cb, false, hint] - promise.then((ab) => { - queue[0] = clone(ab, hint) - queue[2] = true - - this.#run() - }) - - this.#queued.add(queue) - this.#size++ - } - - #run () { - for (const queued of this.#queued) { - const [data, cb, done, hint] = queued + /** @type {SendQueueNode} */ + const node = { + next: null, + promise: item.arrayBuffer().then((ab) => { + node.promise = null + node.frame = createFrame(ab, hint) + }), + callback: cb, + frame: null + } - if (!done) return + if (this.#tail === null) { + this.#tail = node + } - this.#queued.delete(queued) - this.#size-- + if (this.#head === null) { + this.#head = node + } - this.#dispatch(data, cb, hint) + if (!this.#running) { + this.#run() } } - #dispatch (data, cb, hint) { - const frame = new WebsocketFrameSend() - const opcode = hint === sendHints.string ? opcodes.TEXT : opcodes.BINARY - - frame.frameData = data - const buffer = frame.createFrame(opcode) - - this.#socket.write(buffer, cb) + async #run () { + this.#running = true + /** @type {SendQueueNode | null} */ + let node = this.#head + while (node !== null) { + // wait pending promise + if (node.promise !== null) { + await node.promise + } + // write + this.#socket.write(node.frame, node.callback) + // cleanup + node.callback = node.frame = null + // set next + node = node.next + } + this.#head = null + this.#tail = null + this.#running = false } } -function clone (data, hint) { +function createFrame (data, hint) { + return new WebsocketFrameSend(toBuffer(data, hint)).createFrame(hint === sendHints.string ? opcodes.TEXT : opcodes.BINARY) +} + +function toBuffer (data, hint) { switch (hint) { case sendHints.string: return Buffer.from(data) @@ -78,7 +116,7 @@ function clone (data, hint) { case sendHints.blob: return new FastBuffer(data) case sendHints.typedArray: - return Buffer.copyBytesFrom(data) + return new FastBuffer(data.buffer, data.byteOffset, data.byteLength) } } From 826c84c4cf6ec6e1521f1d73154f1cc635628c3c Mon Sep 17 00:00:00 2001 From: tsctx <91457664+tsctx@users.noreply.github.com> Date: Tue, 21 May 2024 22:59:26 +0900 Subject: [PATCH 2/8] use FixedQueue --- lib/web/websocket/sender.js | 42 +++++++++++-------------------------- 1 file changed, 12 insertions(+), 30 deletions(-) diff --git a/lib/web/websocket/sender.js b/lib/web/websocket/sender.js index 9dd8ee50569..7fd11a623f3 100644 --- a/lib/web/websocket/sender.js +++ b/lib/web/websocket/sender.js @@ -2,13 +2,13 @@ const { WebsocketFrameSend } = require('./frame') const { opcodes, sendHints } = require('./constants') +const FixedQueue = require('../../dispatcher/fixed-queue') /** @type {typeof Uint8Array} */ const FastBuffer = Buffer[Symbol.species] /** - * @typedef {object} SendQueueNode - * @property {SendQueueNode | null} next + * @typedef {object} QueueNode * @property {Promise | null} promise * @property {((...args: any[]) => any)} callback * @property {Buffer | null} frame @@ -16,13 +16,9 @@ const FastBuffer = Buffer[Symbol.species] class SendQueue { /** - * @type {SendQueueNode | null} + * @type {FixedQueue | null} */ - #head = null - /** - * @type {SendQueueNode | null} - */ - #tail = null + #queue = null /** * @type {boolean} @@ -43,24 +39,19 @@ class SendQueue { // fast-path this.#socket.write(frame, cb) } else { - /** @type {SendQueueNode} */ + /** @type {QueueNode} */ const node = { - next: null, promise: null, callback: cb, frame } - if (this.#tail !== null) { - this.#tail.next = node - } - this.#tail = node + (this.#queue ??= new FixedQueue()).push(node) } return } - /** @type {SendQueueNode} */ + /** @type {QueueNode} */ const node = { - next: null, promise: item.arrayBuffer().then((ab) => { node.promise = null node.frame = createFrame(ab, hint) @@ -69,13 +60,7 @@ class SendQueue { frame: null } - if (this.#tail === null) { - this.#tail = node - } - - if (this.#head === null) { - this.#head = node - } + (this.#queue ??= new FixedQueue()).push(node) if (!this.#running) { this.#run() @@ -84,9 +69,10 @@ class SendQueue { async #run () { this.#running = true - /** @type {SendQueueNode | null} */ - let node = this.#head - while (node !== null) { + /** @type {FixedQueue} */ + const queue = this.#queue + while (!queue.isEmpty()) { + const node = queue.shift() // wait pending promise if (node.promise !== null) { await node.promise @@ -95,11 +81,7 @@ class SendQueue { this.#socket.write(node.frame, node.callback) // cleanup node.callback = node.frame = null - // set next - node = node.next } - this.#head = null - this.#tail = null this.#running = false } } From c05988cb040f3ceedbe544b50d3496ea3a43e561 Mon Sep 17 00:00:00 2001 From: tsctx <91457664+tsctx@users.noreply.github.com> Date: Tue, 21 May 2024 23:27:38 +0900 Subject: [PATCH 3/8] Revert "use FixedQueue" This reverts commit 0e415b29ce4e9382effc8ae9d768de4b0a9e5a55. --- lib/web/websocket/sender.js | 42 ++++++++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/lib/web/websocket/sender.js b/lib/web/websocket/sender.js index 7fd11a623f3..9dd8ee50569 100644 --- a/lib/web/websocket/sender.js +++ b/lib/web/websocket/sender.js @@ -2,13 +2,13 @@ const { WebsocketFrameSend } = require('./frame') const { opcodes, sendHints } = require('./constants') -const FixedQueue = require('../../dispatcher/fixed-queue') /** @type {typeof Uint8Array} */ const FastBuffer = Buffer[Symbol.species] /** - * @typedef {object} QueueNode + * @typedef {object} SendQueueNode + * @property {SendQueueNode | null} next * @property {Promise | null} promise * @property {((...args: any[]) => any)} callback * @property {Buffer | null} frame @@ -16,9 +16,13 @@ const FastBuffer = Buffer[Symbol.species] class SendQueue { /** - * @type {FixedQueue | null} + * @type {SendQueueNode | null} */ - #queue = null + #head = null + /** + * @type {SendQueueNode | null} + */ + #tail = null /** * @type {boolean} @@ -39,19 +43,24 @@ class SendQueue { // fast-path this.#socket.write(frame, cb) } else { - /** @type {QueueNode} */ + /** @type {SendQueueNode} */ const node = { + next: null, promise: null, callback: cb, frame } - (this.#queue ??= new FixedQueue()).push(node) + if (this.#tail !== null) { + this.#tail.next = node + } + this.#tail = node } return } - /** @type {QueueNode} */ + /** @type {SendQueueNode} */ const node = { + next: null, promise: item.arrayBuffer().then((ab) => { node.promise = null node.frame = createFrame(ab, hint) @@ -60,7 +69,13 @@ class SendQueue { frame: null } - (this.#queue ??= new FixedQueue()).push(node) + if (this.#tail === null) { + this.#tail = node + } + + if (this.#head === null) { + this.#head = node + } if (!this.#running) { this.#run() @@ -69,10 +84,9 @@ class SendQueue { async #run () { this.#running = true - /** @type {FixedQueue} */ - const queue = this.#queue - while (!queue.isEmpty()) { - const node = queue.shift() + /** @type {SendQueueNode | null} */ + let node = this.#head + while (node !== null) { // wait pending promise if (node.promise !== null) { await node.promise @@ -81,7 +95,11 @@ class SendQueue { this.#socket.write(node.frame, node.callback) // cleanup node.callback = node.frame = null + // set next + node = node.next } + this.#head = null + this.#tail = null this.#running = false } } From aacd26c4e87770c627bfb32d2a65728fe1762b31 Mon Sep 17 00:00:00 2001 From: tsctx <91457664+tsctx@users.noreply.github.com> Date: Wed, 22 May 2024 06:28:22 +0900 Subject: [PATCH 4/8] fixup --- lib/web/websocket/sender.js | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/lib/web/websocket/sender.js b/lib/web/websocket/sender.js index 9dd8ee50569..9921911223d 100644 --- a/lib/web/websocket/sender.js +++ b/lib/web/websocket/sender.js @@ -69,14 +69,16 @@ class SendQueue { frame: null } - if (this.#tail === null) { - this.#tail = node - } - if (this.#head === null) { this.#head = node } + if (this.#tail === null) { + this.#tail = node + } else { + this.#tail.next = node + } + if (!this.#running) { this.#run() } From a88052bc0b2fcb69a1240dcc812917059113c7d5 Mon Sep 17 00:00:00 2001 From: tsctx <91457664+tsctx@users.noreply.github.com> Date: Wed, 22 May 2024 06:34:01 +0900 Subject: [PATCH 5/8] use FixedQueue --- lib/web/websocket/sender.js | 39 +++++++++++++------------------------ 1 file changed, 13 insertions(+), 26 deletions(-) diff --git a/lib/web/websocket/sender.js b/lib/web/websocket/sender.js index 9921911223d..86f53ec1df1 100644 --- a/lib/web/websocket/sender.js +++ b/lib/web/websocket/sender.js @@ -2,13 +2,13 @@ const { WebsocketFrameSend } = require('./frame') const { opcodes, sendHints } = require('./constants') +const FixedQueue = require('../../dispatcher/fixed-queue') /** @type {typeof Uint8Array} */ const FastBuffer = Buffer[Symbol.species] /** * @typedef {object} SendQueueNode - * @property {SendQueueNode | null} next * @property {Promise | null} promise * @property {((...args: any[]) => any)} callback * @property {Buffer | null} frame @@ -16,13 +16,9 @@ const FastBuffer = Buffer[Symbol.species] class SendQueue { /** - * @type {SendQueueNode | null} + * @type {FixedQueue | null} */ - #head = null - /** - * @type {SendQueueNode | null} - */ - #tail = null + #queue = null /** * @type {boolean} @@ -45,22 +41,20 @@ class SendQueue { } else { /** @type {SendQueueNode} */ const node = { - next: null, promise: null, callback: cb, frame } - if (this.#tail !== null) { - this.#tail.next = node + if (this.#queue === null) { + this.#queue = new FixedQueue() } - this.#tail = node + this.#queue.push(node) } return } /** @type {SendQueueNode} */ const node = { - next: null, promise: item.arrayBuffer().then((ab) => { node.promise = null node.frame = createFrame(ab, hint) @@ -69,15 +63,11 @@ class SendQueue { frame: null } - if (this.#head === null) { - this.#head = node + if (this.#queue === null) { + this.#queue = new FixedQueue() } - if (this.#tail === null) { - this.#tail = node - } else { - this.#tail.next = node - } + this.#queue.push(node) if (!this.#running) { this.#run() @@ -86,9 +76,10 @@ class SendQueue { async #run () { this.#running = true - /** @type {SendQueueNode | null} */ - let node = this.#head - while (node !== null) { + /** @type {FixedQueue} */ + const queue = this.#queue + while (!queue.isEmpty()) { + const node = queue.shift() // wait pending promise if (node.promise !== null) { await node.promise @@ -97,11 +88,7 @@ class SendQueue { this.#socket.write(node.frame, node.callback) // cleanup node.callback = node.frame = null - // set next - node = node.next } - this.#head = null - this.#tail = null this.#running = false } } From a23d5564844c425054b80c593d59637733aa2222 Mon Sep 17 00:00:00 2001 From: tsctx <91457664+tsctx@users.noreply.github.com> Date: Wed, 22 May 2024 06:50:52 +0900 Subject: [PATCH 6/8] From c7aca212939da4f5f71403af0b28dda5a7deed5c Mon Sep 17 00:00:00 2001 From: tsctx <91457664+tsctx@users.noreply.github.com> Date: Wed, 22 May 2024 07:25:44 +0900 Subject: [PATCH 7/8] Apply suggestions from code review Co-authored-by: Khafra --- lib/web/websocket/sender.js | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/lib/web/websocket/sender.js b/lib/web/websocket/sender.js index 86f53ec1df1..8a950451eca 100644 --- a/lib/web/websocket/sender.js +++ b/lib/web/websocket/sender.js @@ -16,9 +16,9 @@ const FastBuffer = Buffer[Symbol.species] class SendQueue { /** - * @type {FixedQueue | null} + * @type {FixedQueue} */ - #queue = null + #queue = new FixedQueue() /** * @type {boolean} @@ -45,9 +45,6 @@ class SendQueue { callback: cb, frame } - if (this.#queue === null) { - this.#queue = new FixedQueue() - } this.#queue.push(node) } return @@ -63,9 +60,6 @@ class SendQueue { frame: null } - if (this.#queue === null) { - this.#queue = new FixedQueue() - } this.#queue.push(node) @@ -76,7 +70,6 @@ class SendQueue { async #run () { this.#running = true - /** @type {FixedQueue} */ const queue = this.#queue while (!queue.isEmpty()) { const node = queue.shift() From a2d125d86a10130169b6c007cee20eeb771e2152 Mon Sep 17 00:00:00 2001 From: tsctx <91457664+tsctx@users.noreply.github.com> Date: Wed, 22 May 2024 07:27:40 +0900 Subject: [PATCH 8/8] Apply suggestions from code review --- lib/web/websocket/sender.js | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/web/websocket/sender.js b/lib/web/websocket/sender.js index 8a950451eca..1b1468d4ab9 100644 --- a/lib/web/websocket/sender.js +++ b/lib/web/websocket/sender.js @@ -60,7 +60,6 @@ class SendQueue { frame: null } - this.#queue.push(node) if (!this.#running) {