stream: allow typed arrays to be written and read #22427

Closed
wants to merge 11 commits into from
79 changes: 48 additions & 31 deletions doc/api/stream.md
Expand Up @@ -52,9 +52,10 @@ Additionally, this module includes the utility functions [pipeline][] and
### Object Mode

All streams created by Node.js APIs operate exclusively on strings and `Buffer`
(or `Uint8Array`) objects. It is possible, however, for stream implementations
to work with other types of JavaScript values (with the exception of `null`,
which serves a special purpose within streams). Such streams are considered to
(or `TypedArray` or `DataView`) objects.
It is possible, however, for stream implementations to work with other types of
JavaScript values (with the exception of `null`, which serves a special purpose
within streams). Such streams are considered to
operate in "object mode".

Stream instances are switched into object mode using the `objectMode` option
Expand Down Expand Up @@ -376,6 +377,10 @@ but instead implement [`writable._destroy()`][writable-_destroy].
<!-- YAML
added: v0.9.4
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/22427
description: The `chunk` argument can now be any `TypedArray` or a
`DataView`.
- version: v10.0.0
pr-url: https://github.com/nodejs/node/pull/18780
description: This method now returns a reference to `writable`.
Expand All @@ -384,11 +389,11 @@ changes:
description: The `chunk` argument can now be a `Uint8Array` instance.
-->

* `chunk` {string|Buffer|Uint8Array|any} Optional data to write. For streams
not operating in object mode, `chunk` must be a string, `Buffer` or
`Uint8Array`. For object mode streams, `chunk` may be any JavaScript value
other than `null`.
* `encoding` {string} The encoding if `chunk` is a string
* `chunk` {string|Buffer|TypedArray|DataView|any} Optional data to write. For
streams not operating in object mode, `chunk` must be a string, a `Buffer`,
any `TypedArray`, or a `DataView`. For object mode streams, `chunk` may be
any JavaScript value other than `null`.
* `encoding` {string} The encoding, if `chunk` is a string
* `callback` {Function} Optional callback for when the stream is finished
* Returns: {this}

Expand Down Expand Up @@ -486,6 +491,10 @@ the status of the `highWaterMark`.
<!-- YAML
added: v0.9.4
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/22427
description: The `chunk` argument can now be any `TypedArray` or a
`DataView`.
- version: v8.0.0
pr-url: https://github.com/nodejs/node/pull/11608
description: The `chunk` argument can now be a `Uint8Array` instance.
Expand All @@ -495,10 +504,10 @@ changes:
considered invalid now, even in object mode.
-->

* `chunk` {string|Buffer|Uint8Array|any} Optional data to write. For streams
not operating in object mode, `chunk` must be a string, `Buffer` or
`Uint8Array`. For object mode streams, `chunk` may be any JavaScript value
other than `null`.
* `chunk` {string|Buffer|TypedArray|DataView|any} Optional data to write. For
streams not operating in object mode, `chunk` must be a string, a `Buffer`,
any `TypedArray`, or a `DataView`. For object mode streams, `chunk` may be
any JavaScript value other than `null`.
* `encoding` {string} The encoding, if `chunk` is a string
* `callback` {Function} Callback for when this chunk of data is flushed
* Returns: {boolean} `false` if the stream wishes for the calling code to
Expand Down Expand Up @@ -1130,15 +1139,19 @@ setTimeout(() => {
<!-- YAML
added: v0.9.11
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/22427
description: The `chunk` argument can now be any `TypedArray` or a
`DataView`.
- version: v8.0.0
pr-url: https://github.com/nodejs/node/pull/11608
description: The `chunk` argument can now be a `Uint8Array` instance.
-->

* `chunk` {Buffer|Uint8Array|string|any} Chunk of data to unshift onto the
read queue. For streams not operating in object mode, `chunk` must be a
string, `Buffer` or `Uint8Array`. For object mode streams, `chunk` may be
any JavaScript value other than `null`.
* `chunk` {Buffer|TypedArray|DataView|string|any} Chunk of data to unshift onto
the read queue. For streams not operating in object mode, `chunk` must be a
string, a `Buffer`, any `TypedArray`, or a `DataView`. For object mode
streams, `chunk` may be any JavaScript value other than `null`.

The `readable.unshift()` method pushes a chunk of data back into the internal
buffer. This is useful in certain situations where a stream is being consumed by
Expand Down Expand Up @@ -1514,8 +1527,8 @@ changes:
* `objectMode` {boolean} Whether or not the
[`stream.write(anyObj)`][stream-write] is a valid operation. When set,
it becomes possible to write JavaScript values other than string,
`Buffer` or `Uint8Array` if supported by the stream implementation.
**Default:** `false`.
`Buffer`, `TypedArray`, and `DataView` if supported by the stream
implementation. **Default:** `false`.
* `emitClose` {boolean} Whether or not the stream should emit `'close'`
after it has been destroyed. **Default:** `true`.
* `write` {Function} Implementation for the
Expand Down Expand Up @@ -1881,24 +1894,28 @@ It can be overridden by child classes but it **must not** be called directly.
#### readable.push(chunk[, encoding])
<!-- YAML
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/22427
description: The `chunk` argument can now be any `TypedArray` or a
`DataView`.
- version: v8.0.0
pr-url: https://github.com/nodejs/node/pull/11608
description: The `chunk` argument can now be a `Uint8Array` instance.
-->

* `chunk` {Buffer|Uint8Array|string|null|any} Chunk of data to push into the
read queue. For streams not operating in object mode, `chunk` must be a
string, `Buffer` or `Uint8Array`. For object mode streams, `chunk` may be
any JavaScript value.
* `chunk` {Buffer|TypedArray|DataView|string|null|any} Chunk of data to push
into the read queue. For streams not operating in object mode, `chunk` must
be a string, a `Buffer`, any `TypedArray`, or a `DataView`. For object mode
streams, `chunk` may be any JavaScript value.
* `encoding` {string} Encoding of string chunks. Must be a valid
`Buffer` encoding, such as `'utf8'` or `'ascii'`.
* Returns: {boolean} `true` if additional chunks of data may continue to be
pushed; `false` otherwise.

When `chunk` is a `Buffer`, `Uint8Array` or `string`, the `chunk` of data will
be added to the internal queue for users of the stream to consume.
Passing `chunk` as `null` signals the end of the stream (EOF), after which no
more data can be written.
When `chunk` is a `Buffer`, any `TypedArray`, a `DataView` or a `string`, the
`chunk` of data will be added to the internal queue for users of the stream to
consume. Passing `chunk` as `null` signals the end of the stream (EOF), after
which no more data can be written.

When the `Readable` is operating in paused mode, the data added with
`readable.push()` can be read out by calling the
Expand Down Expand Up @@ -2423,11 +2440,11 @@ situations within Node.js where this is done, particularly in the

Use of `readable.push('')` is not recommended.

Pushing a zero-byte string, `Buffer` or `Uint8Array` to a stream that is not in
object mode has an interesting side effect. Because it *is* a call to
[`readable.push()`][stream-push], the call will end the reading process.
However, because the argument is an empty string, no data is added to the
readable buffer so there is nothing for a user to consume.
Pushing a zero-byte string, `Buffer`, `TypedArray`, or `DataView` to a stream
that is not in object mode has an interesting side effect. Because it *is* a
call to [`readable.push()`][stream-push], the call will end the reading
process. However, because the chunk is empty, no data is added to the readable
buffer so there is nothing for a user to consume.

### `highWaterMark` discrepancy after calling `readable.setEncoding()`

Expand Down
19 changes: 12 additions & 7 deletions lib/_stream_readable.js
Expand Up @@ -241,11 +241,11 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
if (er) {
errorOrDestroy(stream, er);
} else if (state.objectMode || chunk && chunk.length > 0) {
if (typeof chunk !== 'string' &&
!state.objectMode &&
Object.getPrototypeOf(chunk) !== Buffer.prototype) {
chunk = Stream._uint8ArrayToBuffer(chunk);
}
// if (typeof chunk !== 'string' &&
// !state.objectMode &&
// Object.getPrototypeOf(chunk) !== Buffer.prototype) {
// chunk = Stream._typedArrayToBuffer(chunk);
// }

if (addToFront) {
if (state.endEmitted)
Expand Down Expand Up @@ -301,12 +301,17 @@ function addChunk(stream, state, chunk, addToFront) {

function chunkInvalid(state, chunk) {
var er;
if (!Stream._isUint8Array(chunk) &&
if (!Stream._isArrayBufferView(chunk) &&
typeof chunk !== 'string' &&
chunk !== undefined &&
!state.objectMode) {
er = new ERR_INVALID_ARG_TYPE(
'chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
'chunk',
['string',
'Buffer',
'TypedArray',
'DataView'],
chunk);
}
return er;
}
Expand Down
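For readers following the `chunkInvalid()` change, here is a minimal sketch of which values the widened byte-mode check would accept. It uses the public `ArrayBuffer.isView()` in place of the internal `isArrayBufferView` helper, on the assumption that the two behave the same. The function name `isValidByteModeChunk` is hypothetical and is not part of this PR.

```javascript
'use strict';

// Hypothetical helper approximating the byte-mode (non-object-mode) check:
// strings, undefined, and any ArrayBufferView are accepted.
// ArrayBuffer.isView() is true for Buffer, every TypedArray, and DataView.
function isValidByteModeChunk(chunk) {
  return typeof chunk === 'string' ||
         chunk === undefined ||
         ArrayBuffer.isView(chunk);
}

const samples = [
  ['string', 'abc'],
  ['Buffer', Buffer.alloc(1)],
  ['Float64Array', new Float64Array(2)],
  ['DataView', new DataView(new ArrayBuffer(8))],
  ['ArrayBuffer', new ArrayBuffer(8)], // a raw ArrayBuffer is not a view
  ['plain object', {}],
];

for (const [label, value] of samples) {
  console.log(label, isValidByteModeChunk(value));
}
```

Note that a bare `ArrayBuffer` is rejected by this kind of check, so callers would still need to wrap it in a view first.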
19 changes: 13 additions & 6 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,15 @@ function validChunk(stream, state, chunk, cb) {

if (chunk === null) {
er = new ERR_STREAM_NULL_VALUES();
} else if (typeof chunk !== 'string' && !state.objectMode) {
er = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk);
} else if (typeof chunk !== 'string' &&
!state.objectMode &&
!Stream._isArrayBufferView(chunk)) {
er = new ERR_INVALID_ARG_TYPE('chunk',
['string',
'Buffer',
'TypedArray',
'DataView'],
chunk);
}
if (er) {
errorOrDestroy(stream, er);
Expand All @@ -273,11 +280,11 @@ function validChunk(stream, state, chunk, cb) {
Writable.prototype.write = function(chunk, encoding, cb) {
var state = this._writableState;
var ret = false;
var isBuf = !state.objectMode && Stream._isUint8Array(chunk);
var isBuf = !state.objectMode && Stream._isArrayBufferView(chunk);

if (isBuf && Object.getPrototypeOf(chunk) !== Buffer.prototype) {
chunk = Stream._uint8ArrayToBuffer(chunk);
}
// if (isBuf && Object.getPrototypeOf(chunk) !== Buffer.prototype) {
// chunk = Stream._typedArrayToBuffer(chunk);
// }

if (typeof encoding === 'function') {
cb = encoding;
Expand Down
46 changes: 5 additions & 41 deletions lib/stream.js
Expand Up @@ -24,6 +24,7 @@
const { Buffer } = require('buffer');
const pipeline = require('internal/streams/pipeline');
const eos = require('internal/streams/end-of-stream');
const { isArrayBufferView } = require('internal/util/types');

// Note: export Stream before Readable/Writable/Duplex/...
// to avoid a cross-reference(require) issues
Expand All @@ -42,44 +43,7 @@ Stream.finished = eos;
Stream.Stream = Stream;

// Internal utilities
try {
const types = require('util').types;
if (types && typeof types.isUint8Array === 'function') {
Stream._isUint8Array = types.isUint8Array;
} else {
// This throws for Node < 4.2.0 because there's no util binding and
// returns undefined for Node < 7.4.0.
// Please do not convert process.binding() to internalBinding() here.
// This is for compatibility with older versions when loaded as
// readable-stream.
Stream._isUint8Array = process.binding('util').isUint8Array;
}
} catch (e) { // eslint-disable-line no-unused-vars
}

if (!Stream._isUint8Array) {
Stream._isUint8Array = function _isUint8Array(obj) {
return Object.prototype.toString.call(obj) === '[object Uint8Array]';
};
}

const version = process.version.substr(1).split('.');
if (version[0] === 0 && version[1] < 12) {
Stream._uint8ArrayToBuffer = Buffer;
} else {
try {
const internalBuffer = require('internal/buffer');
Stream._uint8ArrayToBuffer = function _uint8ArrayToBuffer(chunk) {
return new internalBuffer.FastBuffer(chunk.buffer,
chunk.byteOffset,
chunk.byteLength);
};
} catch (e) { // eslint-disable-line no-unused-vars
}

if (!Stream._uint8ArrayToBuffer) {
Stream._uint8ArrayToBuffer = function _uint8ArrayToBuffer(chunk) {
return Buffer.prototype.slice.call(chunk);
};
}
}
Stream._isArrayBufferView = isArrayBufferView;
Stream._typedArrayToBuffer = function _typedArrayToBuffer(chunk) {
return Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength);
};
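To illustrate what the new `_typedArrayToBuffer` helper does, the sketch below applies the same `Buffer.from(arrayBuffer, byteOffset, length)` call to a view that covers only part of its backing `ArrayBuffer`. The names `viewToBuffer`, `backing`, and `words` are illustrative only.

```javascript
'use strict';

// Same shape as the helper in the diff: wrap the view's bytes in a Buffer
// without copying them.
function viewToBuffer(view) {
  return Buffer.from(view.buffer, view.byteOffset, view.byteLength);
}

const backing = new ArrayBuffer(16);
// Two 16-bit elements covering bytes 4..7 of the backing ArrayBuffer.
const words = new Uint16Array(backing, 4, 2);
words[0] = 0x0102;

const buf = viewToBuffer(words);
console.log(words.length); // element count of the typed array
console.log(buf.length);   // byte count of the same region

// The Buffer is a view over the same memory, so writing through it
// changes what the typed array sees.
buf[0] = 0xff;
console.log(words[0] !== 0x0102);
```

Two details may matter when reviewing the callers. The resulting `Buffer` shares memory with the original view rather than copying it. Also, a typed array's `length` counts elements rather than bytes and a `DataView` has no `length` property at all, so any `chunk.length > 0` test evaluated before conversion would not see the byte size.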
74 changes: 74 additions & 0 deletions test/parallel/test-stream-pipe-typedarrays-flow.js
@@ -0,0 +1,74 @@
'use strict';
const common = require('../common');
const { Readable, Writable, PassThrough } = require('stream');

// getArrayBufferViews requires buffer length to be multiple of 8
const msgBuffer = Buffer.from('hello'.repeat(8));
const testBuffers = common.getArrayBufferViews(msgBuffer);

{
let ticks = testBuffers.length;

const rs = new Readable({
objectMode: false,
read: () => {
if (ticks > 0) {
process.nextTick(() => rs.push(testBuffers[ticks - 1]));
ticks--;
return;
}
rs.push(Buffer.from(''));
rs.push(null);
}
});

const ws = new Writable({
highWaterMark: 0,
objectMode: false,
write: (data, end, cb) => setImmediate(cb)
});

rs.on('end', common.mustCall());
ws.on('finish', common.mustCall());
rs.pipe(ws);
}

{
let missing = 8;

const rs = new Readable({
objectMode: false,
read: () => {
if (missing--) rs.push(testBuffers[missing]);
else rs.push(null);
}
});

const pt = rs
.pipe(new PassThrough({ objectMode: false, highWaterMark: 2 }))
.pipe(new PassThrough({ objectMode: false, highWaterMark: 2 }));

pt.on('end', function() {
wrapper.push(null);
});

const wrapper = new Readable({
objectMode: false,
read: () => {
process.nextTick(function() {
let data = pt.read();
if (data === null) {
pt.once('readable', function() {
data = pt.read();
if (data !== null) wrapper.push(data);
});
} else {
wrapper.push(data);
}
});
}
});

wrapper.resume();
wrapper.on('end', common.mustCall());
}
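As a smaller companion to the test above, this sketch feeds chunks that originate from different view types into a byte-mode `Writable`. It converts each view explicitly before writing, so it does not depend on this PR being applied. The `toBuffer` helper and the sample data are assumptions made for illustration.

```javascript
'use strict';
const { Writable } = require('stream');

// Explicit conversion, equivalent in shape to Stream._typedArrayToBuffer.
const toBuffer = (view) =>
  Buffer.from(view.buffer, view.byteOffset, view.byteLength);

const views = [
  new Uint8Array([104, 105]),                // bytes for "hi"
  new DataView(new Uint8Array([33]).buffer), // byte for "!"
];

const received = [];
const ws = new Writable({
  write(chunk, encoding, callback) {
    // In byte mode the chunk arrives as a Buffer.
    received.push(chunk);
    callback();
  }
});

ws.on('finish', () => {
  console.log(Buffer.concat(received).toString()); // logs "hi!"
});

for (const view of views) ws.write(toBuffer(view));
ws.end();
```

With the PR's behavior in place, the intent is that the `toBuffer` call becomes unnecessary and the views can be passed to `write()` directly.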