diff --git a/doc/api/stream.md b/doc/api/stream.md
index 2ecce235330b44..6a7d3372c80a99 100644
--- a/doc/api/stream.md
+++ b/doc/api/stream.md
@@ -2575,7 +2575,9 @@ run().catch(console.error);
 
 `stream.pipeline()` leaves dangling event listeners on the streams after
 the `callback` has been invoked. In the case of reuse of streams after
-failure, this can cause event listener leaks and swallowed errors.
+failure, this can cause event listener leaks and swallowed errors. If the last
+stream is readable, dangling event listeners will be removed so that the last
+stream can be consumed later.
 
 `stream.pipeline()` closes all the streams when an error is raised.
 The `IncomingRequest` usage with `pipeline` could lead to an unexpected behavior
diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js
index 7df14057b56d4d..36e1aebdd10e10 100644
--- a/lib/internal/streams/pipeline.js
+++ b/lib/internal/streams/pipeline.js
@@ -31,6 +31,7 @@ const {
 
 const {
   isIterable,
+  isReadable,
   isReadableNodeStream,
   isNodeStream,
 } = require('internal/streams/utils');
@@ -45,14 +46,17 @@ function destroyer(stream, reading, writing) {
     finished = true;
   });
 
-  eos(stream, { readable: reading, writable: writing }, (err) => {
+  const cleanup = eos(stream, { readable: reading, writable: writing }, (err) => {
     finished = !err;
   });
 
-  return (err) => {
-    if (finished) return;
-    finished = true;
-    destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe'));
+  return {
+    destroy: (err) => {
+      if (finished) return;
+      finished = true;
+      destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe'));
+    },
+    cleanup
   };
 }
 
@@ -159,6 +163,10 @@ function pipelineImpl(streams, callback, opts) {
   const signal = ac.signal;
   const outerSignal = opts?.signal;
 
+  // Need to clean up event listeners if the last stream is readable
+  // https://github.com/nodejs/node/issues/35452
+  const lastStreamCleanup = [];
+
   validateAbortSignal(outerSignal, 'options.signal');
 
   function abort() {
@@ -194,6 +202,9 @@ function pipelineImpl(streams, callback, opts) {
       ac.abort();
 
       if (final) {
+        if (!error) {
+          lastStreamCleanup.forEach((fn) => fn());
+        }
         process.nextTick(callback, error, value);
       }
     }
@@ -204,14 +215,20 @@ function pipelineImpl(streams, callback, opts) {
     const reading = i < streams.length - 1;
     const writing = i > 0;
     const end = reading || opts?.end !== false;
+    const isLastStream = i === streams.length - 1;
 
     if (isNodeStream(stream)) {
      if (end) {
-        destroys.push(destroyer(stream, reading, writing));
+        const { destroy, cleanup } = destroyer(stream, reading, writing);
+        destroys.push(destroy);
+
+        if (isReadable(stream) && isLastStream) {
+          lastStreamCleanup.push(cleanup);
+        }
       }
 
       // Catch stream errors that occur after pipe/pump has completed.
-      stream.on('error', (err) => {
+      function onError(err) {
         if (
           err &&
           err.name !== 'AbortError' &&
@@ -219,7 +236,13 @@ function pipelineImpl(streams, callback, opts) {
         ) {
           finish(err);
         }
-      });
+      }
+      stream.on('error', onError);
+      if (isReadable(stream) && isLastStream) {
+        lastStreamCleanup.push(() => {
+          stream.removeListener('error', onError);
+        });
+      }
     }
 
     if (i === 0) {
@@ -285,12 +308,19 @@ function pipelineImpl(streams, callback, opts) {
 
         ret = pt;
 
-        destroys.push(destroyer(ret, false, true));
+        const { destroy, cleanup } = destroyer(ret, false, true);
+        destroys.push(destroy);
+        if (isLastStream) {
+          lastStreamCleanup.push(cleanup);
+        }
       }
     } else if (isNodeStream(stream)) {
       if (isReadableNodeStream(ret)) {
         finishCount += 2;
-        pipe(ret, stream, finish, { end });
+        const cleanup = pipe(ret, stream, finish, { end });
+        if (isReadable(stream) && isLastStream) {
+          lastStreamCleanup.push(cleanup);
+        }
       } else if (isIterable(ret)) {
         finishCount++;
         pump(ret, stream, finish, { end });
@@ -345,7 +375,7 @@ function pipe(src, dst, finish, { end }) {
       finish(err);
     }
   });
-  eos(dst, { readable: false, writable: true }, finish);
+  return eos(dst, { readable: false, writable: true }, finish);
 }
 
 module.exports = { pipelineImpl, pipeline };
diff --git a/test/parallel/test-stream-pipeline-listeners.js b/test/parallel/test-stream-pipeline-listeners.js
new file mode 100644
index 00000000000000..c2a72fc01c8b8d
--- /dev/null
+++ b/test/parallel/test-stream-pipeline-listeners.js
@@ -0,0 +1,76 @@
+'use strict';
+
+const common = require('../common');
+const { pipeline, Duplex, PassThrough, Writable } = require('stream');
+const assert = require('assert');
+
+process.on('uncaughtException', common.mustCall((err) => {
+  assert.strictEqual(err.message, 'no way');
+}, 2));
+
+// Ensure that listeners are removed if the last stream is readable,
+// and that the other streams' listeners are unchanged
+const a = new PassThrough();
+a.end('foobar');
+const b = new Duplex({
+  write(chunk, encoding, callback) {
+    callback();
+  }
+});
+pipeline(a, b, common.mustCall((error) => {
+  if (error) {
+    assert.ifError(error);
+  }
+
+  assert(a.listenerCount('error') > 0);
+  assert.strictEqual(b.listenerCount('error'), 0);
+  setTimeout(() => {
+    assert.strictEqual(b.listenerCount('error'), 0);
+    b.destroy(new Error('no way'));
+  }, 100);
+}));
+
+// Async generators
+const c = new PassThrough();
+c.end('foobar');
+const d = pipeline(
+  c,
+  async function* (source) {
+    for await (const chunk of source) {
+      yield String(chunk).toUpperCase();
+    }
+  },
+  common.mustCall((error) => {
+    if (error) {
+      assert.ifError(error);
+    }
+
+    assert(c.listenerCount('error') > 0);
+    assert.strictEqual(d.listenerCount('error'), 0);
+    setTimeout(() => {
+      assert.strictEqual(d.listenerCount('error'), 0);
+      d.destroy(new Error('no way'));
+    }, 100);
+  })
+);
+
+// If the last stream is not readable, the listeners are not removed
+const e = new PassThrough();
+e.end('foobar');
+const f = new Writable({
+  write(chunk, encoding, callback) {
+    callback();
+  }
+});
+pipeline(e, f, common.mustCall((error) => {
+  if (error) {
+    assert.ifError(error);
+  }
+
+  assert(e.listenerCount('error') > 0);
+  assert(f.listenerCount('error') > 0);
+  setTimeout(() => {
+    assert(f.listenerCount('error') > 0);
+    f.destroy(new Error('no way'));
+  }, 100);
+}));
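
For illustration, a minimal sketch (not part of the patch; the `source`/`tail` names are invented) of the behavior this change enables: when `pipeline()` completes without error, its internal listeners are removed from a readable last stream, so that stream can still be consumed afterwards.

```js
'use strict';

// Sketch assuming the patch above is applied (Node.js >= 12 for
// Readable.from). Stream names are illustrative only.
const { pipeline, PassThrough, Readable } = require('stream');

const source = Readable.from(['foo', 'bar']);
const tail = new PassThrough();

pipeline(source, tail, (err) => {
  if (err) throw err;
  // On success, pipeline() has removed its dangling 'error' listeners
  // from `tail` (the last stream, which is readable):
  console.log(tail.listenerCount('error')); // 0
  // ...so `tail` can be consumed later without leftover listeners
  // swallowing future errors.
  tail.setEncoding('utf8');
  tail.on('data', (chunk) => console.log(chunk)); // logs 'foo' then 'bar'
});
```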