From 5981df0f45bbc5550ce218551b9338de57f4f4a3 Mon Sep 17 00:00:00 2001 From: Khafra Date: Sun, 12 May 2024 00:40:48 -0400 Subject: [PATCH] fix parsing continuation frames in websocket --- lib/web/websocket/receiver.js | 68 ++++++++++++++++++++++----- lib/web/websocket/util.js | 7 ++- test/websocket/continuation-frames.js | 39 +++++++++++++++ 3 files changed, 100 insertions(+), 14 deletions(-) create mode 100644 test/websocket/continuation-frames.js diff --git a/lib/web/websocket/receiver.js b/lib/web/websocket/receiver.js index a63e7426918..087b962a3ce 100644 --- a/lib/web/websocket/receiver.js +++ b/lib/web/websocket/receiver.js @@ -5,7 +5,7 @@ const assert = require('node:assert') const { parserStates, opcodes, states, emptyBuffer, sentCloseFrameState } = require('./constants') const { kReadyState, kSentClose, kResponse, kReceivedClose } = require('./symbols') const { channels } = require('../../core/diagnostics') -const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived, utf8Decode, isControlFrame } = require('./util') +const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived, utf8Decode, isControlFrame, isContinuationFrame } = require('./util') const { WebsocketFrameSend } = require('./frame') const { CloseEvent } = require('./events') @@ -80,6 +80,18 @@ class ByteParser extends Writable { payloadLength }) + if (loop) { + continue + } else { + return + } + } else if (isContinuationFrame(opcode)) { + const loop = this.parseContinuationFrame(callback, { + fin, + fragmented, + payloadLength + }) + if (loop) { continue } else { @@ -96,9 +108,6 @@ class ByteParser extends Writable { this.#state = parserStates.PAYLOADLENGTH_64 } - // TODO(@KhafraDev): handle continuation frames separately as their - // semantics are different from TEXT/BINARY frames. - this.#info.originalOpcode ??= opcode this.#info.opcode = opcode this.#info.masked = masked this.#info.fin = fin @@ -146,19 +155,16 @@ class ByteParser extends Writable { // If there is still more data in this chunk that needs to be read return callback() } else if (this.#byteOffset >= this.#info.payloadLength) { - // If the server sent multiple frames in a single chunk - const body = this.consume(this.#info.payloadLength) - this.#fragments.push(body) - // If the frame is unfragmented, or a fragmented frame was terminated, - // a message was received - if (!this.#info.fragmented || (this.#info.fin && this.#info.opcode === opcodes.CONTINUATION)) { + // If the frame is not fragmented, a message has been received. + // If the frame is fragmented, it will terminate with a fin bit set + // and an opcode of 0 (continuation), therefore we handle that when + // parsing continuation frames, not here. + if (!this.#info.fragmented) { const fullMessage = Buffer.concat(this.#fragments) - - websocketMessageReceived(this.ws, this.#info.originalOpcode, fullMessage) - + websocketMessageReceived(this.ws, this.#info.opcode, fullMessage) this.#info = {} this.#fragments.length = 0 } @@ -265,6 +271,9 @@ class ByteParser extends Writable { // Control frames can have a payload length of 125 bytes MAX callback(new Error('Payload length for control frame exceeded 125 bytes.')) return false + } else if (this.#byteOffset < info.payloadLength) { + callback() + return false } const body = this.consume(info.payloadLength) @@ -357,6 +366,39 @@ class ByteParser extends Writable { return true } + /** + * Parses continuation frames. + * @param {Buffer} data + * @param {(err?: Error) => void} callback + * @param {{ fin: boolean, fragmented: boolean, payloadLength: number }} info + */ + parseContinuationFrame (callback, info) { + // If we received a continuation frame before we started parsing another frame. + if (this.#info.opcode === undefined) { + callback(new Error('Received unexpected continuation frame.')) + return false + } else if (this.#byteOffset < info.payloadLength) { + callback() + return false + } + + const body = this.consume(info.payloadLength) + this.#fragments.push(body) + + // A fragmented message consists of a single frame with the FIN bit + // clear and an opcode other than 0, followed by zero or more frames + // with the FIN bit clear and the opcode set to 0, and terminated by + // a single frame with the FIN bit set and an opcode of 0. + if (info.fin) { + const message = Buffer.concat(this.#fragments) + websocketMessageReceived(this.ws, this.#info.opcode, message) + this.#fragments.length = 0 + this.#info = {} + } + + return true + } + get closingInfo () { return this.#info.closeInfo } diff --git a/lib/web/websocket/util.js b/lib/web/websocket/util.js index 9a984128858..35d67d17eb5 100644 --- a/lib/web/websocket/util.js +++ b/lib/web/websocket/util.js @@ -222,6 +222,10 @@ function isControlFrame (opcode) { ) } +function isContinuationFrame (opcode) { + return opcode === opcodes.CONTINUATION +} + // https://nodejs.org/api/intl.html#detecting-internationalization-support const hasIntl = typeof process.versions.icu === 'string' const fatalDecoder = hasIntl ? new TextDecoder('utf-8', { fatal: true }) : undefined @@ -250,5 +254,6 @@ module.exports = { failWebsocketConnection, websocketMessageReceived, utf8Decode, - isControlFrame + isControlFrame, + isContinuationFrame } diff --git a/test/websocket/continuation-frames.js b/test/websocket/continuation-frames.js new file mode 100644 index 00000000000..70f2d52fb9c --- /dev/null +++ b/test/websocket/continuation-frames.js @@ -0,0 +1,39 @@ +'use strict' + +const { test } = require('node:test') +const { WebSocketServer } = require('ws') +const { WebSocket } = require('../..') +const { tspl } = require('@matteo.collina/tspl') + +test('Receiving multiple continuation frames works as expected', async (t) => { + const p = tspl(t, { plan: 1 }) + + const frames = [ + Buffer.from([0x01, 0x05, 0x68, 0x65, 0x6c, 0x6c, 0x6f]), // text frame "hello" (fragmented) + Buffer.from([0x00, 0x05, 0x68, 0x65, 0x6c, 0x6c, 0x6f]), // continuation frame "hello" (fin clear) + Buffer.from([0x00, 0x05, 0x68, 0x65, 0x6c, 0x6c, 0x6f]), // continuation frame "hello" (fin clear) + Buffer.from([0x80, 0x05, 0x68, 0x65, 0x6c, 0x6c, 0x6f]) // continuation frame "hello" (fin set) + ] + + const server = new WebSocketServer({ port: 0 }) + + server.on('connection', (ws) => { + const socket = ws._socket + + for (const frame of frames) { + socket.write(frame) + } + }) + + const ws = new WebSocket(`ws://localhost:${server.address().port}`) + + ws.onerror = p.fail + ws.onmessage = (e) => p.deepStrictEqual(e.data, 'hellohellohellohello') + + t.after(() => { + server.close() + ws.close() + }) + + await p.completed +})