Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 313ecaa

Browse filesBrowse files
ronagcodebytere
authored andcommitted
stream: fix broken pipeline error propagation
If the destination was an async function any error thrown from that function would be swallowed. Backport-PR-URL: #31975 PR-URL: #31835 Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Denys Otrishko <shishugi@gmail.com>
1 parent 8ad64b8 commit 313ecaa
Copy full SHA for 313ecaa

File tree

Expand file treeCollapse file tree

3 files changed

+34
-13
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

3 files changed

+34
-13
lines changed
Open diff view settings
Collapse file

‎lib/internal/streams/pipeline.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/pipeline.js
+8-8Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -163,9 +163,10 @@ function pipeline(...streams) {
163163
}
164164

165165
let error;
166+
let value;
166167
const destroys = [];
167168

168-
function finish(err, val, final) {
169+
function finish(err, final) {
169170
if (!error && err) {
170171
error = err;
171172
}
@@ -177,13 +178,13 @@ function pipeline(...streams) {
177178
}
178179

179180
if (final) {
180-
callback(error, val);
181+
callback(error, value);
181182
}
182183
}
183184

184185
function wrap(stream, reading, writing, final) {
185186
destroys.push(destroyer(stream, reading, writing, (err) => {
186-
finish(err, null, final);
187+
finish(err, final);
187188
}));
188189
}
189190

@@ -229,11 +230,10 @@ function pipeline(...streams) {
229230
if (isPromise(ret)) {
230231
ret
231232
.then((val) => {
233+
value = val;
232234
pt.end(val);
233-
finish(null, val, true);
234-
})
235-
.catch((err) => {
236-
finish(err, null, true);
235+
}, (err) => {
236+
pt.destroy(err);
237237
});
238238
} else if (isIterable(ret, true)) {
239239
pump(ret, pt, finish);
@@ -243,7 +243,7 @@ function pipeline(...streams) {
243243
}
244244

245245
ret = pt;
246-
wrap(ret, true, false, true);
246+
wrap(ret, false, true, true);
247247
}
248248
} else if (isStream(stream)) {
249249
if (isReadable(ret)) {
Collapse file
+25Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const {
5+
pipeline,
6+
PassThrough
7+
} = require('stream');
8+
const assert = require('assert');
9+
10+
process.on('uncaughtException', common.mustCall((err) => {
11+
assert.strictEqual(err.message, 'error');
12+
}));
13+
14+
// Ensure that pipeline that ends with Promise
15+
// still propagates error to uncaughtException.
16+
const s = new PassThrough();
17+
s.end('data');
18+
pipeline(s, async function(source) {
19+
for await (const chunk of source) {
20+
chunk;
21+
}
22+
}, common.mustCall((err) => {
23+
assert.ifError(err);
24+
throw new Error('error');
25+
}));
Collapse file

‎test/parallel/test-stream-pipeline.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-stream-pipeline.js
+1-5Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -613,11 +613,9 @@ const { promisify } = require('util');
613613
yield 'hello';
614614
yield 'world';
615615
}, async function*(source) {
616-
const ret = [];
617616
for await (const chunk of source) {
618-
ret.push(chunk.toUpperCase());
617+
yield chunk.toUpperCase();
619618
}
620-
yield ret;
621619
}, async function(source) {
622620
let ret = '';
623621
for await (const chunk of source) {
@@ -754,7 +752,6 @@ const { promisify } = require('util');
754752
}, common.mustCall((err) => {
755753
assert.strictEqual(err, undefined);
756754
assert.strictEqual(ret, 'asd');
757-
assert.strictEqual(s.destroyed, true);
758755
}));
759756
}
760757

@@ -775,7 +772,6 @@ const { promisify } = require('util');
775772
}, common.mustCall((err) => {
776773
assert.strictEqual(err, undefined);
777774
assert.strictEqual(ret, 'asd');
778-
assert.strictEqual(s.destroyed, true);
779775
}));
780776
}
781777

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.