Skip to content

Commit

Permalink
stream: implement ReadableStream.from
Browse files Browse the repository at this point in the history
  • Loading branch information
debadree25 committed Jun 23, 2023
1 parent da80964 commit 41bf728
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 0 deletions.
76 changes: 76 additions & 0 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const {
ArrayBufferPrototypeSlice,
ArrayPrototypePush,
ArrayPrototypeShift,
Boolean,
DataView,
FunctionPrototypeBind,
FunctionPrototypeCall,
Expand Down Expand Up @@ -110,6 +111,8 @@ const {
nonOpCancel,
nonOpPull,
nonOpStart,
getIterator,
iteratorNext,
kType,
kState,
} = require('internal/webstreams/util');
Expand Down Expand Up @@ -314,6 +317,10 @@ class ReadableStream {
return isReadableStreamLocked(this);
}

static from(iterable) {
return readableStreamFromIterable(iterable);
}

/**
* @param {any} [reason]
* @returns { Promise<void> }
Expand Down Expand Up @@ -1249,6 +1256,75 @@ const isReadableStreamBYOBReader =

// ---- ReadableStream Implementation

function readableStreamFromIterable(iterable) {
let stream;
const iteratorRecord = getIterator(iterable, 'async');

const startAlgorithm = nonOpStart;

function pullAlgorithm() {
let nextResult;
try {
nextResult = iteratorNext(iteratorRecord);
} catch (error) {
return PromiseReject(error);
}
const nextPromise = PromiseResolve(nextResult);
return PromisePrototypeThen(nextPromise, (iterResult) => {
if (typeof iterResult !== 'object' || iterResult === null) {
throw new ERR_INVALID_STATE.TypeError(
'The promise returned by the iterator.next() method must fulfill with an object');
}
const done = Boolean(iterResult.done);
if (done) {
readableStreamDefaultControllerClose(stream[kState].controller);
} else {
readableStreamDefaultControllerEnqueue(stream[kState].controller, iterResult.value);
}
});
}

function cancelAlgorithm(reason) {
const iterator = iteratorRecord.iterator;
let returnMethod;
try {
returnMethod = iterator.return;
} catch (error) {
return PromiseReject(error);
}
if (returnMethod === undefined) {
return PromiseResolve();
}
let returnResult;
try {
returnResult = FunctionPrototypeCall(returnMethod, iterator, reason);
} catch (error) {
return PromiseReject(error);
}
const returnPromise = PromiseResolve(returnResult);
return PromisePrototypeThen(returnPromise, (iterResult) => {
if (typeof iterResult !== 'object' || iterResult === null) {
throw new ERR_INVALID_STATE.TypeError(
'The promise returned by the iterator.return() method must fulfill with an object');
}
return undefined;
});
}

stream = new ReadableStream({
start: startAlgorithm,
pull: pullAlgorithm,
cancel: cancelAlgorithm,
}, {
size() {
return 1;
},
highWaterMark: 0,
});

return stream;
}

function readableStreamPipeTo(
source,
dest,
Expand Down
53 changes: 53 additions & 0 deletions lib/internal/webstreams/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ const {
PromiseReject,
ReflectGet,
Symbol,
SymbolAsyncIterator,
SymbolIterator,
Uint8Array,
} = primordials;

const {
codes: {
ERR_INVALID_ARG_VALUE,
ERR_OPERATION_FAILED,
ERR_INVALID_STATE,
},
} = require('internal/errors');

Expand Down Expand Up @@ -217,6 +220,54 @@ function lazyTransfer() {
return transfer;
}

function createAsyncFromSyncIterator(syncIteratorRecord) {
const syncIterable = {
[SymbolIterator]: () => syncIteratorRecord.iterator,
};

const asyncIterator = (async function* () {
return yield* syncIterable;
}());

const nextMethod = asyncIterator.next;
return { iterator: asyncIterator, nextMethod, done: false };
}

function getIterator(obj, kind = 'sync', method) {
if (method === undefined) {
if (kind === 'async') {
method = obj[SymbolAsyncIterator];
if (method === undefined) {
const syncMethod = obj[SymbolIterator];
const syncIteratorRecord = getIterator(obj, 'sync', syncMethod);
return createAsyncFromSyncIterator(syncIteratorRecord);
}
} else {
method = obj[SymbolIterator];
}
}

const iterator = FunctionPrototypeCall(method, obj);
if (typeof iterator !== 'object' || iterator === null) {
throw new ERR_INVALID_STATE.TypeError('The iterator method must return an object');
}
const nextMethod = iterator.next;
return { iterator, nextMethod, done: false };
}

function iteratorNext(iteratorRecord, value) {
let result;
if (value === undefined) {
result = FunctionPrototypeCall(iteratorRecord.nextMethod, iteratorRecord.iterator);
} else {
result = FunctionPrototypeCall(iteratorRecord.nextMethod, iteratorRecord.iterator, [value]);
}
if (typeof result !== 'object' || result === null) {
throw new ERR_INVALID_STATE.TypeError('The iterator.next() method must return an object');
}
return result;
}

module.exports = {
ArrayBufferViewGetBuffer,
ArrayBufferViewGetByteLength,
Expand All @@ -243,6 +294,8 @@ module.exports = {
nonOpPull,
nonOpStart,
nonOpWrite,
getIterator,
iteratorNext,
kType,
kState,
};

0 comments on commit 41bf728

Please sign in to comment.