Skip to content

Commit

Permalink
fix parsing continuation frames in websocket (#3247)
Browse files Browse the repository at this point in the history
  • Loading branch information
KhafraDev authored May 12, 2024
1 parent 9302599 commit 38c8b55
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 14 deletions.
68 changes: 55 additions & 13 deletions lib/web/websocket/receiver.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const assert = require('node:assert')
const { parserStates, opcodes, states, emptyBuffer, sentCloseFrameState } = require('./constants')
const { kReadyState, kSentClose, kResponse, kReceivedClose } = require('./symbols')
const { channels } = require('../../core/diagnostics')
const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived, utf8Decode, isControlFrame } = require('./util')
const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived, utf8Decode, isControlFrame, isContinuationFrame } = require('./util')
const { WebsocketFrameSend } = require('./frame')
const { CloseEvent } = require('./events')

Expand Down Expand Up @@ -80,6 +80,18 @@ class ByteParser extends Writable {
payloadLength
})

if (loop) {
continue
} else {
return
}
} else if (isContinuationFrame(opcode)) {
const loop = this.parseContinuationFrame(callback, {
fin,
fragmented,
payloadLength
})

if (loop) {
continue
} else {
Expand All @@ -96,9 +108,6 @@ class ByteParser extends Writable {
this.#state = parserStates.PAYLOADLENGTH_64
}

// TODO(@KhafraDev): handle continuation frames separately as their
// semantics are different from TEXT/BINARY frames.
this.#info.originalOpcode ??= opcode
this.#info.opcode = opcode
this.#info.masked = masked
this.#info.fin = fin
Expand Down Expand Up @@ -146,19 +155,16 @@ class ByteParser extends Writable {
// If there is still more data in this chunk that needs to be read
return callback()
} else if (this.#byteOffset >= this.#info.payloadLength) {
// If the server sent multiple frames in a single chunk

const body = this.consume(this.#info.payloadLength)

this.#fragments.push(body)

// If the frame is unfragmented, or a fragmented frame was terminated,
// a message was received
if (!this.#info.fragmented || (this.#info.fin && this.#info.opcode === opcodes.CONTINUATION)) {
// If the frame is not fragmented, a message has been received.
// If the frame is fragmented, it will terminate with a fin bit set
// and an opcode of 0 (continuation), therefore we handle that when
// parsing continuation frames, not here.
if (!this.#info.fragmented) {
const fullMessage = Buffer.concat(this.#fragments)

websocketMessageReceived(this.ws, this.#info.originalOpcode, fullMessage)

websocketMessageReceived(this.ws, this.#info.opcode, fullMessage)
this.#info = {}
this.#fragments.length = 0
}
Expand Down Expand Up @@ -265,6 +271,9 @@ class ByteParser extends Writable {
// Control frames can have a payload length of 125 bytes MAX
callback(new Error('Payload length for control frame exceeded 125 bytes.'))
return false
} else if (this.#byteOffset < info.payloadLength) {
callback()
return false
}

const body = this.consume(info.payloadLength)
Expand Down Expand Up @@ -357,6 +366,39 @@ class ByteParser extends Writable {
return true
}

/**
* Parses continuation frames.
* @param {Buffer} data
* @param {(err?: Error) => void} callback
* @param {{ fin: boolean, fragmented: boolean, payloadLength: number }} info
*/
parseContinuationFrame (callback, info) {
// If we received a continuation frame before we started parsing another frame.
if (this.#info.opcode === undefined) {
callback(new Error('Received unexpected continuation frame.'))
return false
} else if (this.#byteOffset < info.payloadLength) {
callback()
return false
}

const body = this.consume(info.payloadLength)
this.#fragments.push(body)

// A fragmented message consists of a single frame with the FIN bit
// clear and an opcode other than 0, followed by zero or more frames
// with the FIN bit clear and the opcode set to 0, and terminated by
// a single frame with the FIN bit set and an opcode of 0.
if (info.fin) {
const message = Buffer.concat(this.#fragments)
websocketMessageReceived(this.ws, this.#info.opcode, message)
this.#fragments.length = 0
this.#info = {}
}

return true
}

get closingInfo () {
return this.#info.closeInfo
}
Expand Down
7 changes: 6 additions & 1 deletion lib/web/websocket/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,10 @@ function isControlFrame (opcode) {
)
}

function isContinuationFrame (opcode) {
return opcode === opcodes.CONTINUATION
}

// https://nodejs.org/api/intl.html#detecting-internationalization-support
const hasIntl = typeof process.versions.icu === 'string'
const fatalDecoder = hasIntl ? new TextDecoder('utf-8', { fatal: true }) : undefined
Expand Down Expand Up @@ -250,5 +254,6 @@ module.exports = {
failWebsocketConnection,
websocketMessageReceived,
utf8Decode,
isControlFrame
isControlFrame,
isContinuationFrame
}
39 changes: 39 additions & 0 deletions test/websocket/continuation-frames.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
'use strict'

const { test } = require('node:test')
const { WebSocketServer } = require('ws')
const { WebSocket } = require('../..')
const { tspl } = require('@matteo.collina/tspl')

test('Receiving multiple continuation frames works as expected', async (t) => {
const p = tspl(t, { plan: 1 })

const frames = [
Buffer.from([0x01, 0x05, 0x68, 0x65, 0x6c, 0x6c, 0x6f]), // text frame "hello" (fragmented)
Buffer.from([0x00, 0x05, 0x68, 0x65, 0x6c, 0x6c, 0x6f]), // continuation frame "hello" (fin clear)
Buffer.from([0x00, 0x05, 0x68, 0x65, 0x6c, 0x6c, 0x6f]), // continuation frame "hello" (fin clear)
Buffer.from([0x80, 0x05, 0x68, 0x65, 0x6c, 0x6c, 0x6f]) // continuation frame "hello" (fin set)
]

const server = new WebSocketServer({ port: 0 })

server.on('connection', (ws) => {
const socket = ws._socket

for (const frame of frames) {
socket.write(frame)
}
})

const ws = new WebSocket(`ws://localhost:${server.address().port}`)

ws.onerror = p.fail
ws.onmessage = (e) => p.deepStrictEqual(e.data, 'hellohellohellohello')

t.after(() => {
server.close()
ws.close()
})

await p.completed
})

0 comments on commit 38c8b55

Please sign in to comment.