Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 9ab353a

Browse filesBrowse files
MattiasBuelenstargos
authored andcommitted
stream: implement min option for ReadableStreamBYOBReader.read
PR-URL: #50888 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Debadree Chatterjee <debadree333@gmail.com>
1 parent 595542e commit 9ab353a
Copy full SHA for 9ab353a

File tree

Expand file treeCollapse file tree

15 files changed

+968
-114
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

15 files changed

+968
-114
lines changed
Open diff view settings
Collapse file

‎doc/api/webstreams.md‎

Copy file name to clipboardExpand all lines: doc/api/webstreams.md
+12-3Lines changed: 12 additions & 3 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,7 @@ added: v16.5.0
492492
-->
493493

494494
* Returns: A promise fulfilled with an object:
495-
* `value` {ArrayBuffer}
495+
* `value` {any}
496496
* `done` {boolean}
497497

498498
Requests the next chunk of data from the underlying {ReadableStream}
@@ -617,15 +617,24 @@ added: v16.5.0
617617
{ReadableStream} is closed or rejected if the stream errors or the reader's
618618
lock is released before the stream finishes closing.
619619
620-
#### `readableStreamBYOBReader.read(view)`
620+
#### `readableStreamBYOBReader.read(view[, options])`
621621
622622
<!-- YAML
623623
added: v16.5.0
624+
changes:
625+
- version: REPLACEME
626+
pr-url: https://github.com/nodejs/node/pull/50888
627+
description: Added `min` option.
624628
-->
625629
626630
* `view` {Buffer|TypedArray|DataView}
631+
* `options` {Object}
632+
* `min` {number} When set, the returned promise will only be
633+
fulfilled as soon as `min` number of elements are available.
634+
When not set, the promise fulfills when at least one element
635+
is available.
627636
* Returns: A promise fulfilled with an object:
628-
* `value` {ArrayBuffer}
637+
* `value` {TypedArray|DataView}
629638
* `done` {boolean}
630639
631640
Requests the next chunk of data from the underlying {ReadableStream}
Collapse file

‎lib/internal/encoding.js‎

