From 2e128c1b588ceff4794b4d0bc5241b7914a9b61e Mon Sep 17 00:00:00 2001 From: Carlos Fuentes Date: Thu, 11 Apr 2024 11:28:12 +0200 Subject: [PATCH] refactor: h2 refactoring (#3082) * refactor: h2 refactoring * test: add test for servername changed * fix: leftover --- lib/dispatcher/client-h2.js | 206 ++++++++++++++++++------------ test/http2.js | 13 +- test/node-test/client-dispatch.js | 95 ++++++++++++++ 3 files changed, 229 insertions(+), 85 deletions(-) diff --git a/lib/dispatcher/client-h2.js b/lib/dispatcher/client-h2.js index d4d234ce895..7c8ec5f0cf9 100644 --- a/lib/dispatcher/client-h2.js +++ b/lib/dispatcher/client-h2.js @@ -98,28 +98,22 @@ async function connectH2 (client, socket) { util.addListener(session, 'goaway', onHTTP2GoAway) util.addListener(session, 'close', function () { const { [kClient]: client } = this + const { [kSocket]: socket } = client - const err = this[kSocket][kError] || new SocketError('closed', util.getSocketInfo(this)) + const err = this[kSocket][kError] || this[kError] || new SocketError('closed', util.getSocketInfo(socket)) - client[kSocket] = null client[kHTTP2Session] = null - assert(client[kPending] === 0) + if (client.destroyed) { + assert(client[kPending] === 0) - // Fail entire queue. - const requests = client[kQueue].splice(client[kRunningIdx]) - for (let i = 0; i < requests.length; i++) { - const request = requests[i] - util.errorRequest(client, request, err) + // Fail entire queue. + const requests = client[kQueue].splice(client[kRunningIdx]) + for (let i = 0; i < requests.length; i++) { + const request = requests[i] + util.errorRequest(client, request, err) + } } - - client[kPendingIdx] = client[kRunningIdx] - - assert(client[kRunning] === 0) - - client.emit('disconnect', client[kUrl], [client], err) - - client[kResume]() }) session.unref() @@ -139,6 +133,24 @@ async function connectH2 (client, socket) { util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this))) }) + util.addListener(socket, 'close', function () { + const err = this[kError] || new SocketError('closed', util.getSocketInfo(this)) + + client[kSocket] = null + + if (this[kHTTP2Session] != null) { + this[kHTTP2Session].destroy(err) + } + + client[kPendingIdx] = client[kRunningIdx] + + assert(client[kRunning] === 0) + + client.emit('disconnect', client[kUrl], [client], err) + + client[kResume]() + }) + let closed = false socket.on('close', () => { closed = true @@ -155,10 +167,10 @@ async function connectH2 (client, socket) { }, destroy (err, callback) { - session.destroy(err) if (closed) { queueMicrotask(callback) } else { + // Destroying the socket will trigger the session close socket.destroy(err).on('close', callback) } }, @@ -257,27 +269,28 @@ function writeH2 (client, request) { headers[HTTP2_HEADER_AUTHORITY] = host || `${hostname}${port ? `:${port}` : ''}` headers[HTTP2_HEADER_METHOD] = method - try { - // We are already connected, streams are pending. - // We can call on connect, and wait for abort - request.onConnect((err) => { - if (request.aborted || request.completed) { - return - } + const abort = (err) => { + if (request.aborted || request.completed) { + return + } - err = err || new RequestAbortedError() + err = err || new RequestAbortedError() - if (stream != null) { - util.destroy(stream, err) + util.errorRequest(client, request, err) - session[kOpenStreams] -= 1 - if (session[kOpenStreams] === 0) { - session.unref() - } - } + if (stream != null) { + util.destroy(stream, err) + } - util.errorRequest(client, request, err) - }) + // We do not destroy the socket as we can continue using the session + // the stream get's destroyed and the session remains to create new streams + util.destroy(body, err) + } + + try { + // We are already connected, streams are pending. + // We can call on connect, and wait for abort + request.onConnect(abort) } catch (err) { util.errorRequest(client, request, err) } @@ -302,7 +315,6 @@ function writeH2 (client, request) { stream.once('close', () => { session[kOpenStreams] -= 1 - // TODO(HTTP/2): unref only if current streams count is 0 if (session[kOpenStreams] === 0) session.unref() }) @@ -382,7 +394,7 @@ function writeH2 (client, request) { writeBodyH2() } - // Increment counter as we have new several streams open + // Increment counter as we have new streams open ++session[kOpenStreams] stream.once('response', headers => { @@ -394,7 +406,7 @@ function writeH2 (client, request) { // the request remains in-flight and headers hasn't been received yet // for those scenarios, best effort is to destroy the stream immediately // as there's no value to keep it open. - if (request.aborted || request.completed) { + if (request.aborted) { const err = new RequestAbortedError() util.errorRequest(client, request, err) util.destroy(stream, err) @@ -424,14 +436,11 @@ function writeH2 (client, request) { // Stream is closed or half-closed-remote (6), decrement counter and cleanup // It does not have sense to continue working with the stream as we do not // have yet RST_STREAM support on client-side - session[kOpenStreams] -= 1 if (session[kOpenStreams] === 0) { session.unref() } - const err = new InformationalError('HTTP/2: stream half-closed (remote)') - util.errorRequest(client, request, err) - util.destroy(stream, err) + abort(new InformationalError('HTTP/2: stream half-closed (remote)')) }) stream.once('close', () => { @@ -442,21 +451,11 @@ function writeH2 (client, request) { }) stream.once('error', function (err) { - if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) { - session[kOpenStreams] -= 1 - util.errorRequest(client, request, err) - util.destroy(stream, err) - } + abort(err) }) stream.once('frameError', (type, code) => { - const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`) - util.errorRequest(client, request, err) - - if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) { - session[kOpenStreams] -= 1 - util.destroy(stream, err) - } + abort(new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`)) }) // stream.on('aborted', () => { @@ -479,37 +478,49 @@ function writeH2 (client, request) { function writeBodyH2 () { /* istanbul ignore else: assertion */ - if (!body) { - request.onRequestSent() + if (!body || contentLength === 0) { + writeBuffer({ + abort, + client, + request, + contentLength, + expectsPayload, + h2stream: stream, + body: null, + socket: client[kSocket] + }) } else if (util.isBuffer(body)) { - assert(contentLength === body.byteLength, 'buffer body must have content length') - stream.cork() - stream.write(body) - stream.uncork() - stream.end() - request.onBodySent(body) - request.onRequestSent() + writeBuffer({ + abort, + client, + request, + contentLength, + body, + expectsPayload, + h2stream: stream, + socket: client[kSocket] + }) } else if (util.isBlobLike(body)) { if (typeof body.stream === 'function') { writeIterable({ + abort, client, request, contentLength, - h2stream: stream, expectsPayload, + h2stream: stream, body: body.stream(), - socket: client[kSocket], - header: '' + socket: client[kSocket] }) } else { writeBlob({ + abort, body, client, request, contentLength, expectsPayload, h2stream: stream, - header: '', socket: client[kSocket] }) } @@ -541,7 +552,30 @@ function writeH2 (client, request) { } } -function writeStream ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) { +function writeBuffer ({ abort, h2stream, body, client, request, socket, contentLength, expectsPayload }) { + try { + if (body != null && util.isBuffer(body)) { + assert(contentLength === body.byteLength, 'buffer body must have content length') + h2stream.cork() + h2stream.write(body) + h2stream.uncork() + h2stream.end() + + request.onBodySent(body) + } + + if (!expectsPayload) { + socket[kReset] = true + } + + request.onRequestSent() + client[kResume]() + } catch (error) { + abort(error) + } +} + +function writeStream ({ abort, socket, expectsPayload, h2stream, body, client, request, contentLength }) { assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined') // For HTTP/2, is enough to pipe the stream @@ -550,26 +584,29 @@ function writeStream ({ h2stream, body, client, request, socket, contentLength, h2stream, (err) => { if (err) { - util.destroy(body, err) - util.destroy(h2stream, err) + util.destroy(pipe, err) + abort(err) } else { + util.removeAllListeners(pipe) request.onRequestSent() + + if (!expectsPayload) { + socket[kReset] = true + } + + client[kResume]() } } ) - pipe.on('data', onPipeData) - pipe.once('end', () => { - pipe.removeListener('data', onPipeData) - util.destroy(pipe) - }) + util.addListener(pipe, 'data', onPipeData) function onPipeData (chunk) { request.onBodySent(chunk) } } -async function writeBlob ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) { +async function writeBlob ({ abort, h2stream, body, client, request, socket, contentLength, expectsPayload }) { assert(contentLength === body.size, 'blob body must have content length') try { @@ -582,6 +619,7 @@ async function writeBlob ({ h2stream, body, client, request, socket, contentLeng h2stream.cork() h2stream.write(buffer) h2stream.uncork() + h2stream.end() request.onBodySent(buffer) request.onRequestSent() @@ -592,11 +630,11 @@ async function writeBlob ({ h2stream, body, client, request, socket, contentLeng client[kResume]() } catch (err) { - util.destroy(h2stream) + abort(err) } } -async function writeIterable ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) { +async function writeIterable ({ abort, h2stream, body, client, request, socket, contentLength, expectsPayload }) { assert(contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined') let callback = null @@ -635,11 +673,19 @@ async function writeIterable ({ h2stream, body, client, request, socket, content await waitForDrain() } } + + h2stream.end() + + request.onRequestSent() + + if (!expectsPayload) { + socket[kReset] = true + } + + client[kResume]() } catch (err) { - h2stream.destroy(err) + abort(err) } finally { - request.onRequestSent() - h2stream.end() h2stream .off('close', onDrain) .off('drain', onDrain) diff --git a/test/http2.js b/test/http2.js index 7ff604d91d9..5d5badcc0f2 100644 --- a/test/http2.js +++ b/test/http2.js @@ -819,7 +819,7 @@ test('Should handle h2 request with body (string or buffer) - dispatch', async t stream.end('hello h2!') }) - t = tspl(t, { plan: 7 }) + t = tspl(t, { plan: 9 }) server.listen(0, () => { const client = new Client(`https://localhost:${server.address().port}`, { @@ -1228,6 +1228,8 @@ test( t.strictEqual(response.statusCode, 200) + await response.body.dump() + await t.complete } ) @@ -1329,7 +1331,7 @@ test('#2364 - Concurrent aborts', async t => { const controller = new AbortController() client.request({ - path: '/', + path: '/1', method: 'GET', headers: { 'x-my-header': 'foo' @@ -1343,7 +1345,7 @@ test('#2364 - Concurrent aborts', async t => { }) client.request({ - path: '/', + path: '/2', method: 'GET', headers: { 'x-my-header': 'foo' @@ -1354,7 +1356,7 @@ test('#2364 - Concurrent aborts', async t => { }) client.request({ - path: '/', + path: '/3', method: 'GET', headers: { 'x-my-header': 'foo' @@ -1364,10 +1366,11 @@ test('#2364 - Concurrent aborts', async t => { t.strictEqual(response.headers['content-type'], 'text/plain; charset=utf-8') t.strictEqual(response.headers['x-custom-h2'], 'hello') t.strictEqual(response.statusCode, 200) + response.body.dump() }) client.request({ - path: '/', + path: '/4', method: 'GET', headers: { 'x-my-header': 'foo' diff --git a/test/node-test/client-dispatch.js b/test/node-test/client-dispatch.js index 55713d9ad0e..f9ed888d44b 100644 --- a/test/node-test/client-dispatch.js +++ b/test/node-test/client-dispatch.js @@ -1048,3 +1048,98 @@ test('Issue#3065 - fix bad destroy handling', async (t) => { await p.completed }) + +test('Issue#3065 - fix bad destroy handling (h2)', async (t) => { + // Due to we handle the session, the request for h2 will fail on servername change + const p = tspl(t, { plan: 5 }) + const server = createSecureServer(pem) + server.on('stream', (stream) => { + stream.respond({ + 'content-type': 'text/plain; charset=utf-8', + ':status': 200 + }) + stream.end('ended') + }) + + server.listen(0, () => { + const client = new Client(`https://localhost:${server.address().port}`, { + connect: { + rejectUnauthorized: false + }, + allowH2: true + }) + + t.after(closeClientAndServerAsPromise(client, server)) + + const dispatches = [] + const dispatches2 = [] + + client.once('disconnect', (...args) => { + const [,, err] = args + p.strictEqual(err.code, 'UND_ERR_INFO') + p.strictEqual(err.message, 'servername changed') + }) + + client.dispatch({ + path: '/', + method: 'POST', + body: 'body' + }, { + onConnect () { + dispatches.push('onConnect') + }, + onBodySent () { + dispatches.push('onBodySent') + }, + onResponseStarted () { + dispatches.push('onResponseStarted') + }, + onHeaders () { + dispatches.push('onHeaders1') + }, + onData () { + dispatches.push('onData') + }, + onComplete () { + dispatches.push('onComplete') + p.deepStrictEqual(dispatches, ['onConnect', 'onBodySent', 'onResponseStarted', 'onHeaders1', 'onData', 'onComplete']) + }, + onError (err) { + p.strictEqual(err.code, 'UND_ERR_INFO') + p.strictEqual(err.message, 'servername changed') + } + }) + + client.dispatch({ + servername: 'google.com', + path: '/', + method: 'POST', + body: 'body' + }, { + onConnect () { + dispatches2.push('onConnect') + }, + onBodySent () { + dispatches2.push('onBodySent') + }, + onResponseStarted () { + dispatches2.push('onResponseStarted') + }, + onHeaders () { + dispatches2.push('onHeaders2') + }, + onData () { + dispatches2.push('onData') + }, + onComplete () { + dispatches2.push('onComplete') + p.deepStrictEqual(dispatches2, ['onConnect', 'onBodySent', 'onResponseStarted', 'onHeaders2', 'onData', 'onComplete']) + }, + onError (err) { + p.ifError(err) + } + }) + }) + + await p.completed +})