Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 3527646

Browse filesBrowse files
mcollinaaduh95
authored andcommitted
stream: fix nested compose error propagation
PR-URL: #62556 Fixes: #60083 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent 3d18162 commit 3527646
Copy full SHA for 3527646

3 files changed

+48-9Lines changed: 48 additions & 9 deletions

File tree

Expand file treeCollapse file tree
Open diff view settings
Filter options
Expand file treeCollapse file tree
Open diff view settings
Collapse file

‎lib/internal/streams/duplexify.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/duplexify.js
+11-1Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,17 @@ function fromAsyncGen(fn) {
239239
_resolve({ __proto__: null, done: true, cb });
240240
},
241241
destroy(err, cb) {
242-
ac.abort();
242+
ac.abort(err);
243+
244+
// If the source async iterator is waiting for the next write/final
245+
// signal, unblock it so the readable side can observe the abort and
246+
// finish destroying.
247+
if (resolve !== null) {
248+
const _resolve = resolve;
249+
resolve = null;
250+
_resolve({ __proto__: null, done: true, cb() {} });
251+
}
252+
243253
cb(err);
244254
},
245255
};
Collapse file

‎lib/internal/streams/from.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/from.js
+19-8Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,12 @@ const {
88
const { Buffer } = require('buffer');
99

1010
const {
11-
ERR_INVALID_ARG_TYPE,
12-
ERR_STREAM_NULL_VALUES,
13-
} = require('internal/errors').codes;
11+
aggregateTwoErrors,
12+
codes: {
13+
ERR_INVALID_ARG_TYPE,
14+
ERR_STREAM_NULL_VALUES,
15+
},
16+
} = require('internal/errors');
1417

1518
function from(Readable, iterable, opts) {
1619
let iterator;
@@ -43,6 +46,7 @@ function from(Readable, iterable, opts) {
4346
// TODO(ronag): What options should be allowed?
4447
...opts,
4548
});
49+
const originalDestroy = readable._destroy;
4650

4751
// Flag to protect against _read
4852
// being called before last iteration completion.
@@ -64,11 +68,18 @@ function from(Readable, iterable, opts) {
6468
};
6569

6670
readable._destroy = function(error, cb) {
67-
PromisePrototypeThen(
68-
close(error),
69-
() => process.nextTick(cb, error), // nextTick is here in case cb throws
70-
(e) => process.nextTick(cb, e || error),
71-
);
71+
originalDestroy.call(this, error, (destroyError) => {
72+
const combinedError = destroyError || error;
73+
PromisePrototypeThen(
74+
close(combinedError),
75+
// nextTick is here in case cb throws
76+
() => process.nextTick(cb, combinedError),
77+
(closeError) => process.nextTick(
78+
cb,
79+
aggregateTwoErrors(combinedError, closeError),
80+
),
81+
);
82+
});
7283
};
7384

7485
async function close(error) {
Collapse file

‎test/parallel/test-stream-readable-compose.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-stream-readable-compose.js
+18Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,24 @@ const assert = require('assert');
116116
).then(common.mustCall());
117117
}
118118

119+
{
120+
// Errors from nested `.compose()` calls should propagate instead of hanging.
121+
const stream = Readable.from(['hello'])
122+
.compose(async function *(source) { // eslint-disable-line require-yield
123+
for await (const chunk of source) {
124+
throw new Error(`boom: ${chunk}`);
125+
}
126+
})
127+
.compose(async function *(source) {
128+
yield* source;
129+
});
130+
131+
assert.rejects(
132+
stream.toArray(),
133+
/boom: hello/,
134+
).then(common.mustCall());
135+
}
136+
119137
{
120138
// AbortSignal
121139
const ac = new AbortController();

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.