Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit aaddf97

Browse filesBrowse files
mcollinatargos
authored andcommitted
stream: async iteration should work with destroyed stream
Fixes #23730. PR-URL: #23785 Reviewed-By: Gus Caplan <me@gus.host> Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: Matheus Marchini <mat@mmarchini.me>
1 parent 35c3c4b commit aaddf97
Copy full SHA for aaddf97

File tree

Expand file treeCollapse file tree

2 files changed

+82
-27
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

2 files changed

+82
-27
lines changed
Open diff view settings
Collapse file

‎lib/internal/streams/async_iterator.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/async_iterator.js
+43-26Lines changed: 43 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
'use strict';
22

3+
const finished = require('internal/streams/end-of-stream');
4+
35
const kLastResolve = Symbol('lastResolve');
46
const kLastReject = Symbol('lastReject');
57
const kError = Symbol('error');
@@ -34,30 +36,6 @@ function onReadable(iter) {
3436
process.nextTick(readAndResolve, iter);
3537
}
3638

37-
function onEnd(iter) {
38-
const resolve = iter[kLastResolve];
39-
if (resolve !== null) {
40-
iter[kLastPromise] = null;
41-
iter[kLastResolve] = null;
42-
iter[kLastReject] = null;
43-
resolve(createIterResult(null, true));
44-
}
45-
iter[kEnded] = true;
46-
}
47-
48-
function onError(iter, err) {
49-
const reject = iter[kLastReject];
50-
// reject if we are waiting for data in the Promise
51-
// returned by next() and store the error
52-
if (reject !== null) {
53-
iter[kLastPromise] = null;
54-
iter[kLastResolve] = null;
55-
iter[kLastReject] = null;
56-
reject(err);
57-
}
58-
iter[kError] = err;
59-
}
60-
6139
function wrapForNext(lastPromise, iter) {
6240
return function(resolve, reject) {
6341
lastPromise.then(function() {
@@ -86,6 +64,22 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
8664
return Promise.resolve(createIterResult(null, true));
8765
}
8866

67+
if (this[kStream].destroyed) {
68+
// We need to defer via nextTick because if .destroy(err) is
69+
// called, the error will be emitted via nextTick, and
70+
// we cannot guarantee that there is no error lingering around
71+
// waiting to be emitted.
72+
return new Promise((resolve, reject) => {
73+
process.nextTick(() => {
74+
if (this[kError]) {
75+
reject(this[kError]);
76+
} else {
77+
resolve(createIterResult(null, true));
78+
}
79+
});
80+
});
81+
}
82+
8983
// if we have multiple next() calls
9084
// we will wait for the previous Promise to finish
9185
// this logic is optimized to support for await loops,
@@ -155,9 +149,32 @@ const createReadableStreamAsyncIterator = (stream) => {
155149
},
156150
});
157151

152+
finished(stream, (err) => {
153+
if (err) {
154+
const reject = iterator[kLastReject];
155+
// reject if we are waiting for data in the Promise
156+
// returned by next() and store the error
157+
if (reject !== null) {
158+
iterator[kLastPromise] = null;
159+
iterator[kLastResolve] = null;
160+
iterator[kLastReject] = null;
161+
reject(err);
162+
}
163+
iterator[kError] = err;
164+
return;
165+
}
166+
167+
const resolve = iterator[kLastResolve];
168+
if (resolve !== null) {
169+
iterator[kLastPromise] = null;
170+
iterator[kLastResolve] = null;
171+
iterator[kLastReject] = null;
172+
resolve(createIterResult(null, true));
173+
}
174+
iterator[kEnded] = true;
175+
});
176+
158177
stream.on('readable', onReadable.bind(null, iterator));
159-
stream.on('end', onEnd.bind(null, iterator));
160-
stream.on('error', onError.bind(null, iterator));
161178

162179
return iterator;
163180
};
Collapse file

‎test/parallel/test-stream-readable-async-iterators.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-stream-readable-async-iterators.js
+39-1Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use strict';
22

33
const common = require('../common');
4-
const { Readable } = require('stream');
4+
const { Readable, PassThrough, pipeline } = require('stream');
55
const assert = require('assert');
66

77
async function tests() {
@@ -324,6 +324,44 @@ async function tests() {
324324

325325
assert.strictEqual(data, expected);
326326
})();
327+
328+
await (async function() {
329+
console.log('.next() on destroyed stream');
330+
const readable = new Readable({
331+
read() {
332+
// no-op
333+
}
334+
});
335+
336+
readable.destroy();
337+
338+
try {
339+
await readable[Symbol.asyncIterator]().next();
340+
} catch (e) {
341+
assert.strictEqual(e.code, 'ERR_STREAM_PREMATURE_CLOSE');
342+
}
343+
})();
344+
345+
await (async function() {
346+
console.log('.next() on pipelined stream');
347+
const readable = new Readable({
348+
read() {
349+
// no-op
350+
}
351+
});
352+
353+
const passthrough = new PassThrough();
354+
const err = new Error('kaboom');
355+
pipeline(readable, passthrough, common.mustCall((e) => {
356+
assert.strictEqual(e, err);
357+
}));
358+
readable.destroy(err);
359+
try {
360+
await readable[Symbol.asyncIterator]().next();
361+
} catch (e) {
362+
assert.strictEqual(e, err);
363+
}
364+
})();
327365
}
328366

329367
// to avoid missing some tests if a promise does not resolve

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.