Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit c206853

Browse filesBrowse files
debadree25MylesBorins
authored andcommitted
stream: add pipeline() for webstreams
Refs: #39316 PR-URL: #46307 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
1 parent 4cf4b41 commit c206853
Copy full SHA for c206853

File tree

Expand file treeCollapse file tree

5 files changed

+500
-10
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

5 files changed

+500
-10
lines changed
Open diff view settings
Collapse file

‎doc/api/stream.md‎

Copy file name to clipboardExpand all lines: doc/api/stream.md
+8-4Lines changed: 8 additions & 4 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -2696,6 +2696,9 @@ const cleanup = finished(rs, (err) => {
26962696
<!-- YAML
26972697
added: v10.0.0
26982698
changes:
2699+
- version: REPLACEME
2700+
pr-url: https://github.com/nodejs/node/pull/46307
2701+
description: Added support for webstreams.
26992702
- version: v18.0.0
27002703
pr-url: https://github.com/nodejs/node/pull/41678
27012704
description: Passing an invalid callback to the `callback` argument
@@ -2712,13 +2715,14 @@ changes:
27122715
description: Add support for async generators.
27132716
-->
27142717

2715-
* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]}
2716-
* `source` {Stream|Iterable|AsyncIterable|Function}
2718+
* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]|
2719+
ReadableStream\[]|WritableStream\[]|TransformStream\[]}
2720+
* `source` {Stream|Iterable|AsyncIterable|Function|ReadableStream}
27172721
* Returns: {Iterable|AsyncIterable}
2718-
* `...transforms` {Stream|Function}
2722+
* `...transforms` {Stream|Function|TransformStream}
27192723
* `source` {AsyncIterable}
27202724
* Returns: {AsyncIterable}
2721-
* `destination` {Stream|Function}
2725+
* `destination` {Stream|Function|WritableStream}
27222726
* `source` {AsyncIterable}
27232727
* Returns: {AsyncIterable|Promise}
27242728
* `callback` {Function} Called when the pipeline is fully done.
Collapse file

‎lib/internal/streams/pipeline.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/pipeline.js
+63-5Lines changed: 63 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ const {
3535
isReadable,
3636
isReadableNodeStream,
3737
isNodeStream,
38+
isTransformStream,
39+
isWebStream,
40+
isReadableStream,
3841
} = require('internal/streams/utils');
3942
const { AbortController } = require('internal/abort_controller');
4043

@@ -88,7 +91,7 @@ async function* fromReadable(val) {
8891
yield* Readable.prototype[SymbolAsyncIterator].call(val);
8992
}
9093

