diff --git a/doc/api/stream.md b/doc/api/stream.md index d5cabbc57eaad5..901180f72adda3 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -52,9 +52,10 @@ Additionally, this module includes the utility functions [pipeline][] and ### Object Mode All streams created by Node.js APIs operate exclusively on strings and `Buffer` -(or `Uint8Array`) objects. It is possible, however, for stream implementations -to work with other types of JavaScript values (with the exception of `null`, -which serves a special purpose within streams). Such streams are considered to +(or `TypedArray` or `DataView`) objects. +It is possible, however, for stream implementations to work with other types of +JavaScript values (with the exception of `null`, which serves a special purpose +within streams). Such streams are considered to operate in "object mode". Stream instances are switched into object mode using the `objectMode` option @@ -376,6 +377,10 @@ but instead implement [`writable._destroy()`][writable-_destroy]. -* `chunk` {string|Buffer|Uint8Array|any} Optional data to write. For streams - not operating in object mode, `chunk` must be a string, `Buffer` or - `Uint8Array`. For object mode streams, `chunk` may be any JavaScript value - other than `null`. -* `encoding` {string} The encoding if `chunk` is a string +* `chunk` {string|Buffer|TypedArray|DataView|any} Optional data to write. For + streams not operating in object mode, `chunk` must be a string, a `Buffer`, + any `TypedArray`, or a `DataView`. For object mode streams, `chunk` may be + any JavaScript value other than `null`. +* `encoding` {string} The encoding, if `chunk` is a string * `callback` {Function} Optional callback for when the stream is finished * Returns: {this} @@ -486,6 +491,10 @@ the status of the `highWaterMark`. -* `chunk` {string|Buffer|Uint8Array|any} Optional data to write. For streams - not operating in object mode, `chunk` must be a string, `Buffer` or - `Uint8Array`. For object mode streams, `chunk` may be any JavaScript value - other than `null`. +* `chunk` {string|Buffer|TypedArray|DataView|any} Optional data to write. For + streams not operating in object mode, `chunk` must be a string, a `Buffer`, + any `TypedArray`, or a `DataView`. For object mode streams, `chunk` may be + any JavaScript value other than `null`. * `encoding` {string} The encoding, if `chunk` is a string * `callback` {Function} Callback for when this chunk of data is flushed * Returns: {boolean} `false` if the stream wishes for the calling code to @@ -1130,15 +1139,19 @@ setTimeout(() => { -* `chunk` {Buffer|Uint8Array|string|any} Chunk of data to unshift onto the - read queue. For streams not operating in object mode, `chunk` must be a - string, `Buffer` or `Uint8Array`. For object mode streams, `chunk` may be - any JavaScript value other than `null`. +* `chunk` {Buffer|TypedArray|DataView|string|any} Chunk of data to unshift onto + the read queue. For streams not operating in object mode, `chunk` must be a + string, a `Buffer`, any `TypedArray`, or a `DataView`. For object mode + streams, `chunk` may be any JavaScript value other than `null`. The `readable.unshift()` method pushes a chunk of data back into the internal buffer. This is useful in certain situations where a stream is being consumed by @@ -1514,8 +1527,8 @@ changes: * `objectMode` {boolean} Whether or not the [`stream.write(anyObj)`][stream-write] is a valid operation. When set, it becomes possible to write JavaScript values other than string, - `Buffer` or `Uint8Array` if supported by the stream implementation. - **Default:** `false`. + `Buffer`, `TypedArray`, and `DataView` if supported by the stream + implementation. **Default:** `false`. * `emitClose` {boolean} Whether or not the stream should emit `'close'` after it has been destroyed. **Default:** `true`. * `write` {Function} Implementation for the @@ -1881,24 +1894,28 @@ It can be overridden by child classes but it **must not** be called directly. #### readable.push(chunk[, encoding]) -* `chunk` {Buffer|Uint8Array|string|null|any} Chunk of data to push into the - read queue. For streams not operating in object mode, `chunk` must be a - string, `Buffer` or `Uint8Array`. For object mode streams, `chunk` may be - any JavaScript value. +* `chunk` {Buffer|TypedArray|DataView|string|null|any} Chunk of data to push + into the read queue. For streams not operating in object mode, `chunk` must + be a string, a `Buffer`, any `TypedArray`, or a `DataView`. For object mode + streams, `chunk` may be any JavaScript value. * `encoding` {string} Encoding of string chunks. Must be a valid `Buffer` encoding, such as `'utf8'` or `'ascii'`. * Returns: {boolean} `true` if additional chunks of data may continue to be pushed; `false` otherwise. -When `chunk` is a `Buffer`, `Uint8Array` or `string`, the `chunk` of data will -be added to the internal queue for users of the stream to consume. -Passing `chunk` as `null` signals the end of the stream (EOF), after which no -more data can be written. +When `chunk` is a `Buffer`, any `TypedArray`, a `DataView` or a `string`, the +`chunk` of data will be added to the internal queue for users of the stream to +consume. Passing `chunk` as `null` signals the end of the stream (EOF), after +which no more data can be written. When the `Readable` is operating in paused mode, the data added with `readable.push()` can be read out by calling the @@ -2423,11 +2440,11 @@ situations within Node.js where this is done, particularly in the Use of `readable.push('')` is not recommended. -Pushing a zero-byte string, `Buffer` or `Uint8Array` to a stream that is not in -object mode has an interesting side effect. Because it *is* a call to -[`readable.push()`][stream-push], the call will end the reading process. -However, because the argument is an empty string, no data is added to the -readable buffer so there is nothing for a user to consume. +Pushing a zero-byte string, `Buffer`, `TypedArray`, or a `DataView` to a +stream that is not in object mode has an interesting side effect. Because it +*is* a call to [`readable.push()`][stream-push], the call will end the reading +process. However, because the argument is an empty string, no data is added to +the readable buffer so there is nothing for a user to consume. ### `highWaterMark` discrepancy after calling `readable.setEncoding()` diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 2a2122e0e553cd..590b500bb975a4 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -241,11 +241,11 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { if (er) { errorOrDestroy(stream, er); } else if (state.objectMode || chunk && chunk.length > 0) { - if (typeof chunk !== 'string' && - !state.objectMode && - Object.getPrototypeOf(chunk) !== Buffer.prototype) { - chunk = Stream._uint8ArrayToBuffer(chunk); - } + // if (typeof chunk !== 'string' && + // !state.objectMode && + // Object.getPrototypeOf(chunk) !== Buffer.prototype) { + // chunk = Stream._typedArrayToBuffer(chunk); + // } if (addToFront) { if (state.endEmitted) @@ -301,12 +301,17 @@ function addChunk(stream, state, chunk, addToFront) { function chunkInvalid(state, chunk) { var er; - if (!Stream._isUint8Array(chunk) && + if (!Stream._isArrayBufferView(chunk) && typeof chunk !== 'string' && chunk !== undefined && !state.objectMode) { er = new ERR_INVALID_ARG_TYPE( - 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk); + 'chunk', + ['string', + 'Buffer', + 'TypedArray', + 'DataView'], + chunk); } return er; } diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 022dcffdd78e28..6138f9bc4b8aba 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -259,8 +259,15 @@ function validChunk(stream, state, chunk, cb) { if (chunk === null) { er = new ERR_STREAM_NULL_VALUES(); - } else if (typeof chunk !== 'string' && !state.objectMode) { - er = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk); + } else if (typeof chunk !== 'string' && + !state.objectMode && + !Stream._isArrayBufferView(chunk)) { + er = new ERR_INVALID_ARG_TYPE('chunk', + ['string', + 'Buffer', + 'TypedArray', + 'DataView'], + chunk); } if (er) { errorOrDestroy(stream, er); @@ -273,11 +280,11 @@ function validChunk(stream, state, chunk, cb) { Writable.prototype.write = function(chunk, encoding, cb) { var state = this._writableState; var ret = false; - var isBuf = !state.objectMode && Stream._isUint8Array(chunk); + var isBuf = !state.objectMode && Stream._isArrayBufferView(chunk); - if (isBuf && Object.getPrototypeOf(chunk) !== Buffer.prototype) { - chunk = Stream._uint8ArrayToBuffer(chunk); - } + // if (isBuf && Object.getPrototypeOf(chunk) !== Buffer.prototype) { + // chunk = Stream._typedArrayToBuffer(chunk); + // } if (typeof encoding === 'function') { cb = encoding; diff --git a/lib/stream.js b/lib/stream.js index dfe61aaf09d95d..ace69a1144fcc7 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -24,6 +24,7 @@ const { Buffer } = require('buffer'); const pipeline = require('internal/streams/pipeline'); const eos = require('internal/streams/end-of-stream'); +const { isArrayBufferView } = require('internal/util/types'); // Note: export Stream before Readable/Writable/Duplex/... // to avoid a cross-reference(require) issues @@ -42,44 +43,7 @@ Stream.finished = eos; Stream.Stream = Stream; // Internal utilities -try { - const types = require('util').types; - if (types && typeof types.isUint8Array === 'function') { - Stream._isUint8Array = types.isUint8Array; - } else { - // This throws for Node < 4.2.0 because there's no util binding and - // returns undefined for Node < 7.4.0. - // Please do not convert process.binding() to internalBinding() here. - // This is for compatibility with older versions when loaded as - // readable-stream. - Stream._isUint8Array = process.binding('util').isUint8Array; - } -} catch (e) { // eslint-disable-line no-unused-vars -} - -if (!Stream._isUint8Array) { - Stream._isUint8Array = function _isUint8Array(obj) { - return Object.prototype.toString.call(obj) === '[object Uint8Array]'; - }; -} - -const version = process.version.substr(1).split('.'); -if (version[0] === 0 && version[1] < 12) { - Stream._uint8ArrayToBuffer = Buffer; -} else { - try { - const internalBuffer = require('internal/buffer'); - Stream._uint8ArrayToBuffer = function _uint8ArrayToBuffer(chunk) { - return new internalBuffer.FastBuffer(chunk.buffer, - chunk.byteOffset, - chunk.byteLength); - }; - } catch (e) { // eslint-disable-line no-unused-vars - } - - if (!Stream._uint8ArrayToBuffer) { - Stream._uint8ArrayToBuffer = function _uint8ArrayToBuffer(chunk) { - return Buffer.prototype.slice.call(chunk); - }; - } -} +Stream._isArrayBufferView = isArrayBufferView; +Stream._typedArrayToBuffer = function _typedArrayToBuffer(chunk) { + return Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength); +}; diff --git a/test/parallel/test-stream-pipe-typedarrays-flow.js b/test/parallel/test-stream-pipe-typedarrays-flow.js new file mode 100644 index 00000000000000..28c87b610ce31d --- /dev/null +++ b/test/parallel/test-stream-pipe-typedarrays-flow.js @@ -0,0 +1,74 @@ +'use strict'; +const common = require('../common'); +const { Readable, Writable, PassThrough } = require('stream'); + +// getArrayBufferViews requires buffer length to be multiple of 8 +const msgBuffer = Buffer.from('hello'.repeat(8)); +const testBuffers = common.getArrayBufferViews(msgBuffer); + +{ + let ticks = testBuffers.length; + + const rs = new Readable({ + objectMode: false, + read: () => { + if (ticks > 0) { + process.nextTick(() => rs.push(testBuffers[ticks - 1])); + ticks--; + return; + } + rs.push(Buffer.from('')); + rs.push(null); + } + }); + + const ws = new Writable({ + highWaterMark: 0, + objectMode: false, + write: (data, end, cb) => setImmediate(cb) + }); + + rs.on('end', common.mustCall()); + ws.on('finish', common.mustCall()); + rs.pipe(ws); +} + +{ + let missing = 8; + + const rs = new Readable({ + objectMode: false, + read: () => { + if (missing--) rs.push(testBuffers[missing]); + else rs.push(null); + } + }); + + const pt = rs + .pipe(new PassThrough({ objectMode: false, highWaterMark: 2 })) + .pipe(new PassThrough({ objectMode: false, highWaterMark: 2 })); + + pt.on('end', function() { + wrapper.push(null); + }); + + const wrapper = new Readable({ + objectMode: false, + read: () => { + process.nextTick(function() { + let data = pt.read(); + if (data === null) { + pt.once('readable', function() { + data = pt.read(); + if (data !== null) wrapper.push(data); + }); + } else { + wrapper.push(data); + } + }); + } + }); + + wrapper.resume(); + wrapper.on('end', common.mustCall()); +} diff --git a/test/parallel/test-stream-transform-objectmode-typedarrays.js b/test/parallel/test-stream-transform-objectmode-typedarrays.js new file mode 100644 index 00000000000000..d0df7ea1cbe3ac --- /dev/null +++ b/test/parallel/test-stream-transform-objectmode-typedarrays.js @@ -0,0 +1,54 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +'use strict'; +const common = require('../common'); +const assert = require('assert'); + +const stream = require('stream'); +const PassThrough = stream.PassThrough; + +const src = new PassThrough({ objectMode: true }); +const tx = new PassThrough({ objectMode: true }); +const dest = new PassThrough({ objectMode: true }); + +// getArrayBufferViews expects buffer to have length a multiple of 8 +const expect = common.getArrayBufferViews( + Buffer.from('hello'.repeat(8)) +); +const results = []; + +dest.on('data', common.mustCall(function(x) { + results.push(x); +}, expect.length)); + +src.pipe(tx).pipe(dest); + +let i = 0; +const int = setInterval(common.mustCall(function() { + if (results.length === expect.length) { + src.end(); + clearInterval(int); + assert.deepStrictEqual(results, expect); + } else { + src.write(expect[i++]); + } +}, expect.length + 1), 1); diff --git a/test/parallel/test-stream-typedarrays.js b/test/parallel/test-stream-typedarrays.js new file mode 100644 index 00000000000000..ebf7b13b9ed0f4 --- /dev/null +++ b/test/parallel/test-stream-typedarrays.js @@ -0,0 +1,113 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); + +const { Readable, Writable } = require('stream'); + +const messageBuffer = Buffer.from('ABCDEFGH'); // Needs to be a multiple of 8 +const toBeWritten = [...common.getArrayBufferViews(messageBuffer)]; +const nthWrittenBuffer = (n) => Buffer.from( + toBeWritten[n].buffer, + toBeWritten[n].byteOffset, + toBeWritten[n].byteLength +); + +{ + // Simple Writable test. + + let n = 0; + const writable = new Writable({ + write: common.mustCall((chunk, encoding, cb) => { + assert(!(chunk instanceof Buffer)); + assert(ArrayBuffer.isView(chunk)); + assert.deepStrictEqual(chunk, nthWrittenBuffer(n++)); + cb(); + }, toBeWritten.length) + }); + + toBeWritten.forEach((msg) => writable.write(msg)); + writable.end(); +} + +{ + // Writable test, pass in TypedArray in object mode. + + let n = 0; + const writable = new Writable({ + objectMode: true, + write: common.mustCall((chunk, encoding, cb) => { + assert(!(chunk instanceof Buffer)); + assert(ArrayBuffer.isView(chunk)); + assert.strictEqual(chunk, toBeWritten[n]); + assert.deepStrictEqual(chunk, toBeWritten[n]); + assert.strictEqual(encoding, 'utf8'); + n++; + cb(); + }, toBeWritten.length) + }); + + toBeWritten.forEach((msg) => writable.write(msg)); + writable.end(); +} + +{ + // Writable test, multiple writes carried out via writev. + let callback; + + const writable = new Writable({ + write: common.mustCall((chunk, encoding, cb) => { + assert(chunk instanceof Buffer); + assert.strictEqual(encoding, 'buffer'); + assert.deepStrictEqual(chunk, nthWrittenBuffer(0)); + callback = cb; + }), + writev: common.mustCall((chunks, cb) => { + const expectedWritevLength = toBeWritten.length - 1; + assert.strictEqual(chunks.length, expectedWritevLength); + for (let n = 0; n < expectedWritevLength; n++) { + assert.deepStrictEqual(chunks[n].chunk, nthWrittenBuffer(n + 1)); + } + }) + }); + + toBeWritten.forEach((msg, index) => { + if (index !== toBeWritten.length - 1) { + writable.write(msg); + } else { + writable.end(msg); + } + }); + callback(); +} + +{ + // Simple Readable test. + const readable = new Readable({ + read() {} + }); + + toBeWritten.forEach((wbuf) => readable.push(wbuf)); + readable.unshift(toBeWritten[0]); + + const buf = readable.read(); + assert(buf instanceof Buffer); + const expectedWrittenBufferEntries = toBeWritten.map( + (wbuf, index) => nthWrittenBuffer(index) + ).reduce((acc, wbuf) => acc.concat(...wbuf), []); + assert.deepStrictEqual([...buf], expectedWrittenBufferEntries); +} + +{ + // Readable test, setEncoding. + const readable = new Readable({ + read() {} + }); + + readable.setEncoding('utf8'); + + toBeWritten.forEach((wbuf) => readable.push(wbuf)); + readable.unshift(toBeWritten[0]); + + const out = readable.read(); + assert.strictEqual(out, 'ABCDEFGH'.repeat(10)); +} diff --git a/test/parallel/test-stream-uint8array.js b/test/parallel/test-stream-uint8array.js index 38a45d54048967..a7db5157bdfbb1 100644 --- a/test/parallel/test-stream-uint8array.js +++ b/test/parallel/test-stream-uint8array.js @@ -14,11 +14,12 @@ const GHI = new Uint8Array([0x47, 0x48, 0x49]); let n = 0; const writable = new Writable({ write: common.mustCall((chunk, encoding, cb) => { - assert(chunk instanceof Buffer); + assert(!(chunk instanceof Buffer)); + assert(chunk instanceof Uint8Array); if (n++ === 0) { - assert.strictEqual(String(chunk), 'ABC'); + assert.deepStrictEqual(chunk, ABC); } else { - assert.strictEqual(String(chunk), 'DEF'); + assert.deepStrictEqual(chunk, DEF); } cb(); @@ -52,16 +53,19 @@ const GHI = new Uint8Array([0x47, 0x48, 0x49]); const writable = new Writable({ write: common.mustCall((chunk, encoding, cb) => { - assert(chunk instanceof Buffer); + assert(chunk instanceof Uint8Array); assert.strictEqual(encoding, 'buffer'); - assert.strictEqual(String(chunk), 'ABC'); + assert.deepStrictEqual(chunk, ABC); callback = cb; }), writev: common.mustCall((chunks, cb) => { assert.strictEqual(chunks.length, 2); assert.strictEqual(chunks[0].encoding, 'buffer'); assert.strictEqual(chunks[1].encoding, 'buffer'); - assert.strictEqual(chunks[0].chunk + chunks[1].chunk, 'DEFGHI'); + assert.deepStrictEqual( + [].concat(chunks[0].chunk, chunks[1].chunk), + [].concat(DEF, GHI) + ); }) }); @@ -81,7 +85,7 @@ const GHI = new Uint8Array([0x47, 0x48, 0x49]); readable.unshift(ABC); const buf = readable.read(); - assert(buf instanceof Buffer); + assert(buf instanceof Uint8Array); assert.deepStrictEqual([...buf], [...ABC, ...DEF]); } @@ -97,5 +101,5 @@ const GHI = new Uint8Array([0x47, 0x48, 0x49]); readable.unshift(ABC); const out = readable.read(); - assert.strictEqual(out, 'ABCDEF'); + assert.strictEqual(out, Buffer.from(ABC)); }