Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 9054d25

Browse filesBrowse files
Linkgorondanielleadams
authored andcommitted
stream: add a non-destroying iterator to Readable
add a non-destroying iterator to Readable fixes: #38491 PR-URL: #38526 Fixes: #38491 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
1 parent e2f28c8 commit 9054d25
Copy full SHA for 9054d25

File tree

Expand file treeCollapse file tree

3 files changed

+204
-10
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

3 files changed

+204
-10
lines changed
Open diff view settings
Collapse file

‎doc/api/stream.md‎

Copy file name to clipboardExpand all lines: doc/api/stream.md
+58-2Lines changed: 58 additions & 2 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -1506,13 +1506,69 @@ async function print(readable) {
15061506
print(fs.createReadStream('file')).catch(console.error);
15071507
```
15081508

1509-
If the loop terminates with a `break` or a `throw`, the stream will be
1510-
destroyed. In other terms, iterating over a stream will consume the stream
1509+
If the loop terminates with a `break`, `return`, or a `throw`, the stream will
1510+
be destroyed. In other terms, iterating over a stream will consume the stream
15111511
fully. The stream will be read in chunks of size equal to the `highWaterMark`
15121512
option. In the code example above, data will be in a single chunk if the file
15131513
has less then 64KB of data because no `highWaterMark` option is provided to
15141514
[`fs.createReadStream()`][].
15151515

1516+
##### `readable.iterator([options])`
1517+
<!-- YAML
1518+
added: REPLACEME
1519+
-->
1520+
1521+
> Stability: 1 - Experimental
1522+
1523+
* `options` {Object}
1524+
* `destroyOnReturn` {boolean} When set to `false`, calling `return` on the
1525+
async iterator, or exiting a `for await...of` iteration using a `break`,
1526+
`return`, or `throw` will not destroy the stream. **Default:** `true`.
1527+
* `destroyOnError` {boolean} When set to `false`, if the stream emits an
1528+
error while it's being iterated, the iterator will not destroy the stream.
1529+
**Default:** `true`.
1530+
* Returns: {AsyncIterator} to consume the stream.
1531+
1532+
The iterator created by this method gives users the option to cancel the
1533+
destruction of the stream if the `for await...of` loop is exited by `return`,
1534+
`break`, or `throw`, or if the iterator should destroy the stream if the stream
1535+
emitted an error during iteration.
1536+
1537+
```js
1538+
const { Readable } = require('stream');
1539+
1540+
async function printIterator(readable) {
1541+
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
1542+
console.log(chunk); // 1
1543+
break;
1544+
}
1545+
1546+
console.log(readable.destroyed); // false
1547+
1548+
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
1549+
console.log(chunk); // Will print 2 and then 3
1550+
}
1551+
1552+
console.log(readable.destroyed); // True, stream was totally consumed
1553+
}
1554+
1555+
async function printSymbolAsyncIterator(readable) {
1556+
for await (const chunk of readable) {
1557+
console.log(chunk); // 1
1558+
break;
1559+
}
1560+
1561+
console.log(readable.destroyed); // true
1562+
}
1563+
1564+
async function showBoth() {
1565+
await printIterator(Readable.from([1, 2, 3]));
1566+
await printSymbolAsyncIterator(Readable.from([1, 2, 3]));
1567+
}
1568+
1569+
showBoth();
1570+
```
1571+
15161572
### Duplex and transform streams
15171573

15181574
#### Class: `stream.Duplex`
Collapse file

‎lib/internal/streams/readable.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/readable.js
+30-8Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ const {
6262
ERR_METHOD_NOT_IMPLEMENTED,
6363
ERR_STREAM_UNSHIFT_AFTER_END_EVENT
6464
} = require('internal/errors').codes;
65+
const { validateObject } = require('internal/validators');
6566

6667
const kPaused = Symbol('kPaused');
6768

@@ -1062,8 +1063,17 @@ Readable.prototype.wrap = function(stream) {
10621063
};
10631064

10641065
Readable.prototype[SymbolAsyncIterator] = function() {
1065-
let stream = this;
1066+
return streamToAsyncIterator(this);
1067+
};
10661068

1069+
Readable.prototype.iterator = function(options) {
1070+
if (options !== undefined) {
1071+
validateObject(options, 'options');
1072+
}
1073+
return streamToAsyncIterator(this, options);
1074+
};
1075+
1076+
function streamToAsyncIterator(stream, options) {
10671077
if (typeof stream.read !== 'function') {
10681078
// v1 stream
10691079
const src = stream;
@@ -1076,14 +1086,20 @@ Readable.prototype[SymbolAsyncIterator] = function() {
10761086
}).wrap(src);
10771087
}
10781088

1079-
const iter = createAsyncIterator(stream);
1089+
const iter = createAsyncIterator(stream, options);
10801090
iter.stream = stream;
10811091
return iter;
1082-
};
1092+
}
10831093

1084-
async function* createAsyncIterator(stream) {
1094+
async function* createAsyncIterator(stream, options) {
10851095
let callback = nop;
10861096

1097+
const opts = {
1098+
destroyOnReturn: true,
1099+
destroyOnError: true,
1100+
...options,
1101+
};
1102+
10871103
function next(resolve) {
10881104
if (this === stream) {
10891105
callback();
@@ -1116,6 +1132,7 @@ async function* createAsyncIterator(stream) {
11161132
next.call(this);
11171133
});
11181134

1135+
let errorThrown = false;
11191136
try {
11201137
while (true) {
11211138
const chunk = stream.destroyed ? null : stream.read();
@@ -1132,12 +1149,17 @@ async function* createAsyncIterator(stream) {
11321149
}
11331150
}
11341151
} catch (err) {
1135-
destroyImpl.destroyer(stream, err);
1152+
if (opts.destroyOnError) {
1153+
destroyImpl.destroyer(stream, err);
1154+
}
1155+
errorThrown = true;
11361156
throw err;
11371157
} finally {
1138-
if (state.autoDestroy || !endEmitted) {
1139-
// TODO(ronag): ERR_PREMATURE_CLOSE?
1140-
destroyImpl.destroyer(stream, null);
1158+
if (!errorThrown && opts.destroyOnReturn) {
1159+
if (state.autoDestroy || !endEmitted) {
1160+
// TODO(ronag): ERR_PREMATURE_CLOSE?
1161+
destroyImpl.destroyer(stream, null);
1162+
}
11411163
}
11421164
}
11431165
}
Collapse file

‎test/parallel/test-stream-readable-async-iterators.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-stream-readable-async-iterators.js
+116Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -693,6 +693,122 @@ async function tests() {
693693
});
694694
}
695695

696+
// AsyncIterator non-destroying iterator
697+
{
698+
function createReadable() {
699+
return Readable.from((async function* () {
700+
await Promise.resolve();
701+
yield 5;
702+
await Promise.resolve();
703+
yield 7;
704+
await Promise.resolve();
705+
})());
706+
}
707+
708+
function createErrorReadable() {
709+
const opts = { read() { throw new Error('inner'); } };
710+
return new Readable(opts);
711+
}
712+
713+
// Check default destroys on return
714+
(async function() {
715+
const readable = createReadable();
716+
for await (const chunk of readable.iterator()) {
717+
assert.strictEqual(chunk, 5);
718+
break;
719+
}
720+
721+
assert.ok(readable.destroyed);
722+
})().then(common.mustCall());
723+
724+
// Check explicit destroying on return
725+
(async function() {
726+
const readable = createReadable();
727+
for await (const chunk of readable.iterator({ destroyOnReturn: true })) {
728+
assert.strictEqual(chunk, 5);
729+
break;
730+
}
731+
732+
assert.ok(readable.destroyed);
733+
})().then(common.mustCall());
734+
735+
// Check default destroys on error
736+
(async function() {
737+
const readable = createErrorReadable();
738+
try {
739+
// eslint-disable-next-line no-unused-vars
740+
for await (const chunk of readable) { }
741+
assert.fail('should have thrown');
742+
} catch (err) {
743+
assert.strictEqual(err.message, 'inner');
744+
}
745+
746+
assert.ok(readable.destroyed);
747+
})().then(common.mustCall());
748+
749+
// Check explicit destroys on error
750+
(async function() {
751+
const readable = createErrorReadable();
752+
const opts = { destroyOnError: true, destroyOnReturn: false };
753+
try {
754+
// eslint-disable-next-line no-unused-vars
755+
for await (const chunk of readable.iterator(opts)) { }
756+
assert.fail('should have thrown');
757+
} catch (err) {
758+
assert.strictEqual(err.message, 'inner');
759+
}
760+
761+
assert.ok(readable.destroyed);
762+
})().then(common.mustCall());
763+
764+
// Check explicit non-destroy with return true
765+
(async function() {
766+
const readable = createErrorReadable();
767+
const opts = { destroyOnError: false, destroyOnReturn: true };
768+
try {
769+
// eslint-disable-next-line no-unused-vars
770+
for await (const chunk of readable.iterator(opts)) { }
771+
assert.fail('should have thrown');
772+
} catch (err) {
773+
assert.strictEqual(err.message, 'inner');
774+
}
775+
776+
assert.ok(!readable.destroyed);
777+
})().then(common.mustCall());
778+
779+
// Check explicit non-destroy with return true
780+
(async function() {
781+
const readable = createReadable();
782+
const opts = { destroyOnReturn: false };
783+
for await (const chunk of readable.iterator(opts)) {
784+
assert.strictEqual(chunk, 5);
785+
break;
786+
}
787+
788+
assert.ok(!readable.destroyed);
789+
790+
for await (const chunk of readable.iterator(opts)) {
791+
assert.strictEqual(chunk, 7);
792+
}
793+
794+
assert.ok(readable.destroyed);
795+
})().then(common.mustCall());
796+
797+
// Check non-object options.
798+
{
799+
const readable = createReadable();
800+
assert.throws(
801+
() => readable.iterator(42),
802+
{
803+
code: 'ERR_INVALID_ARG_TYPE',
804+
name: 'TypeError',
805+
message: 'The "options" argument must be of type object. Received ' +
806+
'type number (42)',
807+
}
808+
);
809+
}
810+
}
811+
696812
{
697813
let _req;
698814
const server = http.createServer((request, response) => {

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.