Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit ce71929

Browse filesBrowse files
daeyeonRafaelGSS
authored andcommitted
stream: handle a pending pull request from a released reader
In order to meet the specification, this includes mainly the followings: - Adding the 'release steps' to ReadableStreamController - Responding to a pull request from a released reader in ReadableByteStreamController Signed-off-by: Daeyeon Jeong daeyeon.dev@gmail.com PR-URL: #44702 Refs: https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontroller-releasesteps Refs: https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-readable-state Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Minwoo Jung <nodecorelab@gmail.com>
1 parent e353bf7 commit ce71929
Copy full SHA for ce71929

File tree

Expand file treeCollapse file tree

3 files changed

+86
-8
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

3 files changed

+86
-8
lines changed
Open diff view settings
Collapse file

‎doc/api/webstreams.md‎

Copy file name to clipboardExpand all lines: doc/api/webstreams.md
+4Lines changed: 4 additions & 0 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -651,6 +651,10 @@ Signals an error that causes the {ReadableStream} to error and close.
651651
652652
<!-- YAML
653653
added: v16.5.0
654+
changes:
655+
- version: REPLACEME
656+
pr-url: https://github.com/nodejs/node/pull/44702
657+
description: Support handling a BYOB pull request from a released reader.
654658
-->
655659
656660
Every {ReadableStream} has a controller that is responsible for
Collapse file

‎lib/internal/webstreams/readablestream.js‎

Copy file name to clipboardExpand all lines: lib/internal/webstreams/readablestream.js
+82Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ const kClose = Symbol('kClose');
139139
const kChunk = Symbol('kChunk');
140140
const kError = Symbol('kError');
141141
const kPull = Symbol('kPull');
142+
const kRelease = Symbol('kRelease');
142143

