Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: destroy fixes #29058

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,9 @@ The stream is not closed when the `'error'` event is emitted unless the
[`autoDestroy`][writable-new] option was set to `true` when creating the
stream.

After `'error'`, no further events other than `'close'` *should* be emitted
(including `'error'` events).

##### Event: 'finish'
<!-- YAML
added: v0.9.4
Expand Down
3 changes: 3 additions & 0 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ function ReadableState(options, stream, isDuplex) {
this.resumeScheduled = false;
this.paused = true;

// True if the error was already emitted and should not be thrown again
this.errorEmitted = false;

// Should close be emitted on destroy. Defaults to true.
this.emitClose = options.emitClose !== false;

Expand Down
2 changes: 0 additions & 2 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -429,13 +429,11 @@ function onwriteError(stream, state, sync, er, cb) {
// This can emit finish, and it will always happen
// after error
process.nextTick(finishMaybe, stream, state);
stream._writableState.errorEmitted = true;
errorOrDestroy(stream, er);
} else {
// The caller expect this to happen before if
// it is async
cb(er);
stream._writableState.errorEmitted = true;
errorOrDestroy(stream, er);
// This can emit finish, but finish must
// always follow error
Expand Down
107 changes: 60 additions & 47 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
@@ -1,22 +1,37 @@
'use strict';

function needError(stream, err) {
if (!err) {
return false;
}

const r = stream._readableState;
const w = stream._writableState;

if ((w && w.errorEmitted) || (r && r.errorEmitted)) {
return false;
}

if (w) {
w.errorEmitted = true;
}
if (r) {
r.errorEmitted = true;
}

return true;
}

// Undocumented cb() API, needed for core, not for public API
function destroy(err, cb) {
const readableDestroyed = this._readableState &&
this._readableState.destroyed;
const writableDestroyed = this._writableState &&
this._writableState.destroyed;
const r = this._readableState;
const w = this._writableState;

if (readableDestroyed || writableDestroyed) {
if ((w && w.destroyed) || (r && r.destroyed)) {
if (cb) {
cb(err);
} else if (err) {
if (!this._writableState) {
process.nextTick(emitErrorNT, this, err);
} else if (!this._writableState.errorEmitted) {
this._writableState.errorEmitted = true;
process.nextTick(emitErrorNT, this, err);
}
} else if (needError(this, err)) {
process.nextTick(emitErrorNT, this, err);
}

return this;
Expand All @@ -25,28 +40,19 @@ function destroy(err, cb) {
// We set destroyed to true before firing error callbacks in order
// to make it re-entrance safe in case destroy() is called within callbacks

if (this._readableState) {
this._readableState.destroyed = true;
if (w) {
w.destroyed = true;
}

// If this is a duplex stream mark the writable part as destroyed as well
if (this._writableState) {
this._writableState.destroyed = true;
if (r) {
r.destroyed = true;
}

this._destroy(err || null, (err) => {
if (!cb && err) {
if (!this._writableState) {
process.nextTick(emitErrorAndCloseNT, this, err);
} else if (!this._writableState.errorEmitted) {
this._writableState.errorEmitted = true;
process.nextTick(emitErrorAndCloseNT, this, err);
} else {
process.nextTick(emitCloseNT, this);
}
} else if (cb) {
if (cb) {
process.nextTick(emitCloseNT, this);
cb(err);
} else if (needError(this, err)) {
process.nextTick(emitErrorAndCloseNT, this, err);
} else {
process.nextTick(emitCloseNT, this);
}
Expand All @@ -61,29 +67,36 @@ function emitErrorAndCloseNT(self, err) {
}

function emitCloseNT(self) {
if (self._writableState && !self._writableState.emitClose)
const r = self._readableState;
const w = self._writableState;

if (w && !w.emitClose)
return;
if (self._readableState && !self._readableState.emitClose)
if (r && !r.emitClose)
return;
self.emit('close');
}

function undestroy() {
if (this._readableState) {
this._readableState.destroyed = false;
this._readableState.reading = false;
this._readableState.ended = false;
this._readableState.endEmitted = false;
const r = this._readableState;
const w = this._writableState;

if (r) {
r.destroyed = false;
r.reading = false;
r.ended = false;
r.endEmitted = false;
r.errorEmitted = false;
}

if (this._writableState) {
this._writableState.destroyed = false;
this._writableState.ended = false;
this._writableState.ending = false;
this._writableState.finalCalled = false;
this._writableState.prefinished = false;
this._writableState.finished = false;
this._writableState.errorEmitted = false;
if (w) {
w.destroyed = false;
w.ended = false;
w.ending = false;
w.finalCalled = false;
w.prefinished = false;
w.finished = false;
w.errorEmitted = false;
}
}

Expand All @@ -98,12 +111,12 @@ function errorOrDestroy(stream, err) {
// the error to be emitted nextTick. In a future
// semver major update we should change the default to this.

const rState = stream._readableState;
const wState = stream._writableState;
const r = stream._readableState;
const w = stream._writableState;

if ((rState && rState.autoDestroy) || (wState && wState.autoDestroy))
if ((r && r.autoDestroy) || (w && w.autoDestroy))
stream.destroy(err);
else
else if (needError(stream, err))
stream.emit('error', err);
}

Expand Down
6 changes: 4 additions & 2 deletions test/parallel/test-net-connect-buffer.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,14 @@ tcp.listen(0, common.mustCall(function() {
[],
{}
].forEach((value) => {
common.expectsError(() => socket.write(value), {
// We need to check the callback since 'error' will only
// be emitted once per instance.
socket.write(value, common.expectsError({
ronag marked this conversation as resolved.
Show resolved Hide resolved
code: 'ERR_INVALID_ARG_TYPE',
type: TypeError,
message: 'The "chunk" argument must be one of type string or Buffer. ' +
`Received type ${typeof value}`
});
}));
});

// Write a string that contains a multi-byte character sequence to test that
Expand Down
19 changes: 19 additions & 0 deletions test/parallel/test-stream-error-once.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
'use strict';
const common = require('../common');
const { Writable, Readable } = require('stream');

{
const writable = new Writable();
writable.on('error', common.mustCall());
writable.end();
writable.write('h');
writable.write('h');
}

{
const readable = new Readable();
readable.on('error', common.mustCall());
readable.push(null);
readable.push('h');
readable.push('h');
}
33 changes: 24 additions & 9 deletions test/parallel/test-stream-readable-invalid-chunk.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,32 @@
const common = require('../common');
const stream = require('stream');

const readable = new stream.Readable({
read: () => {}
});

function checkError(fn) {
common.expectsError(fn, {
function testPushArg(val) {
const readable = new stream.Readable({
read: () => {}
});
readable.on('error', common.expectsError({
code: 'ERR_INVALID_ARG_TYPE',
type: TypeError
}));
readable.push(val);
}

testPushArg([]);
testPushArg({});
testPushArg(0);

function testUnshiftArg(val) {
const readable = new stream.Readable({
read: () => {}
});
readable.on('error', common.expectsError({
code: 'ERR_INVALID_ARG_TYPE',
type: TypeError
}));
readable.unshift(val);
}

checkError(() => readable.push([]));
checkError(() => readable.push({}));
checkError(() => readable.push(0));
testUnshiftArg([]);
testUnshiftArg({});
testUnshiftArg(0);
17 changes: 0 additions & 17 deletions test/parallel/test-stream-readable-unshift.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,23 +112,6 @@ const { Readable } = require('stream');

}

{
// Check that error is thrown for invalid chunks

const readable = new Readable({ read() {} });
function checkError(fn) {
common.expectsError(fn, {
code: 'ERR_INVALID_ARG_TYPE',
type: TypeError
});
}

checkError(() => readable.unshift([]));
checkError(() => readable.unshift({}));
checkError(() => readable.unshift(0));

}

{
// Check that ObjectMode works
const readable = new Readable({ objectMode: true, read() {} });
Expand Down
8 changes: 1 addition & 7 deletions test/parallel/test-stream-unshift-read-race.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,7 @@ w._write = function(chunk, encoding, cb) {
};

r.on('end', common.mustCall(function() {
common.expectsError(function() {
r.unshift(Buffer.allocUnsafe(1));
}, {
code: 'ERR_STREAM_UNSHIFT_AFTER_END_EVENT',
type: Error,
message: 'stream.unshift() after end event'
});
r.unshift(Buffer.allocUnsafe(1));
w.end();
}));

Expand Down
39 changes: 39 additions & 0 deletions test/parallel/test-stream2-writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -402,3 +402,42 @@ const helloWorldBuffer = Buffer.from('hello world');
w.write(Buffer.allocUnsafe(1));
w.end(Buffer.allocUnsafe(0));
}

{
// Verify that error is only emitted once when failing in _finish.
const w = new W();

w._final = common.mustCall(function(cb) {
cb(new Error('test'));
});
w.on('error', common.mustCall((err) => {
assert.strictEqual(w._writableState.errorEmitted, true);
assert.strictEqual(err.message, 'test');
w.on('error', common.mustNotCall());
w.destroy(new Error());
}));
w.end();
}

{
// Verify that error is only emitted once when failing in write.
const w = new W();
w.on('error', common.mustCall((err) => {
assert.strictEqual(w._writableState.errorEmitted, true);
assert.strictEqual(err.code, 'ERR_STREAM_NULL_VALUES');
}));
w.write(null);
w.destroy(new Error());
}

{
// Verify that error is only emitted once when failing in write after end.
const w = new W();
w.on('error', common.mustCall((err) => {
assert.strictEqual(w._writableState.errorEmitted, true);
assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END');
}));
w.end();
w.write('hello');
w.destroy(new Error());
}