Copy file name to clipboardExpand all lines: lib/internal/encoding.js
+1-7Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,7 @@ const {
4747
const {
4848
validateString,
4949
validateObject,
50-
kValidateObjectAllowNullable,
51-
kValidateObjectAllowArray,
52-
kValidateObjectAllowFunction,
50+
kValidateObjectAllowObjectsAndNull,
5351
} = require('internal/validators');
5452
const binding = internalBinding('encoding_binding');
5553
const {
@@ -393,10 +391,6 @@ const TextDecoder =
393391
makeTextDecoderICU() :
394392
makeTextDecoderJS();
395393

396-
const kValidateObjectAllowObjectsAndNull = kValidateObjectAllowNullable |
397-
kValidateObjectAllowArray |
398-
kValidateObjectAllowFunction;
399-
400394
function makeTextDecoderICU() {
401395
const {
402396
decode: _decode,
Collapse file

‎lib/internal/validators.js‎

Copy file name to clipboardExpand all lines: lib/internal/validators.js
+7Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,11 @@ const kValidateObjectNone = 0;
222222
const kValidateObjectAllowNullable = 1 << 0;
223223
const kValidateObjectAllowArray = 1 << 1;
224224
const kValidateObjectAllowFunction = 1 << 2;
225+
const kValidateObjectAllowObjects = kValidateObjectAllowArray |
226+
kValidateObjectAllowFunction;
227+
const kValidateObjectAllowObjectsAndNull = kValidateObjectAllowNullable |
228+
kValidateObjectAllowArray |
229+
kValidateObjectAllowFunction;
225230

226231
/**
227232
* @callback validateObject
@@ -583,6 +588,8 @@ module.exports = {
583588
kValidateObjectAllowNullable,
584589
kValidateObjectAllowArray,
585590
kValidateObjectAllowFunction,
591+
kValidateObjectAllowObjects,
592+
kValidateObjectAllowObjectsAndNull,
586593
validateOneOf,
587594
validatePlainFunction,
588595
validatePort,
Collapse file

‎lib/internal/webstreams/readablestream.js‎

Copy file name to clipboardExpand all lines: lib/internal/webstreams/readablestream.js
+67-40Lines changed: 67 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ const {
2222
SymbolAsyncIterator,
2323
SymbolDispose,
2424
SymbolToStringTag,
25+
TypedArrayPrototypeGetLength,
2526
Uint8Array,
2627
} = primordials;
2728

@@ -33,6 +34,7 @@ const {
3334
ERR_INVALID_ARG_TYPE,
3435
ERR_INVALID_STATE,
3536
ERR_INVALID_THIS,
37+
ERR_OUT_OF_RANGE,
3638
},
3739
} = require('internal/errors');
3840

@@ -58,8 +60,8 @@ const {
5860
validateAbortSignal,
5961
validateBuffer,
6062
validateObject,
61-
kValidateObjectAllowNullable,
62-
kValidateObjectAllowFunction,
63+
kValidateObjectAllowObjects,
64+
kValidateObjectAllowObjectsAndNull,
6365
} = require('internal/validators');
6466

6567
const {
@@ -246,10 +248,10 @@ class ReadableStream {
246248
* @param {UnderlyingSource} [source]
247249
* @param {QueuingStrategy} [strategy]
248250
*/
249-
constructor(source = {}, strategy = kEmptyObject) {
251+
constructor(source = kEmptyObject, strategy = kEmptyObject) {
250252
markTransferMode(this, false, true);
251-
if (source === null)
252-
throw new ERR_INVALID_ARG_VALUE('source', 'Object', source);
253+
validateObject(source, 'source', kValidateObjectAllowObjects);
254+
validateObject(strategy, 'strategy', kValidateObjectAllowObjectsAndNull);
253255
this[kState] = createReadableStreamState();
254256

255257
this[kIsClosedPromise] = createDeferredPromise();
@@ -332,7 +334,7 @@ class ReadableStream {
332334
getReader(options = kEmptyObject) {
333335
if (!isReadableStream(this))
334336
throw new ERR_INVALID_THIS('ReadableStream');
335-
validateObject(options, 'options', kValidateObjectAllowNullable | kValidateObjectAllowFunction);
337+
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
336338
const mode = options?.mode;
337339

338340
if (mode === undefined)
@@ -370,6 +372,7 @@ class ReadableStream {
370372

371373
// The web platform tests require that these be handled one at a
372374
// time and in a specific order. options can be null or undefined.
375+
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
373376
const preventAbort = options?.preventAbort;
374377
const preventCancel = options?.preventCancel;
375378
const preventClose = options?.preventClose;
@@ -412,6 +415,7 @@ class ReadableStream {
412415
destination);
413416
}
414417

418+
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
415419
const preventAbort = options?.preventAbort;
416420
const preventCancel = options?.preventCancel;
417421
const preventClose = options?.preventClose;
@@ -456,10 +460,8 @@ class ReadableStream {
456460
values(options = kEmptyObject) {
457461
if (!isReadableStream(this))
458462
throw new ERR_INVALID_THIS('ReadableStream');
459-
validateObject(options, 'options');
460-
const {
461-
preventCancel = false,
462-
} = options;
463+
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
464+
const preventCancel = !!(options?.preventCancel);
463465

464466
// eslint-disable-next-line no-use-before-define
465467
const reader = new ReadableStreamDefaultReader(this);
@@ -929,47 +931,62 @@ class ReadableStreamBYOBReader {
929931

930932
/**
931933
* @param {ArrayBufferView} view
934+
* @param {{
935+
* min? : number
936+
* }} [options]
932937
* @returns {Promise<{
933-
* view : ArrayBufferView,
938+
* value : ArrayBufferView,
934939
* done : boolean,
935940
* }>}
936941
*/
937-
read(view) {
942+
async read(view, options = kEmptyObject) {
938943
if (!isReadableStreamBYOBReader(this))
939-
return PromiseReject(new ERR_INVALID_THIS('ReadableStreamBYOBReader'));
944+
throw new ERR_INVALID_THIS('ReadableStreamBYOBReader');
940945
if (!isArrayBufferView(view)) {
941-
return PromiseReject(
942-
new ERR_INVALID_ARG_TYPE(
943-
'view',
944-
[
945-
'Buffer',
946-
'TypedArray',
947-
'DataView',
948-
],
949-
view));
946+
throw new ERR_INVALID_ARG_TYPE(
947+
'view',
948+
[
949+
'Buffer',
950+
'TypedArray',
951+
'DataView',
952+
],
953+
view,
954+
);
950955
}
956+
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
951957

952958
const viewByteLength = ArrayBufferViewGetByteLength(view);
953959
const viewBuffer = ArrayBufferViewGetBuffer(view);
954960
const viewBufferByteLength = ArrayBufferPrototypeGetByteLength(viewBuffer);
955961

956962
if (viewByteLength === 0 || viewBufferByteLength === 0) {
957-
return PromiseReject(
958-
new ERR_INVALID_STATE.TypeError(
959-
'View or Viewed ArrayBuffer is zero-length or detached',
960-
),
961-
);
963+
throw new ERR_INVALID_STATE.TypeError(
964+
'View or Viewed ArrayBuffer is zero-length or detached');
962965
}
963966

964967
// Supposed to assert here that the view's buffer is not
965968
// detached, but there's no API available to use to check that.
969+
970+
const min = options?.min ?? 1;
971+
if (typeof min !== 'number')
972+
throw new ERR_INVALID_ARG_TYPE('options.min', 'number', min);
973+
if (!NumberIsInteger(min))
974+
throw new ERR_INVALID_ARG_VALUE('options.min', min, 'must be an integer');
975+
if (min <= 0)
976+
throw new ERR_INVALID_ARG_VALUE('options.min', min, 'must be greater than 0');
977+
if (!isDataView(view)) {
978+
if (min > TypedArrayPrototypeGetLength(view)) {
979+
throw new ERR_OUT_OF_RANGE('options.min', '<= view.length', min);
980+
}
981+
} else if (min > viewByteLength) {
982+
throw new ERR_OUT_OF_RANGE('options.min', '<= view.byteLength', min);
983+
}
984+
966985
if (this[kState].stream === undefined) {
967-
return PromiseReject(
968-
new ERR_INVALID_STATE.TypeError(
969-
'The reader is not attached to a stream'));
986+
throw new ERR_INVALID_STATE.TypeError('The reader is not attached to a stream');
970987
}
971988
const readIntoRequest = new ReadIntoRequest();
972-
readableStreamBYOBReaderRead(this, view, readIntoRequest);
989+
readableStreamBYOBReaderRead(this, view, min, readIntoRequest);
973990
return readIntoRequest.promise;
974991
}
975992

@@ -1883,7 +1900,7 @@ function readableByteStreamTee(stream) {
18831900
reading = false;
18841901
},
18851902
};
1886-
readableStreamBYOBReaderRead(reader, view, readIntoRequest);
1903+
readableStreamBYOBReaderRead(reader, view, 1, readIntoRequest);
18871904
}
18881905

18891906
function pull1Algorithm() {
@@ -2210,7 +2227,7 @@ function readableStreamReaderGenericRelease(reader) {
22102227
reader[kState].stream = undefined;
22112228
}
22122229

2213-
function readableStreamBYOBReaderRead(reader, view, readIntoRequest) {
2230+
function readableStreamBYOBReaderRead(reader, view, min, readIntoRequest) {
22142231
const {
22152232
stream,
22162233
} = reader[kState];
@@ -2223,6 +2240,7 @@ function readableStreamBYOBReaderRead(reader, view, readIntoRequest) {
22232240
readableByteStreamControllerPullInto(
22242241
stream[kState].controller,
22252242
view,
2243+
min,
22262244
readIntoRequest);
22272245
}
22282246

@@ -2495,7 +2513,7 @@ function readableByteStreamControllerClose(controller) {
24952513

24962514
if (pendingPullIntos.length) {
24972515
const firstPendingPullInto = pendingPullIntos[0];
2498-
if (firstPendingPullInto.bytesFilled > 0) {
2516+
if (firstPendingPullInto.bytesFilled % firstPendingPullInto.elementSize !== 0) {
24992517
const error = new ERR_INVALID_STATE.TypeError('Partial read');
25002518
readableByteStreamControllerError(controller, error);
25012519
throw error;
@@ -2512,7 +2530,7 @@ function readableByteStreamControllerCommitPullIntoDescriptor(stream, desc) {
25122530

25132531
let done = false;
25142532
if (stream[kState].state === 'closed') {
2515-
desc.bytesFilled = 0;
2533+
assert(desc.bytesFilled % desc.elementSize === 0);
25162534
done = true;
25172535
}
25182536

@@ -2601,6 +2619,7 @@ function readableByteStreamControllerHandleQueueDrain(controller) {
26012619
function readableByteStreamControllerPullInto(
26022620
controller,
26032621
view,
2622+
min,
26042623
readIntoRequest) {
26052624
const {
26062625
closeRequested,
@@ -2613,6 +2632,11 @@ function readableByteStreamControllerPullInto(
26132632
elementSize = view.constructor.BYTES_PER_ELEMENT;
26142633
ctor = view.constructor;
26152634
}
2635+
2636+
const minimumFill = min * elementSize;
2637+
assert(minimumFill >= elementSize && minimumFill <= view.byteLength);
2638+
assert(minimumFill % elementSize === 0);
2639+
26162640
const buffer = ArrayBufferViewGetBuffer(view);
26172641
const byteOffset = ArrayBufferViewGetByteOffset(view);
26182642
const byteLength = ArrayBufferViewGetByteLength(view);
@@ -2631,6 +2655,7 @@ function readableByteStreamControllerPullInto(
26312655
byteOffset,
26322656
byteLength,
26332657
bytesFilled: 0,
2658+
minimumFill,
26342659
elementSize,
26352660
ctor,
26362661
type: 'byob',
@@ -2718,7 +2743,7 @@ function readableByteStreamControllerRespond(controller, bytesWritten) {
27182743
}
27192744

27202745
function readableByteStreamControllerRespondInClosedState(controller, desc) {
2721-
assert(!desc.bytesFilled);
2746+
assert(desc.bytesFilled % desc.elementSize === 0);
27222747
if (desc.type === 'none') {
27232748
readableByteStreamControllerShiftPendingPullInto(controller);
27242749
}
@@ -2895,17 +2920,18 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
28952920
byteLength,
28962921
byteOffset,
28972922
bytesFilled,
2923+
minimumFill,
28982924
elementSize,
28992925
} = desc;
2900-
const currentAlignedBytes = bytesFilled - (bytesFilled % elementSize);
29012926
const maxBytesToCopy = MathMin(
29022927
controller[kState].queueTotalSize,
29032928
byteLength - bytesFilled);
29042929
const maxBytesFilled = bytesFilled + maxBytesToCopy;
29052930
const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize);
29062931
let totalBytesToCopyRemaining = maxBytesToCopy;
29072932
let ready = false;
2908-
if (maxAlignedBytes > currentAlignedBytes) {
2933+
assert(bytesFilled < minimumFill);
2934+
if (maxAlignedBytes >= minimumFill) {
29092935
totalBytesToCopyRemaining = maxAlignedBytes - bytesFilled;
29102936
ready = true;
29112937
}
@@ -2948,7 +2974,7 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
29482974
if (!ready) {
29492975
assert(!controller[kState].queueTotalSize);
29502976
assert(desc.bytesFilled > 0);
2951-
assert(desc.bytesFilled < elementSize);
2977+
assert(desc.bytesFilled < minimumFill);
29522978
}
29532979
return ready;
29542980
}
@@ -3004,7 +3030,7 @@ function readableByteStreamControllerRespondInReadableState(
30043030
return;
30053031
}
30063032

3007-
if (desc.bytesFilled < desc.elementSize)
3033+
if (desc.bytesFilled < desc.minimumFill)
30083034
return;
30093035

30103036
readableByteStreamControllerShiftPendingPullInto(controller);
@@ -3189,6 +3215,7 @@ function readableByteStreamControllerPullSteps(controller, readRequest) {
31893215
byteOffset: 0,
31903216
byteLength: autoAllocateChunkSize,
31913217
bytesFilled: 0,
3218+
minimumFill: 1,
31923219
elementSize: 1,
31933220
ctor: Uint8Array,
31943221
type: 'default',
Collapse file

‎lib/internal/webstreams/transformstream.js‎

Copy file name to clipboardExpand all lines: lib/internal/webstreams/transformstream.js
+10-1Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,12 @@ const {
2929
kEnumerableProperty,
3030
} = require('internal/util');
3131

32+
const {
33+
validateObject,
34+
kValidateObjectAllowObjects,
35+
kValidateObjectAllowObjectsAndNull,
36+
} = require('internal/validators');
37+
3238
const {
3339
kDeserialize,
3440
kTransfer,
@@ -119,10 +125,13 @@ class TransformStream {
119125
* @param {QueuingStrategy} [readableStrategy]
120126
*/
121127
constructor(
122-
transformer = null,
128+
transformer = kEmptyObject,
123129
writableStrategy = kEmptyObject,
124130
readableStrategy = kEmptyObject) {
125131
markTransferMode(this, false, true);
132+
validateObject(transformer, 'transformer', kValidateObjectAllowObjects);
133+
validateObject(writableStrategy, 'writableStrategy', kValidateObjectAllowObjectsAndNull);
134+
validateObject(readableStrategy, 'readableStrategy', kValidateObjectAllowObjectsAndNull);
126135
const readableType = transformer?.readableType;
127136
const writableType = transformer?.writableType;
128137
const start = transformer?.start;

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.