Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit f886681

Browse filesBrowse files
debadree25targos
authored andcommitted
stream: enable usage of webstreams on compose()
Refs: #39316 PR-URL: #46675 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent 4e18651 commit f886681
Copy full SHA for f886681

File tree

Expand file treeCollapse file tree

4 files changed

+619
-52
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

4 files changed

+619
-52
lines changed
Open diff view settings
Collapse file

‎doc/api/stream.md‎

Copy file name to clipboardExpand all lines: doc/api/stream.md
+6-1Lines changed: 6 additions & 1 deletion
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -2796,11 +2796,16 @@ const server = http.createServer((req, res) => {
27962796

27972797
<!-- YAML
27982798
added: v16.9.0
2799+
changes:
2800+
- version: REPLACEME
2801+
pr-url: https://github.com/nodejs/node/pull/46675
2802+
description: Added support for webstreams.
27992803
-->
28002804

28012805
> Stability: 1 - `stream.compose` is experimental.
28022806
2803-
* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]}
2807+
* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]|
2808+
ReadableStream\[]|WritableStream\[]|TransformStream\[]}
28042809
* Returns: {stream.Duplex}
28052810

28062811
Combines two or more streams into a `Duplex` stream that writes to the
Collapse file

‎lib/internal/streams/compose.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/compose.js
+128-50Lines changed: 128 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ const {
77
isNodeStream,
88
isReadable,
99
isWritable,
10+
isWebStream,
11+
isTransformStream,
12+
isWritableStream,
13+
isReadableStream,
1014
} = require('internal/streams/utils');
1115
const {
1216
AbortError,
@@ -15,6 +19,7 @@ const {
1519
ERR_MISSING_ARGS,
1620
},
1721
} = require('internal/errors');
22+
const eos = require('internal/streams/end-of-stream');
1823

