Allow multiple readers at once
ehmicky committed Mar 10, 2024
1 parent 7ccab70 commit db5d18b
Showing 5 changed files with 394 additions and 14 deletions.
3 changes: 3 additions & 0 deletions package.json
@@ -42,6 +42,9 @@
 		"object",
 		"concat"
 	],
+	"dependencies": {
+		"is-stream": "^4.0.1"
+	},
 	"devDependencies": {
 		"@types/node": "^20.8.9",
 		"ava": "^5.3.1",
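The new runtime dependency backs the `isReadableStream()` check used in source/stream.js below. A hedged sketch of the distinction it draws — is-stream detects Node.js streams only, so web ReadableStreams fall through to the plain async-iterable branch, and `checkOpen: false` is assumed to also accept streams that have already ended:

```js
import {Readable} from 'node:stream';
import {isReadableStream} from 'is-stream';

// A Node.js Readable is detected …
isReadableStream(Readable.from(['x']), {checkOpen: false}); // true

// … but a web ReadableStream is not, so getAsyncIterable() consumes it
// through its Symbol.asyncIterator instead.
isReadableStream(new ReadableStream(), {checkOpen: false}); // false
```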
10 changes: 4 additions & 6 deletions source/contents.js
@@ -1,13 +1,13 @@
+import {getAsyncIterable} from './stream.js';
+
 export const getStreamContents = async (stream, {init, convertChunk, getSize, truncateChunk, addChunk, getFinalChunk, finalize}, {maxBuffer = Number.POSITIVE_INFINITY} = {}) => {
-	if (!isAsyncIterable(stream)) {
-		throw new Error('The first argument must be a Readable, a ReadableStream, or an async iterable.');
-	}
+	const asyncIterable = getAsyncIterable(stream);
 
 	const state = init();
 	state.length = 0;
 
 	try {
-		for await (const chunk of stream) {
+		for await (const chunk of asyncIterable) {
 			const chunkType = getChunkType(chunk);
 			const convertedChunk = convertChunk[chunkType](chunk, state);
 			appendChunk({convertedChunk, state, getSize, truncateChunk, addChunk, maxBuffer});
@@ -51,8 +51,6 @@ const addNewChunk = (convertedChunk, state, addChunk, newLength) => {
 	state.length = newLength;
 };
 
-const isAsyncIterable = stream => typeof stream === 'object' && stream !== null && typeof stream[Symbol.asyncIterator] === 'function';
-
 const getChunkType = chunk => {
 	const typeOfChunk = typeof chunk;
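For context, a minimal usage sketch of the refactored entry point, assuming the package's default `getStream` export (which delegates to `getStreamContents`): Node.js Readables are routed through the new `getAsyncIterable()`, while any other async iterable is consumed as-is.

```js
import {Readable} from 'node:stream';
import getStream from 'get-stream';

// A Node.js Readable goes through the custom iterable in source/stream.js …
console.log(await getStream(Readable.from(['foo', 'bar']))); // 'foobar'

// … while a plain async iterable is iterated directly.
const iterable = (async function * () {
	yield 'foo';
	yield 'bar';
})();
console.log(await getStream(iterable)); // 'foobar'
```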
46 changes: 46 additions & 0 deletions source/stream.js
@@ -0,0 +1,46 @@
+import {isReadableStream} from 'is-stream';
+
+export const getAsyncIterable = stream => {
+	if (isReadableStream(stream, {checkOpen: false})) {
+		return getStreamIterable(stream);
+	}
+
+	if (stream?.[Symbol.asyncIterator] === undefined) {
+		throw new Error('The first argument must be a Readable, a ReadableStream, or an async iterable.');
+	}
+
+	return stream;
+};
+
+// The default iterable for Node.js streams does not allow for multiple readers at once, so we re-implement it
+const getStreamIterable = async function * (stream) {
+	const {on, finished} = await getNodeImports();
+	const onStreamData = on(stream, 'data', {highWatermark: stream.readableHighWaterMark});
+	handleStreamEnd(stream, onStreamData, finished);
+	try {
+		for await (const [chunk] of onStreamData) {
+			yield chunk;
+		}
+	} finally {
+		stream.destroy();
+	}
+};
+
+const handleStreamEnd = async (stream, onStreamData, finished) => {
+	try {
+		await finished(stream, {cleanup: true, readable: true, writable: false});
+		await onStreamData.return();
+	} catch (error) {
+		const normalizedError = error instanceof Error ? error : new Error(String(error));
+		await onStreamData.throw(normalizedError);
+	}
+};
+
+// Use dynamic imports to support browsers
+const getNodeImports = async () => {
+	const [{on}, {finished}] = await Promise.all([
+		import('node:events'),
+		import('node:stream/promises'),
+	]);
+	return {on, finished};
+};
12 changes: 4 additions & 8 deletions test/helpers/index.js
@@ -1,12 +1,8 @@
-import {Duplex} from 'node:stream';
+import {Duplex, Readable} from 'node:stream';
 
-export const createStream = streamDef => {
-	const generator = typeof streamDef === 'function' ? streamDef : function * () {
-		yield * streamDef;
-	};
-
-	return Duplex.from(generator);
-};
+export const createStream = streamDef => typeof streamDef === 'function'
+	? Duplex.from(streamDef)
+	: Readable.from(streamDef);
 
 // Tests related to big buffers/strings can be slow. We run them serially and
 // with a higher timeout to ensure they do not randomly fail.
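A quick illustration of the simplified test helper: `Readable.from()` already accepts iterables directly, while generator functions still go through `Duplex.from()` so the tests can cover duplex streams as well.

```js
// Iterables become a Readable; generator functions become a Duplex.
const fromIterable = createStream(['chunk']);
const fromGenerator = createStream(function * () {
	yield 'chunk';
});
```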