Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 650c9bd

Browse filesBrowse files
ronagtargos
authored andcommitted
stream: pipeline with end option
Currently pipeline cannot fully replace pipe due to the missing end option. This PR adds the end option to the promisified pipeline method. PR-URL: #40886 Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent 534409d commit 650c9bd
Copy full SHA for 650c9bd

File tree

Expand file treeCollapse file tree

3 files changed

+50
-17
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

3 files changed

+50
-17
lines changed
Open diff view settings
Collapse file

‎lib/internal/streams/pipeline.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/pipeline.js
+25-15Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ async function* fromReadable(val) {
109109
yield* Readable.prototype[SymbolAsyncIterator].call(val);
110110
}
111111

112-
async function pump(iterable, writable, finish) {
112+
async function pump(iterable, writable, finish, opts) {
113113
let error;
114114
let onresolve = null;
115115

@@ -153,7 +153,9 @@ async function pump(iterable, writable, finish) {
153153
}
154154
}
155155

156-
writable.end();
156+
if (opts?.end !== false) {
157+
writable.end();
158+
}
157159

158160
await wait();
159161

@@ -227,17 +229,22 @@ function pipelineImpl(streams, callback, opts) {
227229
const stream = streams[i];
228230
const reading = i < streams.length - 1;
229231
const writing = i > 0;
232+
const end = reading || opts?.end !== false;
230233

231234
if (isNodeStream(stream)) {
232-
finishCount++;
233-
destroys.push(destroyer(stream, reading, writing, (err) => {
234-
if (!err && !reading && isReadableFinished(stream, false)) {
235-
stream.read(0);
236-
destroyer(stream, true, writing, finish);
237-
} else {
238-
finish(err);
239-
}
240-
}));
235+
if (end) {
236+
finishCount++;
237+
destroys.push(destroyer(stream, reading, writing, (err) => {
238+
if (!err && !reading && isReadableFinished(stream, false)) {
239+
stream.read(0);
240+
destroyer(stream, true, writing, finish);
241+
} else {
242+
finish(err);
243+
}
244+
}));
245+
} else {
246+
stream.on('error', finish);
247+
}
241248
}
242249

243250
if (i === 0) {
@@ -282,14 +289,17 @@ function pipelineImpl(streams, callback, opts) {
282289
then.call(ret,
283290
(val) => {
284291
value = val;
285-
pt.end(val);
292+
pt.write(val);
293+
if (end) {
294+
pt.end();
295+
}
286296
}, (err) => {
287297
pt.destroy(err);
288298
},
289299
);
290300
} else if (isIterable(ret, true)) {
291301
finishCount++;
292-
pump(ret, pt, finish);
302+
pump(ret, pt, finish, { end });
293303
} else {
294304
throw new ERR_INVALID_RETURN_VALUE(
295305
'AsyncIterable or Promise', 'destination', ret);
@@ -302,7 +312,7 @@ function pipelineImpl(streams, callback, opts) {
302312
}
303313
} else if (isNodeStream(stream)) {
304314
if (isReadableNodeStream(ret)) {
305-
ret.pipe(stream);
315+
ret.pipe(stream, { end });
306316

307317
// Compat. Before node v10.12.0 stdio used to throw an error so
308318
// pipe() did/does not end() stdio destinations.
@@ -314,7 +324,7 @@ function pipelineImpl(streams, callback, opts) {
314324
ret = makeAsyncIterable(ret);
315325

316326
finishCount++;
317-
pump(ret, stream, finish);
327+
pump(ret, stream, finish, { end });
318328
}
319329
ret = stream;
320330
} else {
Collapse file

‎lib/stream/promises.js‎

Copy file name to clipboardExpand all lines: lib/stream/promises.js
+3-1Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@ const eos = require('internal/streams/end-of-stream');
1616
function pipeline(...streams) {
1717
return new Promise((resolve, reject) => {
1818
let signal;
19+
let end;
1920
const lastArg = streams[streams.length - 1];
2021
if (lastArg && typeof lastArg === 'object' &&
2122
!isNodeStream(lastArg) && !isIterable(lastArg)) {
2223
const options = ArrayPrototypePop(streams);
2324
signal = options.signal;
25+
end = options.end;
2426
}
2527

2628
pl(streams, (err, value) => {
@@ -29,7 +31,7 @@ function pipeline(...streams) {
2931
} else {
3032
resolve(value);
3133
}
32-
}, { signal });
34+
}, { signal, end });
3335
});
3436
}
3537

Collapse file

‎test/parallel/test-stream-pipeline.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-stream-pipeline.js
+22-1Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1465,5 +1465,26 @@ const tsp = require('timers/promises');
14651465
assert.strictEqual(duplex.destroyed, true);
14661466
}
14671467

1468-
run();
1468+
run().then(common.mustCall());
1469+
}
1470+
1471+
{
1472+
const pipelinePromise = promisify(pipeline);
1473+
1474+
async function run() {
1475+
const read = new Readable({
1476+
read() {}
1477+
});
1478+
1479+
const duplex = new PassThrough();
1480+
1481+
read.push(null);
1482+
1483+
await pipelinePromise(read, duplex, { end: false });
1484+
1485+
assert.strictEqual(duplex.destroyed, false);
1486+
assert.strictEqual(duplex.writableEnded, false);
1487+
}
1488+
1489+
run().then(common.mustCall());
14691490
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.