diff --git a/doc/ws.md b/doc/ws.md index a0d9f88ad..c39ac356c 100644 --- a/doc/ws.md +++ b/doc/ws.md @@ -72,6 +72,10 @@ This class represents a WebSocket server. It extends the `EventEmitter`. ### new WebSocketServer(options[, callback]) - `options` {Object} + - `allowMultipleEventsPerMicrotask` {Boolean} Specifies whether or not to + process more than one of the `'message'`, `'ping'`, and `'pong'` events per + microtask. To improve compatibility with the WHATWG standard, the default + value is `false`. Setting it to `true` improves performance slightly. - `backlog` {Number} The maximum length of the queue of pending connections. - `clientTracking` {Boolean} Specifies whether or not to track clients. - `handleProtocols` {Function} A function which can be used to handle the @@ -292,6 +296,10 @@ This class represents a WebSocket. It extends the `EventEmitter`. - `address` {String|url.URL} The URL to which to connect. - `protocols` {String|Array} The list of subprotocols. - `options` {Object} + - `allowMultipleEventsPerMicrotask` {Boolean} Specifies whether or not to + process more than one of the `'message'`, `'ping'`, and `'pong'` events per + microtask. To improve compatibility with the WHATWG standard, the default + value is `false`. Setting it to `true` improves performance slightly. - `finishRequest` {Function} A function which can be used to customize the headers of each http request before it is sent. See description below. - `followRedirects` {Boolean} Whether or not to follow redirects. Defaults to diff --git a/lib/receiver.js b/lib/receiver.js index 1d425ead0..d0c68432d 100644 --- a/lib/receiver.js +++ b/lib/receiver.js @@ -39,6 +39,9 @@ class Receiver extends Writable { * Creates a Receiver instance. * * @param {Object} [options] Options object + * @param {Boolean} [options.allowMultipleEventsPerMicrotask=false] Specifies + * whether or not to process more than one of the `'message'`, `'ping'`, + * and `'pong'` events per microtask * @param {String} [options.binaryType=nodebuffer] The type for binary data * @param {Object} [options.extensions] An object containing the negotiated * extensions @@ -51,6 +54,8 @@ class Receiver extends Writable { constructor(options = {}) { super(); + this._allowMultipleEventsPerMicrotask = + !!options.allowMultipleEventsPerMicrotask; this._binaryType = options.binaryType || BINARY_TYPES[0]; this._extensions = options.extensions || {}; this._isServer = !!options.isServer; @@ -561,7 +566,9 @@ class Receiver extends Writable { } } - this._state = WAIT_MICROTASK; + this._state = this._allowMultipleEventsPerMicrotask + ? GET_INFO + : WAIT_MICROTASK; } /** @@ -578,8 +585,6 @@ class Receiver extends Writable { if (data.length === 0) { this.emit('conclude', 1005, EMPTY_BUFFER); this.end(); - - this._state = GET_INFO; } else { const code = data.readUInt16BE(0); @@ -611,16 +616,16 @@ class Receiver extends Writable { this.emit('conclude', code, buf); this.end(); - - this._state = GET_INFO; } - } else if (this._opcode === 0x09) { - this.emit('ping', data); - this._state = WAIT_MICROTASK; - } else { - this.emit('pong', data); - this._state = WAIT_MICROTASK; + + this._state = GET_INFO; + return; } + + this.emit(this._opcode === 0x09 ? 'ping' : 'pong', data); + this._state = this._allowMultipleEventsPerMicrotask + ? GET_INFO + : WAIT_MICROTASK; } } diff --git a/lib/websocket-server.js b/lib/websocket-server.js index b0ed7bd2e..78c0bb289 100644 --- a/lib/websocket-server.js +++ b/lib/websocket-server.js @@ -29,6 +29,9 @@ class WebSocketServer extends EventEmitter { * Create a `WebSocketServer` instance. * * @param {Object} options Configuration options + * @param {Boolean} [options.allowMultipleEventsPerMicrotask=false] Specifies + * whether or not to process more than one of the `'message'`, `'ping'`, + * and `'pong'` events per microtask * @param {Number} [options.backlog=511] The maximum length of the queue of * pending connections * @param {Boolean} [options.clientTracking=true] Specifies whether or not to @@ -55,6 +58,7 @@ class WebSocketServer extends EventEmitter { super(); options = { + allowMultipleEventsPerMicrotask: false, maxPayload: 100 * 1024 * 1024, skipUTF8Validation: false, perMessageDeflate: false, @@ -409,6 +413,8 @@ class WebSocketServer extends EventEmitter { socket.removeListener('error', socketOnError); ws.setSocket(socket, head, { + allowMultipleEventsPerMicrotask: + this.options.allowMultipleEventsPerMicrotask, maxPayload: this.options.maxPayload, skipUTF8Validation: this.options.skipUTF8Validation }); diff --git a/lib/websocket.js b/lib/websocket.js index 312f6a237..d2c6a36fe 100644 --- a/lib/websocket.js +++ b/lib/websocket.js @@ -192,6 +192,9 @@ class WebSocket extends EventEmitter { * @param {Duplex} socket The network socket between the server and client * @param {Buffer} head The first packet of the upgraded stream * @param {Object} options Options object + * @param {Boolean} [options.allowMultipleEventsPerMicrotask=false] Specifies + * whether or not to process more than one of the `'message'`, `'ping'`, + * and `'pong'` events per microtask * @param {Function} [options.generateMask] The function used to generate the * masking key * @param {Number} [options.maxPayload=0] The maximum allowed message size @@ -201,6 +204,7 @@ class WebSocket extends EventEmitter { */ setSocket(socket, head, options) { const receiver = new Receiver({ + allowMultipleEventsPerMicrotask: options.allowMultipleEventsPerMicrotask, binaryType: this.binaryType, extensions: this._extensions, isServer: this._isServer, @@ -618,6 +622,9 @@ module.exports = WebSocket; * @param {(String|URL)} address The URL to which to connect * @param {Array} protocols The subprotocols * @param {Object} [options] Connection options + * @param {Boolean} [options.allowMultipleEventsPerMicrotask=false] Specifies + * whether or not to process more than one of the `'message'`, `'ping'`, + * and `'pong'` events per microtask * @param {Function} [options.finishRequest] A function which can be used to * customize the headers of each http request before it is sent * @param {Boolean} [options.followRedirects=false] Whether or not to follow @@ -642,6 +649,7 @@ module.exports = WebSocket; */ function initAsClient(websocket, address, protocols, options) { const opts = { + allowMultipleEventsPerMicrotask: false, protocolVersion: protocolVersions[1], maxPayload: 100 * 1024 * 1024, skipUTF8Validation: false, @@ -993,6 +1001,7 @@ function initAsClient(websocket, address, protocols, options) { } websocket.setSocket(socket, head, { + allowMultipleEventsPerMicrotask: opts.allowMultipleEventsPerMicrotask, generateMask: opts.generateMask, maxPayload: opts.maxPayload, skipUTF8Validation: opts.skipUTF8Validation diff --git a/test/receiver.test.js b/test/receiver.test.js index 0f82cf3ea..4e3ee923d 100644 --- a/test/receiver.test.js +++ b/test/receiver.test.js @@ -1150,4 +1150,41 @@ describe('Receiver', () => { receiver.write(Buffer.from('82008200', 'hex')); }); + + it('honors the `allowMultipleEventsPerMicrotask` option', (done) => { + const actual = []; + const expected = [ + '1', + '2', + '3', + '4', + 'microtask 1', + 'microtask 2', + 'microtask 3', + 'microtask 4' + ]; + + function listener(data) { + const message = data.toString(); + actual.push(message); + + // `queueMicrotask()` is not available in Node.js < 11. + Promise.resolve().then(() => { + actual.push(`microtask ${message}`); + + if (actual.length === 8) { + assert.deepStrictEqual(actual, expected); + done(); + } + }); + } + + const receiver = new Receiver({ allowMultipleEventsPerMicrotask: true }); + + receiver.on('message', listener); + receiver.on('ping', listener); + receiver.on('pong', listener); + + receiver.write(Buffer.from('8101318901328a0133810134', 'hex')); + }); });