Skip to content

Commit

Permalink
stream: handle enqueuing chunks when a pending BYOB pull request exists
Browse files Browse the repository at this point in the history
Signed-off-by: Daeyeon Jeong <daeyeon.dev@gmail.com>
PR-URL: #44770
Refs: https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
daeyeon authored and danielleadams committed Oct 5, 2022
1 parent 2a5bce6 commit f300f19
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 25 deletions.
72 changes: 58 additions & 14 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -2647,13 +2647,22 @@ function readableByteStreamControllerEnqueue(controller, chunk) {
);
}

firstPendingPullInto.buffer =
transferArrayBuffer(firstPendingPullInto.buffer);
}
readableByteStreamControllerInvalidateBYOBRequest(controller);

readableByteStreamControllerInvalidateBYOBRequest(controller);
firstPendingPullInto.buffer = transferArrayBuffer(
firstPendingPullInto.buffer
);

if (firstPendingPullInto.type === 'none') {
readableByteStreamControllerEnqueueDetachedPullIntoToQueue(
controller,
firstPendingPullInto
);
}
}

if (readableStreamHasDefaultReader(stream)) {
readableByteStreamControllerProcessReadRequestsUsingQueue(controller);
if (!readableStreamGetNumReadRequests(stream)) {
readableByteStreamControllerEnqueueChunkToQueue(
controller,
Expand All @@ -2662,6 +2671,10 @@ function readableByteStreamControllerEnqueue(controller, chunk) {
byteLength);
} else {
assert(!queue.length);
if (pendingPullIntos.length) {
assert(pendingPullIntos[0].type === 'default');
readableByteStreamControllerShiftPendingPullInto(controller);
}
const transferredView =
new Uint8Array(transferredBuffer, byteOffset, byteLength);
readableStreamFulfillReadRequest(stream, transferredView, false);
Expand Down Expand Up @@ -2984,25 +2997,56 @@ function readableByteStreamControllerCancelSteps(controller, reason) {
return result;
}

function readableByteStreamControllerFillReadRequestFromQueue(controller, readRequest) {
const {
queue,
queueTotalSize,
} = controller[kState];
assert(queueTotalSize > 0);
const {
buffer,
byteOffset,
byteLength,
} = ArrayPrototypeShift(queue);

controller[kState].queueTotalSize -= byteLength;
readableByteStreamControllerHandleQueueDrain(controller);
const view = new Uint8Array(buffer, byteOffset, byteLength);
readRequest[kChunk](view);
}

function readableByteStreamControllerProcessReadRequestsUsingQueue(controller) {
const {
stream,
queueTotalSize,
} = controller[kState];
const { reader } = stream[kState];
assert(isReadableStreamDefaultReader(reader));

while (reader[kState].readRequests.length > 0) {
if (queueTotalSize === 0) {
return;
}
readableByteStreamControllerFillReadRequestFromQueue(
controller,
ArrayPrototypeShift(reader[kState].readRequests),
);
}
}

function readableByteStreamControllerPullSteps(controller, readRequest) {
const {
pendingPullIntos,
queue,
queueTotalSize,
stream,
} = controller[kState];
assert(readableStreamHasDefaultReader(stream));
if (queueTotalSize) {
assert(!readableStreamGetNumReadRequests(stream));
const {
buffer,
byteOffset,
byteLength,
} = ArrayPrototypeShift(queue);
controller[kState].queueTotalSize -= byteLength;
readableByteStreamControllerHandleQueueDrain(controller);
const view = new Uint8Array(buffer, byteOffset, byteLength);
readRequest[kChunk](view);
readableByteStreamControllerFillReadRequestFromQueue(
controller,
readRequest
);
return;
}
const {
Expand Down
11 changes: 0 additions & 11 deletions test/wpt/status/streams.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,6 @@
]
}
},
"readable-byte-streams/general.any.js": {
"fail": {
"expected": [
"ReadableStream with byte source: enqueue() discards auto-allocated BYOB request",
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, enqueue()",
"ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read() on second reader, enqueue()",
"ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read(view) on second reader, enqueue()",
"ReadableStream with byte source: read(view) with 1 element Uint16Array, respond(1), releaseLock(), read() on second reader, enqueue()"
]
}
},
"readable-streams/cross-realm-crash.window.js": {
"skip": "Browser-specific test"
},
Expand Down

0 comments on commit f300f19

Please sign in to comment.