1924
module.exports = function compose(...streams) {
2025
if (streams.length === 0) {
@@ -37,18 +42,32 @@ module.exports = function compose(...streams) {
3742
}
3843

3944
for (let n = 0; n < streams.length; ++n) {
40-
if (!isNodeStream(streams[n])) {
45+
if (!isNodeStream(streams[n]) && !isWebStream(streams[n])) {
4146
// TODO(ronag): Add checks for non streams.
4247
continue;
4348
}
44-
if (n < streams.length - 1 && !isReadable(streams[n])) {
49+
if (
50+
n < streams.length - 1 &&
51+
!(
52+
isReadable(streams[n]) ||
53+
isReadableStream(streams[n]) ||
54+
isTransformStream(streams[n])
55+
)
56+
) {
4557
throw new ERR_INVALID_ARG_VALUE(
4658
`streams[${n}]`,
4759
orgStreams[n],
4860
'must be readable',
4961
);
5062
}
51-
if (n > 0 && !isWritable(streams[n])) {
63+
if (
64+
n > 0 &&
65+
!(
66+
isWritable(streams[n]) ||
67+
isWritableStream(streams[n]) ||
68+
isTransformStream(streams[n])
69+
)
70+
) {
5271
throw new ERR_INVALID_ARG_VALUE(
5372
`streams[${n}]`,
5473
orgStreams[n],
@@ -79,8 +98,16 @@ module.exports = function compose(...streams) {
7998
const head = streams[0];
8099
const tail = pipeline(streams, onfinished);
81100

82-
const writable = !!isWritable(head);
83-
const readable = !!isReadable(tail);
101+
const writable = !!(
102+
isWritable(head) ||
103+
isWritableStream(head) ||
104+
isTransformStream(head)
105+
);
106+
const readable = !!(
107+
isReadable(tail) ||
108+
isReadableStream(tail) ||
109+
isTransformStream(tail)
110+
);
84111

85112
// TODO(ronag): Avoid double buffering.
86113
// Implement Writable/Readable/Duplex traits.
@@ -94,28 +121,55 @@ module.exports = function compose(...streams) {
94121
});
95122

96123
if (writable) {
97-
d._write = function(chunk, encoding, callback) {
98-
if (head.write(chunk, encoding)) {
99-
callback();
100-
} else {
101-
ondrain = callback;
102-
}
103-
};
104-
105-
d._final = function(callback) {
106-
head.end();
107-
onfinish = callback;
108-
};
124+
if (isNodeStream(head)) {
125+
d._write = function(chunk, encoding, callback) {
126+
if (head.write(chunk, encoding)) {
127+
callback();
128+
} else {
129+
ondrain = callback;
130+
}
131+
};
132+
133+
d._final = function(callback) {
134+
head.end();
135+
onfinish = callback;
136+
};
137+
138+
head.on('drain', function() {
139+
if (ondrain) {
140+
const cb = ondrain;
141+
ondrain = null;
142+
cb();
143+
}
144+
});
145+
} else if (isWebStream(head)) {
146+
const writable = isTransformStream(head) ? head.writable : head;
147+
const writer = writable.getWriter();
148+
149+
d._write = async function(chunk, encoding, callback) {
150+
try {
151+
await writer.ready;
152+
writer.write(chunk).catch(() => {});
153+
callback();
154+
} catch (err) {
155+
callback(err);
156+
}
157+
};
158+
159+
d._final = async function(callback) {
160+
try {
161+
await writer.ready;
162+
writer.close().catch(() => {});
163+
onfinish = callback;
164+
} catch (err) {
165+
callback(err);
166+
}
167+
};
168+
}
109169

110-
head.on('drain', function() {
111-
if (ondrain) {
112-
const cb = ondrain;
113-
ondrain = null;
114-
cb();
115-
}
116-
});
170+
const toRead = isTransformStream(tail) ? tail.readable : tail;
117171

118-
tail.on('finish', function() {
172+
eos(toRead, () => {
119173
if (onfinish) {
120174
const cb = onfinish;
121175
onfinish = null;
@@ -125,32 +179,54 @@ module.exports = function compose(...streams) {
125179
}
126180

127181
if (readable) {
128-
tail.on('readable', function() {
129-
if (onreadable) {
130-
const cb = onreadable;
131-
onreadable = null;
132-
cb();
133-
}
134-
});
135-
136-
tail.on('end', function() {
137-
d.push(null);
138-
});
139-
140-
d._read = function() {
141-
while (true) {
142-
const buf = tail.read();
143-
144-
if (buf === null) {
145-
onreadable = d._read;
146-
return;
182+
if (isNodeStream(tail)) {
183+
tail.on('readable', function() {
184+
if (onreadable) {
185+
const cb = onreadable;
186+
onreadable = null;
187+
cb();
147188
}
148-
149-
if (!d.push(buf)) {
150-
return;
189+
});
190+
191+
tail.on('end', function() {
192+
d.push(null);
193+
});
194+
195+
d._read = function() {
196+
while (true) {
197+
const buf = tail.read();
198+
if (buf === null) {
199+
onreadable = d._read;
200+
return;
201+
}
202+
203+
if (!d.push(buf)) {
204+
return;
205+
}
151206
}
152-
}
153-
};
207+
};
208+
} else if (isWebStream(tail)) {
209+
const readable = isTransformStream(tail) ? tail.readable : tail;
210+
const reader = readable.getReader();
211+
d._read = async function() {
212+
while (true) {
213+
try {
214+
const { value, done } = await reader.read();
215+
216+
if (!d.push(value)) {
217+
return;
218+
}
219+
220+
if (done) {
221+
d.push(null);
222+
return;
223+
}
224+
} catch {
225+
return;
226+
}
227+
}
228+
};
229+
}
154230
}
155231

156232
d._destroy = function(err, callback) {
@@ -166,7 +242,9 @@ module.exports = function compose(...streams) {
166242
callback(err);
167243
} else {
168244
onclose = callback;
169-
destroyer(tail, err);
245+
if (isNodeStream(tail)) {
246+
destroyer(tail, err);
247+
}
170248
}
171249
};
172250

Collapse file

‎lib/internal/streams/pipeline.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/pipeline.js
+2-1Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ function pipelineImpl(streams, callback, opts) {
286286
throw new ERR_INVALID_RETURN_VALUE(
287287
'Iterable, AsyncIterable or Stream', 'source', ret);
288288
}
289-
} else if (isIterable(stream) || isReadableNodeStream(stream)) {
289+
} else if (isIterable(stream) || isReadableNodeStream(stream) || isTransformStream(stream)) {
290290
ret = stream;
291291
} else {
292292
ret = Duplex.from(stream);
@@ -385,6 +385,7 @@ function pipelineImpl(streams, callback, opts) {
385385
finishCount++;
386386
pumpToWeb(ret, stream, finish, { end });
387387
} else if (isTransformStream(ret)) {
388+
finishCount++;
388389
pumpToWeb(ret.readable, stream, finish, { end });
389390
} else {
390391
throw new ERR_INVALID_ARG_TYPE(

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.