91-
async function pump(iterable, writable, finish, { end }) {
94+
async function pumpToNode(iterable, writable, finish, { end }) {
9295
let error;
9396
let onresolve = null;
9497

@@ -147,6 +150,35 @@ async function pump(iterable, writable, finish, { end }) {
147150
}
148151
}
149152

153+
async function pumpToWeb(readable, writable, finish, { end }) {
154+
if (isTransformStream(writable)) {
155+
writable = writable.writable;
156+
}
157+
// https://streams.spec.whatwg.org/#example-manual-write-with-backpressure
158+
const writer = writable.getWriter();
159+
try {
160+
for await (const chunk of readable) {
161+
await writer.ready;
162+
writer.write(chunk).catch(() => {});
163+
}
164+
165+
await writer.ready;
166+
167+
if (end) {
168+
await writer.close();
169+
}
170+
171+
finish();
172+
} catch (err) {
173+
try {
174+
await writer.abort(err);
175+
finish(err);
176+
} catch (err) {
177+
finish(err);
178+
}
179+
}
180+
}
181+
150182
function pipeline(...streams) {
151183
return pipelineImpl(streams, once(popCallback(streams)));
152184
}
@@ -259,7 +291,11 @@ function pipelineImpl(streams, callback, opts) {
259291
ret = Duplex.from(stream);
260292
}
261293
} else if (typeof stream === 'function') {
262-
ret = makeAsyncIterable(ret);
294+
if (isTransformStream(ret)) {
295+
ret = makeAsyncIterable(ret?.readable);
296+
} else {
297+
ret = makeAsyncIterable(ret);
298+
}
263299
ret = stream(ret, { signal });
264300

265301
if (reading) {
@@ -303,7 +339,11 @@ function pipelineImpl(streams, callback, opts) {
303339
);
304340
} else if (isIterable(ret, true)) {
305341
finishCount++;
306-
pump(ret, pt, finish, { end });
342+
pumpToNode(ret, pt, finish, { end });
343+
} else if (isReadableStream(ret) || isTransformStream(ret)) {
344+
const toRead = ret.readable || ret;
345+
finishCount++;
346+
pumpToNode(toRead, pt, finish, { end });
307347
} else {
308348
throw new ERR_INVALID_RETURN_VALUE(
309349
'AsyncIterable or Promise', 'destination', ret);
@@ -324,12 +364,30 @@ function pipelineImpl(streams, callback, opts) {
324364
if (isReadable(stream) && isLastStream) {
325365
lastStreamCleanup.push(cleanup);
326366
}
367+
} else if (isTransformStream(ret) || isReadableStream(ret)) {
368+
const toRead = ret.readable || ret;
369+
finishCount++;
370+
pumpToNode(toRead, stream, finish, { end });
327371
} else if (isIterable(ret)) {
328372
finishCount++;
329-
pump(ret, stream, finish, { end });
373+
pumpToNode(ret, stream, finish, { end });
374+
} else {
375+
throw new ERR_INVALID_ARG_TYPE(
376+
'val', ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], ret);
377+
}
378+
ret = stream;
379+
} else if (isWebStream(stream)) {
380+
if (isReadableNodeStream(ret)) {
381+
finishCount++;
382+
pumpToWeb(makeAsyncIterable(ret), stream, finish, { end });
383+
} else if (isReadableStream(ret) || isIterable(ret)) {
384+
finishCount++;
385+
pumpToWeb(ret, stream, finish, { end });
386+
} else if (isTransformStream(ret)) {
387+
pumpToWeb(ret.readable, stream, finish, { end });
330388
} else {
331389
throw new ERR_INVALID_ARG_TYPE(
332-
'val', ['Readable', 'Iterable', 'AsyncIterable'], ret);
390+
'val', ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], ret);
333391
}
334392
ret = stream;
335393
} else {
Collapse file

‎lib/internal/streams/utils.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/utils.js
+15Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,19 @@ function isWritableStream(obj) {
7777
);
7878
}
7979

80+
function isTransformStream(obj) {
81+
return !!(
82+
obj &&
83+
!isNodeStream(obj) &&
84+
typeof obj.readable === 'object' &&
85+
typeof obj.writable === 'object'
86+
);
87+
}
88+
89+
function isWebStream(obj) {
90+
return isReadableStream(obj) || isWritableStream(obj) || isTransformStream(obj);
91+
}
92+
8093
function isIterable(obj, isAsync) {
8194
if (obj == null) return false;
8295
if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function';
@@ -303,6 +316,7 @@ module.exports = {
303316
isReadableFinished,
304317
isReadableErrored,
305318
isNodeStream,
319+
isWebStream,
306320
isWritable,
307321
isWritableNodeStream,
308322
isWritableStream,
@@ -312,4 +326,5 @@ module.exports = {
312326
isServerRequest,
313327
isServerResponse,
314328
willEmitClose,
329+
isTransformStream,
315330
};
Collapse file

‎lib/stream/promises.js‎

Copy file name to clipboardExpand all lines: lib/stream/promises.js
+2-1Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ const {
88
const {
99
isIterable,
1010
isNodeStream,
11+
isWebStream,
1112
} = require('internal/streams/utils');
1213

1314
const { pipelineImpl: pl } = require('internal/streams/pipeline');
@@ -21,7 +22,7 @@ function pipeline(...streams) {
2122
let end;
2223
const lastArg = streams[streams.length - 1];
2324
if (lastArg && typeof lastArg === 'object' &&
24-
!isNodeStream(lastArg) && !isIterable(lastArg)) {
25+
!isNodeStream(lastArg) && !isIterable(lastArg) && !isWebStream(lastArg)) {
2526
const options = ArrayPrototypePop(streams);
2627
signal = options.signal;
2728
end = options.end;

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.