Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix parsing continuation frames in websocket #3247

Merged
merged 1 commit into from
May 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 55 additions & 13 deletions lib/web/websocket/receiver.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const assert = require('node:assert')
const { parserStates, opcodes, states, emptyBuffer, sentCloseFrameState } = require('./constants')
const { kReadyState, kSentClose, kResponse, kReceivedClose } = require('./symbols')
const { channels } = require('../../core/diagnostics')
const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived, utf8Decode, isControlFrame } = require('./util')
const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived, utf8Decode, isControlFrame, isContinuationFrame } = require('./util')
const { WebsocketFrameSend } = require('./frame')
const { CloseEvent } = require('./events')

Expand Down Expand Up @@ -80,6 +80,18 @@ class ByteParser extends Writable {
payloadLength
})

if (loop) {
continue
} else {
return
}
} else if (isContinuationFrame(opcode)) {
const loop = this.parseContinuationFrame(callback, {
fin,
fragmented,
payloadLength
})

if (loop) {
continue
} else {
Expand All @@ -96,9 +108,6 @@ class ByteParser extends Writable {
this.#state = parserStates.PAYLOADLENGTH_64
}

// TODO(@KhafraDev): handle continuation frames separately as their
// semantics are different from TEXT/BINARY frames.
this.#info.originalOpcode ??= opcode
this.#info.opcode = opcode
this.#info.masked = masked
this.#info.fin = fin
Expand Down Expand Up @@ -146,19 +155,16 @@ class ByteParser extends Writable {
// If there is still more data in this chunk that needs to be read
return callback()
} else if (this.#byteOffset >= this.#info.payloadLength) {
// If the server sent multiple frames in a single chunk

const body = this.consume(this.#info.payloadLength)

this.#fragments.push(body)

// If the frame is unfragmented, or a fragmented frame was terminated,
// a message was received
if (!this.#info.fragmented || (this.#info.fin && this.#info.opcode === opcodes.CONTINUATION)) {
// If the frame is not fragmented, a message has been received.
// If the frame is fragmented, it will terminate with a fin bit set
// and an opcode of 0 (continuation), therefore we handle that when
// parsing continuation frames, not here.
if (!this.#info.fragmented) {
const fullMessage = Buffer.concat(this.#fragments)

websocketMessageReceived(this.ws, this.#info.originalOpcode, fullMessage)

websocketMessageReceived(this.ws, this.#info.opcode, fullMessage)
this.#info = {}
this.#fragments.length = 0
}
Expand Down Expand Up @@ -265,6 +271,9 @@ class ByteParser extends Writable {
// Control frames can have a payload length of 125 bytes MAX
callback(new Error('Payload length for control frame exceeded 125 bytes.'))
return false
} else if (this.#byteOffset < info.payloadLength) {
callback()
return false
}

const body = this.consume(info.payloadLength)
Expand Down Expand Up @@ -357,6 +366,39 @@ class ByteParser extends Writable {
return true
}

/**
* Parses continuation frames.
* @param {Buffer} data
* @param {(err?: Error) => void} callback
* @param {{ fin: boolean, fragmented: boolean, payloadLength: number }} info
*/
parseContinuationFrame (callback, info) {
// If we received a continuation frame before we started parsing another frame.
if (this.#info.opcode === undefined) {
callback(new Error('Received unexpected continuation frame.'))
return false
} else if (this.#byteOffset < info.payloadLength) {
callback()
return false
}

const body = this.consume(info.payloadLength)
this.#fragments.push(body)

// A fragmented message consists of a single frame with the FIN bit
// clear and an opcode other than 0, followed by zero or more frames
// with the FIN bit clear and the opcode set to 0, and terminated by
// a single frame with the FIN bit set and an opcode of 0.
if (info.fin) {
const message = Buffer.concat(this.#fragments)
websocketMessageReceived(this.ws, this.#info.opcode, message)
this.#fragments.length = 0
this.#info = {}
}

return true
}

get closingInfo () {
return this.#info.closeInfo
}
Expand Down
7 changes: 6 additions & 1 deletion lib/web/websocket/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,10 @@ function isControlFrame (opcode) {
)
}

function isContinuationFrame (opcode) {
return opcode === opcodes.CONTINUATION
}

// https://nodejs.org/api/intl.html#detecting-internationalization-support
const hasIntl = typeof process.versions.icu === 'string'
const fatalDecoder = hasIntl ? new TextDecoder('utf-8', { fatal: true }) : undefined
Expand Down Expand Up @@ -250,5 +254,6 @@ module.exports = {
failWebsocketConnection,
websocketMessageReceived,
utf8Decode,
isControlFrame
isControlFrame,
isContinuationFrame
}
39 changes: 39 additions & 0 deletions test/websocket/continuation-frames.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
'use strict'

const { test } = require('node:test')
const { WebSocketServer } = require('ws')
const { WebSocket } = require('../..')
const { tspl } = require('@matteo.collina/tspl')

test('Receiving multiple continuation frames works as expected', async (t) => {
const p = tspl(t, { plan: 1 })

const frames = [
Buffer.from([0x01, 0x05, 0x68, 0x65, 0x6c, 0x6c, 0x6f]), // text frame "hello" (fragmented)
Buffer.from([0x00, 0x05, 0x68, 0x65, 0x6c, 0x6c, 0x6f]), // continuation frame "hello" (fin clear)
Buffer.from([0x00, 0x05, 0x68, 0x65, 0x6c, 0x6c, 0x6f]), // continuation frame "hello" (fin clear)
Buffer.from([0x80, 0x05, 0x68, 0x65, 0x6c, 0x6c, 0x6f]) // continuation frame "hello" (fin set)
]

const server = new WebSocketServer({ port: 0 })

server.on('connection', (ws) => {
const socket = ws._socket

for (const frame of frames) {
socket.write(frame)
}
})

const ws = new WebSocket(`ws://localhost:${server.address().port}`)

ws.onerror = p.fail
ws.onmessage = (e) => p.deepStrictEqual(e.data, 'hellohellohellohello')

t.after(() => {
server.close()
ws.close()
})

await p.completed
})
Loading