Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 36d3b12

Browse filesBrowse files
Mesteerytargos
authored andcommitted
stream: support array of streams in promises pipeline
Fixes: #40191 PR-URL: #40193 Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent 1743306 commit 36d3b12
Copy full SHA for 36d3b12

File tree

Expand file treeCollapse file tree

2 files changed

+45
-7
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

2 files changed

+45
-7
lines changed
Open diff view settings
Collapse file

‎lib/internal/streams/pipeline.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/pipeline.js
+4-7Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -166,17 +166,14 @@ async function pump(iterable, writable, finish) {
166166
}
167167

168168
function pipeline(...streams) {
169-
const callback = once(popCallback(streams));
169+
return pipelineImpl(streams, once(popCallback(streams)));
170+
}
170171

171-
// stream.pipeline(streams, callback)
172-
if (ArrayIsArray(streams[0]) && streams.length === 1) {
172+
function pipelineImpl(streams, callback, opts) {
173+
if (streams.length === 1 && ArrayIsArray(streams[0])) {
173174
streams = streams[0];
174175
}
175176

176-
return pipelineImpl(streams, callback);
177-
}
178-
179-
function pipelineImpl(streams, callback, opts) {
180177
if (streams.length < 2) {
181178
throw new ERR_MISSING_ARGS('streams');
182179
}
Collapse file

‎test/parallel/test-stream-pipeline.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-stream-pipeline.js
+41Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1406,3 +1406,44 @@ const tsp = require('timers/promises');
14061406
}));
14071407
ac.abort();
14081408
}
1409+
1410+
{
1411+
async function run() {
1412+
let finished = false;
1413+
let text = '';
1414+
const write = new Writable({
1415+
write(data, enc, cb) {
1416+
text += data;
1417+
cb();
1418+
}
1419+
});
1420+
write.on('finish', () => {
1421+
finished = true;
1422+
});
1423+
1424+
await pipelinep([Readable.from('Hello World!'), write]);
1425+
assert(finished);
1426+
assert.strictEqual(text, 'Hello World!');
1427+
}
1428+
1429+
run();
1430+
}
1431+
1432+
{
1433+
let finished = false;
1434+
let text = '';
1435+
const write = new Writable({
1436+
write(data, enc, cb) {
1437+
text += data;
1438+
cb();
1439+
}
1440+
});
1441+
write.on('finish', () => {
1442+
finished = true;
1443+
});
1444+
1445+
pipeline([Readable.from('Hello World!'), write], common.mustSucceed(() => {
1446+
assert(finished);
1447+
assert.strictEqual(text, 'Hello World!');
1448+
}));
1449+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.