Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit a057510

Browse filesBrowse files
Warkanlockdanielleadams
authored andcommitted
stream: initial approach to include strategy options on Readable.toWeb()
PR-URL: #43515 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent f32aec8 commit a057510
Copy full SHA for a057510

File tree

Expand file treeCollapse file tree

4 files changed

+109
-12
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

4 files changed

+109
-12
lines changed
Open diff view settings
Collapse file

‎doc/api/stream.md‎

Copy file name to clipboardExpand all lines: doc/api/stream.md
+5-1Lines changed: 5 additions & 1 deletion
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -2789,7 +2789,7 @@ added:
27892789

27902790
Returns whether the stream is readable.
27912791

2792-
### `stream.Readable.toWeb(streamReadable)`
2792+
### `stream.Readable.toWeb(streamReadable[, options])`
27932793

27942794
<!-- YAML
27952795
added: v17.0.0
@@ -2798,6 +2798,10 @@ added: v17.0.0
27982798
> Stability: 1 - Experimental
27992799
28002800
* `streamReadable` {stream.Readable}
2801+
* `options` {Object}
2802+
* `strategy` {Object}
2803+
* `highWaterMark` {number}
2804+
* `size` {Function}
28012805
* Returns: {ReadableStream}
28022806

28032807
### `stream.Writable.fromWeb(writableStream[, options])`
Collapse file

‎lib/internal/streams/readable.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/readable.js
+4-2Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1405,8 +1405,10 @@ Readable.fromWeb = function(readableStream, options) {
14051405
options);
14061406
};
14071407

1408-
Readable.toWeb = function(streamReadable) {
1409-
return lazyWebStreams().newReadableStreamFromStreamReadable(streamReadable);
1408+
Readable.toWeb = function(streamReadable, options) {
1409+
return lazyWebStreams().newReadableStreamFromStreamReadable(
1410+
streamReadable,
1411+
options);
14101412
};
14111413

14121414
Readable.wrap = function(src, options) {
Collapse file

‎lib/internal/webstreams/adapters.js‎

Copy file name to clipboardExpand all lines: lib/internal/webstreams/adapters.js
+25-9Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -359,10 +359,14 @@ function newStreamWritableFromWritableStream(writableStream, options = kEmptyObj
359359
}
360360

361361
/**
362+
* @typedef {import('./queuingstrategies').QueuingStrategy} QueuingStrategy
362363
* @param {Readable} streamReadable
364+
* @param {{
365+
* strategy : QueuingStrategy
366+
* }} [options]
363367
* @returns {ReadableStream}
364368
*/
365-
function newReadableStreamFromStreamReadable(streamReadable) {
369+
function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObject) {
366370
// Not using the internal/streams/utils isReadableNodeStream utility
367371
// here because it will return false if streamReadable is a Duplex
368372
// whose readable option is false. For a Duplex that is not readable,
@@ -382,14 +386,26 @@ function newReadableStreamFromStreamReadable(streamReadable) {
382386

383387
const objectMode = streamReadable.readableObjectMode;
384388
const highWaterMark = streamReadable.readableHighWaterMark;
385-
// When not running in objectMode explicitly, we just fall
386-
// back to a minimal strategy that just specifies the highWaterMark
387-
// and no size algorithm. Using a ByteLengthQueuingStrategy here
388-
// is unnecessary.
389-
const strategy =
390-
objectMode ?
391-
new CountQueuingStrategy({ highWaterMark }) :
392-
{ highWaterMark };
389+
390+
const evaluateStrategyOrFallback = (strategy) => {
391+
// If there is a strategy available, use it
392+
if (strategy)
393+
return strategy;
394+
395+
if (objectMode) {
396+
// When running in objectMode explicitly but no strategy, we just fall
397+
// back to CountQueuingStrategy
398+
return new CountQueuingStrategy({ highWaterMark });
399+
}
400+
401+
// When not running in objectMode explicitly, we just fall
402+
// back to a minimal strategy that just specifies the highWaterMark
403+
// and no size algorithm. Using a ByteLengthQueuingStrategy here
404+
// is unnecessary.
405+
return { highWaterMark };
406+
};
407+
408+
const strategy = evaluateStrategyOrFallback(options?.strategy);
393409

394410
let controller;
395411

Collapse file
+75Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
'use strict';
2+
const common = require('../common');
3+
const { Readable } = require('stream');
4+
const assert = require('assert');
5+
const { strictEqual } = require('assert');
6+
7+
{
8+
// Strategy 2
9+
const streamData = ['a', 'b', 'c', null];
10+
11+
// Fulfill a Readable object
12+
const readable = new Readable({
13+
read: common.mustCall(() => {
14+
process.nextTick(() => {
15+
readable.push(streamData.shift());
16+
});
17+
}, streamData.length),
18+
});
19+
20+
// Use helper to convert it to a Web ReadableStream using ByteLength strategy
21+
const readableStream = Readable.toWeb(readable, {
22+
strategy: new ByteLengthQueuingStrategy({ highWaterMark: 1 }),
23+
});
24+
25+
assert(!readableStream.locked);
26+
readableStream.getReader().read().then(common.mustCall());
27+
}
28+
29+
{
30+
// Strategy 2
31+
const streamData = ['a', 'b', 'c', null];
32+
33+
// Fulfill a Readable object
34+
const readable = new Readable({
35+
read: common.mustCall(() => {
36+
process.nextTick(() => {
37+
readable.push(streamData.shift());
38+
});
39+
}, streamData.length),
40+
});
41+
42+
// Use helper to convert it to a Web ReadableStream using Count strategy
43+
const readableStream = Readable.toWeb(readable, {
44+
strategy: new CountQueuingStrategy({ highWaterMark: 1 }),
45+
});
46+
47+
assert(!readableStream.locked);
48+
readableStream.getReader().read().then(common.mustCall());
49+
}
50+
51+
{
52+
const desireSizeExpected = 2;
53+
54+
const stringStream = new ReadableStream(
55+
{
56+
start(controller) {
57+
// Check if the strategy is being assigned on the init of the ReadableStream
58+
strictEqual(controller.desiredSize, desireSizeExpected);
59+
controller.enqueue('a');
60+
controller.enqueue('b');
61+
controller.close();
62+
},
63+
},
64+
new CountQueuingStrategy({ highWaterMark: desireSizeExpected })
65+
);
66+
67+
const reader = stringStream.getReader();
68+
69+
reader.read().then(common.mustCall());
70+
reader.read().then(common.mustCall());
71+
reader.read().then(({ value, done }) => {
72+
strictEqual(value, undefined);
73+
strictEqual(done, true);
74+
});
75+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.