143144
/**
144145
* @typedef {import('../abort_controller').AbortSignal} AbortSignal
@@ -1019,6 +1020,8 @@ class ReadableStreamDefaultController {
10191020
readableStreamDefaultControllerPullSteps(this, readRequest);
10201021
}
10211022

1023+
[kRelease]() {}
1024+
10221025
[kInspect](depth, options) {
10231026
return customInspect(depth, options, this[kType], { });
10241027
}
@@ -1143,6 +1146,17 @@ class ReadableByteStreamController {
11431146
readableByteStreamControllerPullSteps(this, readRequest);
11441147
}
11451148

1149+
[kRelease]() {
1150+
const {
1151+
pendingPullIntos,
1152+
} = this[kState];
1153+
if (pendingPullIntos.length > 0) {
1154+
const firstPendingPullInto = pendingPullIntos[0];
1155+
firstPendingPullInto.type = 'none';
1156+
this[kState].pendingPullIntos = [firstPendingPullInto];
1157+
}
1158+
}
1159+
11461160
[kInspect](depth, options) {
11471161
return customInspect(depth, options, this[kType], { });
11481162
}
@@ -2060,6 +2074,9 @@ function readableStreamReaderGenericRelease(reader) {
20602074
};
20612075
}
20622076
setPromiseHandled(reader[kState].close.promise);
2077+
2078+
stream[kState].controller[kRelease]();
2079+
20632080
stream[kState].reader = undefined;
20642081
reader[kState].stream = undefined;
20652082
}
@@ -2365,6 +2382,8 @@ function readableByteStreamControllerClose(controller) {
23652382

23662383
function readableByteStreamControllerCommitPullIntoDescriptor(stream, desc) {
23672384
assert(stream[kState].state !== 'errored');
2385+
assert(desc.type !== 'none');
2386+
23682387
let done = false;
23692388
if (stream[kState].state === 'closed') {
23702389
desc.bytesFilled = 0;
@@ -2574,6 +2593,9 @@ function readableByteStreamControllerRespond(controller, bytesWritten) {
25742593

25752594
function readableByteStreamControllerRespondInClosedState(controller, desc) {
25762595
assert(!desc.bytesFilled);
2596+
if (desc.type === 'none') {
2597+
readableByteStreamControllerShiftPendingPullInto(controller);
2598+
}
25772599
const {
25782600
stream,
25792601
} = controller[kState];
@@ -2663,6 +2685,31 @@ function readableByteStreamControllerEnqueue(controller, chunk) {
26632685
readableByteStreamControllerCallPullIfNeeded(controller);
26642686
}
26652687

2688+
function readableByteStreamControllerEnqueueClonedChunkToQueue(
2689+
controller,
2690+
buffer,
2691+
byteOffset,
2692+
byteLength
2693+
) {
2694+
let cloneResult;
2695+
try {
2696+
cloneResult = ArrayBufferPrototypeSlice(
2697+
buffer,
2698+
byteOffset,
2699+
byteOffset + byteLength
2700+
);
2701+
} catch (error) {
2702+
readableByteStreamControllerError(controller, error);
2703+
throw error;
2704+
}
2705+
readableByteStreamControllerEnqueueChunkToQueue(
2706+
controller,
2707+
cloneResult,
2708+
0,
2709+
byteLength
2710+
);
2711+
}
2712+
26662713
function readableByteStreamControllerEnqueueChunkToQueue(
26672714
controller,
26682715
buffer,
@@ -2678,6 +2725,29 @@ function readableByteStreamControllerEnqueueChunkToQueue(
26782725
controller[kState].queueTotalSize += byteLength;
26792726
}
26802727

2728+
function readableByteStreamControllerEnqueueDetachedPullIntoToQueue(
2729+
controller,
2730+
desc
2731+
) {
2732+
const {
2733+
buffer,
2734+
byteOffset,
2735+
bytesFilled,
2736+
type,
2737+
} = desc;
2738+
assert(type === 'none');
2739+
2740+
if (bytesFilled > 0) {
2741+
readableByteStreamControllerEnqueueClonedChunkToQueue(
2742+
controller,
2743+
buffer,
2744+
byteOffset,
2745+
bytesFilled
2746+
);
2747+
}
2748+
readableByteStreamControllerShiftPendingPullInto(controller);
2749+
}
2750+
26812751
function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
26822752
controller,
26832753
desc) {
@@ -2773,6 +2843,7 @@ function readableByteStreamControllerRespondInReadableState(
27732843
buffer,
27742844
bytesFilled,
27752845
byteLength,
2846+
type,
27762847
} = desc;
27772848

27782849
if (bytesFilled + bytesWritten > byteLength)
@@ -2783,6 +2854,17 @@ function readableByteStreamControllerRespondInReadableState(
27832854
bytesWritten,
27842855
desc);
27852856

2857+
if (type === 'none') {
2858+
readableByteStreamControllerEnqueueDetachedPullIntoToQueue(
2859+
controller,
2860+
desc
2861+
);
2862+
readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
2863+
controller
2864+
);
2865+
return;
2866+
}
2867+
27862868
if (desc.bytesFilled < desc.elementSize)
27872869
return;
27882870

Collapse file

‎test/wpt/status/streams.json‎

Copy file name to clipboardExpand all lines: test/wpt/status/streams.json
-8Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,9 @@
1616
"fail": {
1717
"expected": [
1818
"ReadableStream with byte source: enqueue() discards auto-allocated BYOB request",
19-
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, respond()",
20-
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader with 1 element Uint16Array, respond(1)",
21-
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader with 2 element Uint8Array, respond(3)",
22-
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, respondWithNewView()",
2319
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, enqueue()",
24-
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, close(), respond(0)",
25-
"ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read() on second reader, respond()",
2620
"ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read() on second reader, enqueue()",
27-
"ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read(view) on second reader, respond()",
2821
"ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read(view) on second reader, enqueue()",
29-
"ReadableStream with byte source: read(view) with 1 element Uint16Array, respond(1), releaseLock(), read(view) on second reader with 1 element Uint16Array, respond(1)",
3022
"ReadableStream with byte source: read(view) with 1 element Uint16Array, respond(1), releaseLock(), read() on second reader, enqueue()"
3123
]
3224
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.