Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix maxBuffer bug with TextDecoder() #105

Merged
merged 1 commit into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions source/array-buffer.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import {getStreamContents} from './contents.js';
import {throwObjectStream, getLengthProp} from './utils.js';
import {noop, throwObjectStream, getLengthProp} from './utils.js';

export async function getStreamAsArrayBuffer(stream, options) {
return getStreamContents(stream, arrayBufferMethods, options);
}

const initArrayBuffer = () => new ArrayBuffer(0);
const initArrayBuffer = () => ({contents: new ArrayBuffer(0)});

const useTextEncoder = chunk => textEncoder.encode(chunk);
const textEncoder = new TextEncoder();
Expand All @@ -15,7 +15,7 @@ const useUint8Array = chunk => new Uint8Array(chunk);
const useUint8ArrayWithOffset = chunk => new Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength);

// `contents` is an increasingly growing `Uint8Array`.
const addArrayBufferChunk = (convertedChunk, contents, length, previousLength) => {
const addArrayBufferChunk = (convertedChunk, {contents, length: previousLength}, length) => {
const newContents = hasArrayBufferResize() ? resizeArrayBuffer(contents, length) : resizeArrayBufferSlow(contents, length);
new Uint8Array(newContents).set(convertedChunk, previousLength);
return newContents;
Expand Down Expand Up @@ -54,7 +54,7 @@ const getNewContentsLength = length => SCALE_FACTOR ** Math.ceil(Math.log(length

const SCALE_FACTOR = 2;

const finalizeArrayBuffer = (contents, length) => hasArrayBufferResize() ? contents : contents.slice(0, length);
const finalizeArrayBuffer = ({contents, length}) => hasArrayBufferResize() ? contents : contents.slice(0, length);

// `ArrayBuffer.slice()` is slow. When `ArrayBuffer.resize()` is available
// (Node >=20.0.0, Safari >=16.4 and Chrome), we can use it instead.
Expand All @@ -76,5 +76,6 @@ const arrayBufferMethods = {
},
getSize: getLengthProp,
addChunk: addArrayBufferChunk,
getFinalChunk: noop,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getFinalChunk() is only used by getString(). It is a noop otherwise.

finalize: finalizeArrayBuffer,
};
9 changes: 5 additions & 4 deletions source/array.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import {getStreamContents} from './contents.js';
import {identity} from './utils.js';
import {identity, noop, getContentsProp} from './utils.js';

export async function getStreamAsArray(stream, options) {
return getStreamContents(stream, arrayMethods, options);
}

const initArray = () => ([]);
const initArray = () => ({contents: []});

const increment = () => 1;

const addArrayChunk = (convertedChunk, contents) => {
const addArrayChunk = (convertedChunk, {contents}) => {
contents.push(convertedChunk);
return contents;
};
Expand All @@ -26,5 +26,6 @@ const arrayMethods = {
},
getSize: increment,
addChunk: addArrayChunk,
finalize: identity,
getFinalChunk: noop,
finalize: getContentsProp,
};
43 changes: 27 additions & 16 deletions source/contents.js
Original file line number Diff line number Diff line change
@@ -1,34 +1,45 @@
export const getStreamContents = async (stream, {init, convertChunk, getSize, addChunk, finalize}, {maxBuffer = Number.POSITIVE_INFINITY} = {}) => {
export const getStreamContents = async (stream, {init, convertChunk, getSize, addChunk, getFinalChunk, finalize}, {maxBuffer = Number.POSITIVE_INFINITY} = {}) => {
if (!isAsyncIterable(stream)) {
throw new Error('The first argument must be a Readable, a ReadableStream, or an async iterable.');
}

let length = 0;
let contents = init();
const textDecoder = new TextDecoder();
const state = init();
state.length = 0;

try {
for await (const chunk of stream) {
const chunkType = getChunkType(chunk);
const convertedChunk = convertChunk[chunkType](chunk, textDecoder);
const chunkSize = getSize(convertedChunk);

if (length + chunkSize > maxBuffer) {
throw new MaxBufferError();
}

const newLength = length + chunkSize;
contents = addChunk(convertedChunk, contents, newLength, length);
length = newLength;
const convertedChunk = convertChunk[chunkType](chunk, state);
appendChunk({convertedChunk, state, getSize, addChunk, maxBuffer});
}

return finalize(contents, length, textDecoder);
appendFinalChunk({state, convertChunk, getSize, addChunk, getFinalChunk, maxBuffer});
Copy link
Collaborator Author

@ehmicky ehmicky Aug 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The final chunk (returned by getFinalChunk(), i.e. textDecoder.decode()) must use the same logic as the previous chunks, including the maxBuffer logic.
To do this required two changes:

  • Adding a getFinalChunk() method that returns either undefined (no final chunk) or the final chunk, with the right type
  • Extracting the appendChunk() logic so it can be reused after getFinalChunk(): the body of the main for loop was moved into its own appendChunk() function, and its stateful arguments were grouped into a single state object. This grouping required refactoring some additional functions.

return finalize(state);
} catch (error) {
error.bufferedData = finalize(contents, length, textDecoder);
error.bufferedData = finalize(state);
throw error;
}
};

const appendFinalChunk = ({state, getSize, addChunk, getFinalChunk, maxBuffer}) => {
const convertedChunk = getFinalChunk(state);
if (convertedChunk !== undefined) {
appendChunk({convertedChunk, state, getSize, addChunk, maxBuffer});
}
};

const appendChunk = ({convertedChunk, state, getSize, addChunk, maxBuffer}) => {
const chunkSize = getSize(convertedChunk);
const newLength = state.length + chunkSize;

if (newLength > maxBuffer) {
throw new MaxBufferError();
}

state.contents = addChunk(convertedChunk, state, newLength);
state.length = newLength;
};

const isAsyncIterable = stream => typeof stream === 'object' && stream !== null && typeof stream[Symbol.asyncIterator] === 'function';

const getChunkType = chunk => {
Expand Down
16 changes: 10 additions & 6 deletions source/string.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
import {getStreamContents} from './contents.js';
import {identity, throwObjectStream, getLengthProp} from './utils.js';
import {identity, getContentsProp, throwObjectStream, getLengthProp} from './utils.js';

export async function getStreamAsString(stream, options) {
return getStreamContents(stream, stringMethods, options);
}

const initString = () => '';
const initString = () => ({contents: '', textDecoder: new TextDecoder()});

const useTextDecoder = (chunk, textDecoder) => textDecoder.decode(chunk, {stream: true});
const useTextDecoder = (chunk, {textDecoder}) => textDecoder.decode(chunk, {stream: true});

const addStringChunk = (convertedChunk, contents) => contents + convertedChunk;
const addStringChunk = (convertedChunk, {contents}) => contents + convertedChunk;

const finalizeString = (contents, length, textDecoder) => `${contents}${textDecoder.decode()}`;
const getFinalStringChunk = ({textDecoder}) => {
const finalChunk = textDecoder.decode();
return finalChunk === '' ? undefined : finalChunk;
};

const stringMethods = {
init: initString,
Expand All @@ -25,5 +28,6 @@ const stringMethods = {
},
getSize: getLengthProp,
addChunk: addStringChunk,
finalize: finalizeString,
getFinalChunk: getFinalStringChunk,
finalize: getContentsProp,
};
4 changes: 4 additions & 0 deletions source/utils.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
export const identity = value => value;

export const noop = () => undefined;

export const getContentsProp = ({contents}) => contents;

export const throwObjectStream = chunk => {
throw new Error(`Streams in object mode are not supported: ${String(chunk)}`);
};
Expand Down
5 changes: 5 additions & 0 deletions test/string.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ test('get stream with truncated UTF-8 sequences', async t => {
t.is(result, `${multiByteString.slice(0, -1)}${INVALID_UTF8_MARKER}`);
});

test('handles truncated UTF-8 sequences over maxBuffer', async t => {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test succeeds with the current PR, and fails without it.

const maxBuffer = multiByteString.length - 1;
await t.throwsAsync(setupString(multiByteBuffer.slice(0, -1), {maxBuffer}), {instanceOf: MaxBufferError});
});

test('get stream with invalid UTF-8 sequences', async t => {
const result = await setupString(multiByteBuffer.slice(1, 2));
t.is(result, INVALID_UTF8_MARKER);
Expand Down