Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup websocket #3257

Merged
merged 4 commits into from
May 13, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 57 additions & 132 deletions lib/web/websocket/receiver.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ const {
websocketMessageReceived,
utf8Decode,
isControlFrame,
isContinuationFrame,
isTextBinaryFrame
isTextBinaryFrame,
isContinuationFrame
} = require('./util')
const { WebsocketFrameSend } = require('./frame')
const { CloseEvent } = require('./events')
const { closeWebSocketConnection } = require('./connection')

// This code was influenced by ws released under the MIT license.
// Copyright (c) 2011 Einar Otto Stangvik <einaros@gmail.com>
Expand All @@ -26,6 +26,7 @@ const { CloseEvent } = require('./events')
class ByteParser extends Writable {
#buffers = []
#byteOffset = 0
#loop = false

#state = parserStates.INFO

Expand All @@ -45,6 +46,7 @@ class ByteParser extends Writable {
_write (chunk, _, callback) {
this.#buffers.push(chunk)
this.#byteOffset += chunk.length
this.#loop = true

this.run(callback)
}
Expand All @@ -55,7 +57,7 @@ class ByteParser extends Writable {
* or not enough bytes are buffered to parse.
*/
run (callback) {
while (true) {
while (this.#loop) {
if (this.#state === parserStates.INFO) {
// If there aren't enough bytes to parse the payload length, etc.
if (this.#byteOffset < 2) {
Expand All @@ -67,6 +69,13 @@ class ByteParser extends Writable {
const opcode = buffer[0] & 0x0F
const masked = (buffer[1] & 0x80) === 0x80

const fragmented = !fin && opcode !== opcodes.CONTINUATION
const payloadLength = buffer[1] & 0x7F

const rsv1 = buffer[0] & 0x40
const rsv2 = buffer[0] & 0x20
const rsv3 = buffer[0] & 0x10

if (!isValidOpcode(opcode)) {
failWebsocketConnection(this.ws, 'Invalid opcode received')
return callback()
Expand All @@ -77,22 +86,16 @@ class ByteParser extends Writable {
return callback()
}

const rsv1 = (buffer[0] & 0x40) !== 0
const rsv2 = (buffer[0] & 0x20) !== 0
const rsv3 = (buffer[0] & 0x10) !== 0

// MUST be 0 unless an extension is negotiated that defines meanings
// for non-zero values. If a nonzero value is received and none of
// the negotiated extensions defines the meaning of such a nonzero
// value, the receiving endpoint MUST _Fail the WebSocket
// Connection_.
if (rsv1 || rsv2 || rsv3) {
if (rsv1 !== 0 || rsv2 !== 0 || rsv3 !== 0) {
failWebsocketConnection(this.ws, 'RSV1, RSV2, RSV3 must be clear')
return
}

const fragmented = !fin && opcode !== opcodes.CONTINUATION

if (fragmented && !isTextBinaryFrame(opcode)) {
// Only text and binary frames can be fragmented
failWebsocketConnection(this.ws, 'Invalid frame type was fragmented.')
Expand All @@ -101,39 +104,27 @@ class ByteParser extends Writable {

// If we are already parsing a text/binary frame and do not receive either
// a continuation frame or close frame, fail the connection.
if (isTextBinaryFrame(opcode) && this.#info.opcode !== undefined) {
if (isTextBinaryFrame(opcode) && this.#fragments.length > 0) {
failWebsocketConnection(this.ws, 'Expected continuation frame')
return
}

const payloadLength = buffer[1] & 0x7F

if (isControlFrame(opcode)) {
const loop = this.parseControlFrame(callback, {
header: buffer,
opcode,
fragmented,
payloadLength
})
if (this.#info.fragmented && fragmented) {
// A fragmented frame can't be fragmented itself
failWebsocketConnection(this.ws, 'Fragmented frame exceeded 125 bytes.')
return
}

if (loop) {
continue
} else {
return
}
} else if (isContinuationFrame(opcode)) {
const loop = this.parseContinuationFrame(callback, {
header: buffer,
fin,
fragmented,
payloadLength
})
// "All control frames MUST have a payload length of 125 bytes or less
// and MUST NOT be fragmented."
if ((payloadLength > 125 || fragmented) && isControlFrame(opcode)) {
failWebsocketConnection(this.ws, 'Control frame either too large or fragmented')
return
}

if (loop) {
continue
} else {
return
}
if (isContinuationFrame(opcode) && this.#fragments.length === 0) {
failWebsocketConnection(this.ws, 'Unexpected continuation frame')
return
}

if (payloadLength <= 125) {
Expand All @@ -145,16 +136,14 @@ class ByteParser extends Writable {
this.#state = parserStates.PAYLOADLENGTH_64
}

if (isTextBinaryFrame(opcode)) {
this.#info.binaryType = opcode
}

this.#info.opcode = opcode
this.#info.masked = masked
this.#info.fin = fin
this.#info.fragmented = fragmented

if (this.#info.fragmented && payloadLength > 125) {
// A fragmented frame can't be fragmented itself
failWebsocketConnection(this.ws, 'Fragmented frame exceeded 125 bytes.')
return
}
} else if (this.#state === parserStates.PAYLOADLENGTH_16) {
if (this.#byteOffset < 2) {
return callback()
Expand Down Expand Up @@ -189,42 +178,40 @@ class ByteParser extends Writable {
this.#state = parserStates.READ_DATA
} else if (this.#state === parserStates.READ_DATA) {
if (this.#byteOffset < this.#info.payloadLength) {
// If there is still more data in this chunk that needs to be read
return callback()
} else if (this.#byteOffset >= this.#info.payloadLength) {
const body = this.consume(this.#info.payloadLength)
}

const body = this.consume(this.#info.payloadLength)

if (isControlFrame(this.#info.opcode)) {
this.#loop = this.parseControlFrame(body)
} else {
this.#fragments.push(body)

// If the frame is not fragmented, a message has been received.
// If the frame is fragmented, it will terminate with a fin bit set
// and an opcode of 0 (continuation), therefore we handle that when
// parsing continuation frames, not here.
if (!this.#info.fragmented) {
if (!this.#info.fragmented && this.#info.fin) {
const fullMessage = Buffer.concat(this.#fragments)
websocketMessageReceived(this.ws, this.#info.opcode, fullMessage)
this.#info = {}
websocketMessageReceived(this.ws, this.#info.binaryType, fullMessage)
this.#fragments.length = 0
}

this.#state = parserStates.INFO
}
}

if (this.#byteOffset === 0 && this.#info.payloadLength !== 0) {
callback()
break
this.#state = parserStates.INFO
}
}
}

/**
* Take n bytes from the buffered Buffers
* @param {number} n
* @returns {Buffer|null}
* @returns {Buffer}
*/
consume (n) {
if (n > this.#byteOffset) {
return null
throw new Error('Called consume() before buffers satiated.')
} else if (n === 0) {
return emptyBuffer
}
Expand Down Expand Up @@ -297,40 +284,25 @@ class ByteParser extends Writable {

/**
* Parses control frames.
* @param {Buffer} data
* @param {(err?: Error) => void} callback
* @param {{ opcode: number, fragmented: boolean, payloadLength: number, header: Buffer }} info
* @param {Buffer} body
*/
parseControlFrame (callback, info) {
assert(!info.fragmented)

if (info.payloadLength > 125) {
// Control frames can have a payload length of 125 bytes MAX
failWebsocketConnection(this.ws, 'Payload length for control frame exceeded 125 bytes.')
return false
} else if (this.#byteOffset < info.payloadLength) {
this.#buffers.unshift(info.header)
this.#byteOffset += 2

callback()
return false
}

const body = this.consume(info.payloadLength)
parseControlFrame (body) {
const { opcode, payloadLength } = this.#info

if (info.opcode === opcodes.CLOSE) {
if (info.payloadLength === 1) {
if (opcode === opcodes.CLOSE) {
if (payloadLength === 1) {
failWebsocketConnection(this.ws, 'Received close frame with a 1-byte body.')
return
return false
}

this.#info.closeInfo = this.parseCloseBody(body)

if (this.#info.closeInfo.error) {
const { code, reason } = this.#info.closeInfo

callback(new CloseEvent('close', { wasClean: false, reason, code }))
return
closeWebSocketConnection(this.ws, code, reason, reason.length)
failWebsocketConnection(this.ws, reason)
return false
}

if (this.ws[kSentClose] !== sentCloseFrameState.SENT) {
Expand Down Expand Up @@ -362,9 +334,8 @@ class ByteParser extends Writable {
this.ws[kReceivedClose] = true

this.end()

return
} else if (info.opcode === opcodes.PING) {
return false
} else if (opcode === opcodes.PING) {
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
// response, unless it already received a Close frame.
// A Pong frame sent in response to a Ping frame must have identical
Expand All @@ -381,12 +352,7 @@ class ByteParser extends Writable {
})
}
}

if (this.#byteOffset <= 0) {
callback()
return false
}
} else if (info.opcode === opcodes.PONG) {
} else if (opcode === opcodes.PONG) {
// A Pong frame MAY be sent unsolicited. This serves as a
// unidirectional heartbeat. A response to an unsolicited Pong frame is
// not expected.
Expand All @@ -396,47 +362,6 @@ class ByteParser extends Writable {
payload: body
})
}

if (this.#byteOffset <= 0) {
callback()
return false
}
}

return true
}

/**
* Parses continuation frames.
* @param {Buffer} data
* @param {(err?: Error) => void} callback
* @param {{ fin: boolean, fragmented: boolean, payloadLength: number, header: Buffer }} info
*/
parseContinuationFrame (callback, info) {
// If we received a continuation frame before we started parsing another frame.
if (this.#info.opcode === undefined) {
failWebsocketConnection(this.ws, 'Received unexpected continuation frame.')
return false
} else if (this.#byteOffset < info.payloadLength) {
this.#buffers.unshift(info.header)
this.#byteOffset += 2

callback()
return false
}

const body = this.consume(info.payloadLength)
this.#fragments.push(body)

// A fragmented message consists of a single frame with the FIN bit
// clear and an opcode other than 0, followed by zero or more frames
// with the FIN bit clear and the opcode set to 0, and terminated by
// a single frame with the FIN bit set and an opcode of 0.
if (info.fin) {
const message = Buffer.concat(this.#fragments)
websocketMessageReceived(this.ws, this.#info.opcode, message)
this.#fragments.length = 0
this.#info = {}
}

return true
Expand Down
Loading