Skip to content

Commit

Permalink
stream: allow readable to end early without error
Browse files Browse the repository at this point in the history
PR-URL: #40881
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
  • Loading branch information
ronag authored and danielleadams committed Dec 13, 2021
1 parent a7dfa43 commit 1787bfa
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 57 deletions.
119 changes: 64 additions & 55 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,52 +33,26 @@ const {
isIterable,
isReadableNodeStream,
isNodeStream,
isReadableFinished,
} = require('internal/streams/utils');
const { AbortController } = require('internal/abort_controller');

let PassThrough;
let Readable;

function destroyer(stream, reading, writing, callback) {
callback = once(callback);

function destroyer(stream, reading, writing) {
let finished = false;
stream.on('close', () => {
finished = true;
});

eos(stream, { readable: reading, writable: writing }, (err) => {
finished = !err;

const rState = stream._readableState;
if (
err &&
err.code === 'ERR_STREAM_PREMATURE_CLOSE' &&
reading &&
(rState && rState.ended && !rState.errored && !rState.errorEmitted)
) {
// Some readable streams will emit 'close' before 'end'. However, since
// this is on the readable side 'end' should still be emitted if the
// stream has been ended and no error emitted. This should be allowed in
// favor of backwards compatibility. Since the stream is piped to a
// destination this should not result in any observable difference.
// We don't need to check if this is a writable premature close since
// eos will only fail with premature close on the reading side for
// duplex streams.
stream
.once('end', callback)
.once('error', callback);
} else {
callback(err);
}
});

return (err) => {
if (finished) return;
finished = true;
destroyImpl.destroyer(stream, err);
callback(err || new ERR_STREAM_DESTROYED('pipe'));
destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe'));
};
}

Expand Down Expand Up @@ -109,7 +83,7 @@ async function* fromReadable(val) {
yield* Readable.prototype[SymbolAsyncIterator].call(val);
}

async function pump(iterable, writable, finish, opts) {
async function pump(iterable, writable, finish, { end }) {
let error;
let onresolve = null;

Expand Down Expand Up @@ -153,7 +127,7 @@ async function pump(iterable, writable, finish, opts) {
}
}

if (opts?.end !== false) {
if (end) {
writable.end();
}

Expand Down Expand Up @@ -220,7 +194,7 @@ function pipelineImpl(streams, callback, opts) {
ac.abort();

if (final) {
callback(error, value);
process.nextTick(callback, error, value);
}
}

Expand All @@ -233,18 +207,19 @@ function pipelineImpl(streams, callback, opts) {

if (isNodeStream(stream)) {
if (end) {
finishCount++;
destroys.push(destroyer(stream, reading, writing, (err) => {
if (!err && !reading && isReadableFinished(stream, false)) {
stream.read(0);
destroyer(stream, true, writing, finish);
} else {
finish(err);
}
}));
} else {
stream.on('error', finish);
destroys.push(destroyer(stream, reading, writing));
}

// Catch stream errors that occur after pipe/pump has completed.
stream.on('error', (err) => {
if (
err &&
err.name !== 'AbortError' &&
err.code !== 'ERR_STREAM_PREMATURE_CLOSE'
) {
finish(err);
}
});
}

if (i === 0) {
Expand Down Expand Up @@ -286,15 +261,18 @@ function pipelineImpl(streams, callback, opts) {
// second use.
const then = ret?.then;
if (typeof then === 'function') {
finishCount++;
then.call(ret,
(val) => {
value = val;
pt.write(val);
if (end) {
pt.end();
}
process.nextTick(finish);
}, (err) => {
pt.destroy(err);
process.nextTick(finish, err);
},
);
} else if (isIterable(ret, true)) {
Expand All @@ -307,24 +285,18 @@ function pipelineImpl(streams, callback, opts) {

ret = pt;

finishCount++;
destroys.push(destroyer(ret, false, true, finish));
destroys.push(destroyer(ret, false, true));
}
} else if (isNodeStream(stream)) {
if (isReadableNodeStream(ret)) {
ret.pipe(stream, { end });

// Compat. Before node v10.12.0 stdio used to throw an error so
// pipe() did/does not end() stdio destinations.
// Now they allow it but "secretly" don't close the underlying fd.
if (stream === process.stdout || stream === process.stderr) {
ret.on('end', () => stream.end());
}
} else {
ret = makeAsyncIterable(ret);

finishCount += 2;
pipe(ret, stream, finish, { end });
} else if (isIterable(ret)) {
finishCount++;
pump(ret, stream, finish, { end });
} else {
throw new ERR_INVALID_ARG_TYPE(
'val', ['Readable', 'Iterable', 'AsyncIterable'], ret);
}
ret = stream;
} else {
Expand All @@ -339,4 +311,41 @@ function pipelineImpl(streams, callback, opts) {
return ret;
}

function pipe(src, dst, finish, { end }) {
src.pipe(dst, { end });

if (end) {
// Compat. Before node v10.12.0 stdio used to throw an error so
// pipe() did/does not end() stdio destinations.
// Now they allow it but "secretly" don't close the underlying fd.
src.once('end', () => dst.end());
} else {
finish();
}

eos(src, { readable: true, writable: false }, (err) => {
const rState = src._readableState;
if (
err &&
err.code === 'ERR_STREAM_PREMATURE_CLOSE' &&
(rState && rState.ended && !rState.errored && !rState.errorEmitted)
) {
// Some readable streams will emit 'close' before 'end'. However, since
// this is on the readable side 'end' should still be emitted if the
// stream has been ended and no error emitted. This should be allowed in
// favor of backwards compatibility. Since the stream is piped to a
// destination this should not result in any observable difference.
// We don't need to check if this is a writable premature close since
// eos will only fail with premature close on the reading side for
// duplex streams.
src
.once('end', finish)
.once('error', finish);
} else {
finish(err);
}
});
eos(dst, { readable: false, writable: true }, finish);
}

module.exports = { pipelineImpl, pipeline };
28 changes: 26 additions & 2 deletions test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -1027,7 +1027,7 @@ const tsp = require('timers/promises');
const src = new PassThrough();
const dst = new PassThrough();
pipeline(src, dst, common.mustSucceed(() => {
assert.strictEqual(dst.destroyed, true);
assert.strictEqual(dst.destroyed, false);
}));
src.end();
}
Expand Down Expand Up @@ -1462,7 +1462,7 @@ const tsp = require('timers/promises');

await pipelinePromise(read, duplex);

assert.strictEqual(duplex.destroyed, true);
assert.strictEqual(duplex.destroyed, false);
}

run().then(common.mustCall());
Expand All @@ -1488,3 +1488,27 @@ const tsp = require('timers/promises');

run().then(common.mustCall());
}

{
const s = new PassThrough({ objectMode: true });
pipeline(async function*() {
await Promise.resolve();
yield 'hello';
yield 'world';
yield 'world';
}, s, async function(source) {
let ret = '';
let n = 0;
for await (const chunk of source) {
if (n++ > 1) {
break;
}
ret += chunk;
}
return ret;
}, common.mustCall((err, val) => {
assert.strictEqual(err, undefined);
assert.strictEqual(val, 'helloworld');
assert.strictEqual(s.destroyed, true);
}));
}

0 comments on commit 1787bfa

Please sign in to comment.