From c886b343d4b513bed8d4d54b83d60a7606db6c83 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 7 Apr 2024 10:43:09 +0200 Subject: [PATCH] fix: request abort (#3056) --- lib/dispatcher/client-h1.js | 97 ++++++++++++++++++++++--------------- lib/dispatcher/client-h2.js | 1 - 2 files changed, 57 insertions(+), 41 deletions(-) diff --git a/lib/dispatcher/client-h1.js b/lib/dispatcher/client-h1.js index 62a3e29ef24..5341f7de670 100644 --- a/lib/dispatcher/client-h1.js +++ b/lib/dispatcher/client-h1.js @@ -915,22 +915,24 @@ function writeH1 (client, request) { const socket = client[kSocket] - try { - request.onConnect((err) => { - if (request.aborted || request.completed) { - return - } + const abort = (err) => { + if (request.aborted || request.completed) { + return + } - errorRequest(client, request, err || new RequestAbortedError()) + errorRequest(client, request, err || new RequestAbortedError()) - util.destroy(socket, new InformationalError('aborted')) - }) + util.destroy(body) + util.destroy(socket, new InformationalError('aborted')) + } + + try { + request.onConnect(abort) } catch (err) { errorRequest(client, request, err) } if (request.aborted) { - util.destroy(body) return false } @@ -998,35 +1000,19 @@ function writeH1 (client, request) { /* istanbul ignore else: assertion */ if (!body || bodyLength === 0) { - if (contentLength === 0) { - socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1') - } else { - assert(contentLength === null, 'no body must not have content length') - socket.write(`${header}\r\n`, 'latin1') - } - request.onRequestSent() + writeBuffer({ abort, body: null, client, request, socket, contentLength, header, expectsPayload }) } else if (util.isBuffer(body)) { - assert(contentLength === body.byteLength, 'buffer body must have content length') - - socket.cork() - socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1') - socket.write(body) - socket.uncork() - request.onBodySent(body) - request.onRequestSent() - if (!expectsPayload) { - socket[kReset] = true - } + writeBuffer({ abort, body, client, request, socket, contentLength, header, expectsPayload }) } else if (util.isBlobLike(body)) { if (typeof body.stream === 'function') { - writeIterable({ body: body.stream(), client, request, socket, contentLength, header, expectsPayload }) + writeIterable({ abort, body: body.stream(), client, request, socket, contentLength, header, expectsPayload }) } else { - writeBlob({ body, client, request, socket, contentLength, header, expectsPayload }) + writeBlob({ abort, body, client, request, socket, contentLength, header, expectsPayload }) } } else if (util.isStream(body)) { - writeStream({ body, client, request, socket, contentLength, header, expectsPayload }) + writeStream({ abort, body, client, request, socket, contentLength, header, expectsPayload }) } else if (util.isIterable(body)) { - writeIterable({ body, client, request, socket, contentLength, header, expectsPayload }) + writeIterable({ abort, body, client, request, socket, contentLength, header, expectsPayload }) } else { assert(false) } @@ -1034,12 +1020,12 @@ function writeH1 (client, request) { return true } -function writeStream ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) { +function writeStream ({ abort, body, client, request, socket, contentLength, header, expectsPayload }) { assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined') let finished = false - const writer = new AsyncWriter({ socket, request, contentLength, client, expectsPayload, header }) + const writer = new AsyncWriter({ abort, socket, request, contentLength, client, expectsPayload, header }) const onData = function (chunk) { if (finished) { @@ -1137,7 +1123,37 @@ function writeStream ({ h2stream, body, client, request, socket, contentLength, } } -async function writeBlob ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) { +async function writeBuffer ({ abort, body, client, request, socket, contentLength, header, expectsPayload }) { + try { + if (!body) { + if (contentLength === 0) { + socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1') + } else { + assert(contentLength === null, 'no body must not have content length') + socket.write(`${header}\r\n`, 'latin1') + } + } else if (util.isBuffer(body)) { + assert(contentLength === body.byteLength, 'buffer body must have content length') + + socket.cork() + socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1') + socket.write(body) + socket.uncork() + request.onBodySent(body) + + if (!expectsPayload) { + socket[kReset] = true + } + } + request.onRequestSent() + + client[kResume]() + } catch (err) { + abort(err) + } +} + +async function writeBlob ({ abort, body, client, request, socket, contentLength, header, expectsPayload }) { assert(contentLength === body.size, 'blob body must have content length') try { @@ -1161,11 +1177,11 @@ async function writeBlob ({ h2stream, body, client, request, socket, contentLeng client[kResume]() } catch (err) { - util.destroy(socket, err) + abort(err) } } -async function writeIterable ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) { +async function writeIterable ({ abort, body, client, request, socket, contentLength, header, expectsPayload }) { assert(contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined') let callback = null @@ -1191,7 +1207,7 @@ async function writeIterable ({ h2stream, body, client, request, socket, content .on('close', onDrain) .on('drain', onDrain) - const writer = new AsyncWriter({ socket, request, contentLength, client, expectsPayload, header }) + const writer = new AsyncWriter({ abort, socket, request, contentLength, client, expectsPayload, header }) try { // It's up to the user to somehow abort the async iterable. for await (const chunk of body) { @@ -1215,7 +1231,7 @@ async function writeIterable ({ h2stream, body, client, request, socket, content } class AsyncWriter { - constructor ({ socket, request, contentLength, client, expectsPayload, header }) { + constructor ({ abort, socket, request, contentLength, client, expectsPayload, header }) { this.socket = socket this.request = request this.contentLength = contentLength @@ -1223,6 +1239,7 @@ class AsyncWriter { this.bytesWritten = 0 this.expectsPayload = expectsPayload this.header = header + this.abort = abort socket[kWriting] = true } @@ -1338,13 +1355,13 @@ class AsyncWriter { } destroy (err) { - const { socket, client } = this + const { socket, client, abort } = this socket[kWriting] = false if (err) { assert(client[kRunning] <= 1, 'pipeline should only contain this request') - util.destroy(socket, err) + abort(err) } } } diff --git a/lib/dispatcher/client-h2.js b/lib/dispatcher/client-h2.js index 1fedae88715..c0f3d7f5d21 100644 --- a/lib/dispatcher/client-h2.js +++ b/lib/dispatcher/client-h2.js @@ -246,7 +246,6 @@ function writeH2 (client, request) { if (upgrade) { errorRequest(client, request, new Error('Upgrade not supported for H2')) - return false } if (request.aborted) {