Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit da11b95

Browse filesBrowse files
benjamingrronag
authored andcommitted
stream: add forEach method
Add a `forEach` method to readable streams to enable concurrent iteration and align with the iterator-helpers proposal. Co-Authored-By: Robert Nagy <ronagy@icloud.com> PR-URL: #41445 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Robert Nagy <ronagy@icloud.com>
1 parent 8429216 commit da11b95
Copy full SHA for da11b95

File tree

Expand file treeCollapse file tree

4 files changed

+182
-10
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

4 files changed

+182
-10
lines changed
Open diff view settings
Collapse file

‎doc/api/stream.md‎

Copy file name to clipboardExpand all lines: doc/api/stream.md
+61-2Lines changed: 61 additions & 2 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -1697,7 +1697,7 @@ added: v17.4.0
16971697
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
16981698
abort the `fn` call early.
16991699
* `options` {Object}
1700-
* `concurrency` {number} the maximal concurrent invocation of `fn` to call
1700+
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
17011701
on the stream at once. **Default:** `1`.
17021702
* `signal` {AbortSignal} allows destroying the stream if the signal is
17031703
aborted.
@@ -1741,7 +1741,7 @@ added: v17.4.0
17411741
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
17421742
abort the `fn` call early.
17431743
* `options` {Object}
1744-
* `concurrency` {number} the maximal concurrent invocation of `fn` to call
1744+
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
17451745
on the stream at once. **Default:** `1`.
17461746
* `signal` {AbortSignal} allows destroying the stream if the signal is
17471747
aborted.
@@ -1776,6 +1776,65 @@ for await (const result of dnsResults) {
17761776
}
17771777
```
17781778

1779+
### `readable.forEach(fn[, options])`
1780+
1781+
<!-- YAML
1782+
added: REPLACEME
1783+
-->
1784+
1785+
> Stability: 1 - Experimental
1786+
1787+
* `fn` {Function|AsyncFunction} a function to call on each item of the stream.
1788+
* `data` {any} a chunk of data from the stream.
1789+
* `options` {Object}
1790+
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
1791+
abort the `fn` call early.
1792+
* `options` {Object}
1793+
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
1794+
on the stream at once. **Default:** `1`.
1795+
* `signal` {AbortSignal} allows destroying the stream if the signal is
1796+
aborted.
1797+
* Returns: {Promise} a promise for when the stream has finished.
1798+
1799+
This method allows iterating a stream. For each item in the stream the
1800+
`fn` function will be called. If the `fn` function returns a promise - that
1801+
promise will be `await`ed.
1802+
1803+
This method is different from `for await...of` loops in that it can optionally
1804+
process items concurrently. In addition, a `forEach` iteration can only be
1805+
stopped by having passed a `signal` option and aborting the related
1806+
`AbortController` while `for await...of` can be stopped with `break` or
1807+
`return`. In either case the stream will be destroyed.
1808+
1809+
This method is different from listening to the [`'data'`][] event in that it
1810+
uses the [`readable`][] event in the underlying machinary and can limit the
1811+
number of concurrent `fn` calls.
1812+
1813+
```mjs
1814+
import { Readable } from 'stream';
1815+
import { Resolver } from 'dns/promises';
1816+
1817+
// With a synchronous predicate.
1818+
for await (const item of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
1819+
console.log(item); // 3, 4
1820+
}
1821+
// With an asynchronous predicate, making at most 2 queries at a time.
1822+
const resolver = new Resolver();
1823+
const dnsResults = await Readable.from([
1824+
'nodejs.org',
1825+
'openjsf.org',
1826+
'www.linuxfoundation.org',
1827+
]).map(async (domain) => {
1828+
const { address } = await resolver.resolve4(domain, { ttl: true });
1829+
return address;
1830+
}, { concurrency: 2 });
1831+
await dnsResults.forEach((result) => {
1832+
// Logs result, similar to `for await (const result of dnsResults)`
1833+
console.log(result);
1834+
});
1835+
console.log('done'); // Stream has finished
1836+
```
1837+
17791838
### Duplex and transform streams
17801839

17811840
#### Class: `stream.Duplex`
Collapse file

‎lib/internal/streams/operators.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/operators.js
+23-5Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ const kEof = Symbol('kEof');
2323
async function * map(fn, options) {
2424
if (typeof fn !== 'function') {
2525
throw new ERR_INVALID_ARG_TYPE(
26-
'fn', ['Function', 'AsyncFunction'], this);
26+
'fn', ['Function', 'AsyncFunction'], fn);
2727
}
2828

2929
if (options != null && typeof options !== 'object') {
@@ -147,10 +147,23 @@ async function * map(fn, options) {
147147
}
148148
}
149149

150+
async function forEach(fn, options) {
151+
if (typeof fn !== 'function') {
152+
throw new ERR_INVALID_ARG_TYPE(
153+
'fn', ['Function', 'AsyncFunction'], fn);
154+
}
155+
async function forEachFn(value, options) {
156+
await fn(value, options);
157+
return kEmpty;
158+
}
159+
// eslint-disable-next-line no-unused-vars
160+
for await (const unused of this.map(forEachFn, options));
161+
}
162+
150163
async function * filter(fn, options) {
151164
if (typeof fn !== 'function') {
152-
throw (new ERR_INVALID_ARG_TYPE(
153-
'fn', ['Function', 'AsyncFunction'], this));
165+
throw new ERR_INVALID_ARG_TYPE(
166+
'fn', ['Function', 'AsyncFunction'], fn);
154167
}
155168
async function filterFn(value, options) {
156169
if (await fn(value, options)) {
@@ -160,7 +173,12 @@ async function * filter(fn, options) {
160173
}
161174
yield* this.map(filterFn, options);
162175
}
163-
module.exports = {
176+
177+
module.exports.streamReturningOperators = {
178+
filter,
164179
map,
165-
filter
180+
};
181+
182+
module.exports.promiseReturningOperators = {
183+
forEach,
166184
};
Collapse file

‎lib/stream.js‎

Copy file name to clipboardExpand all lines: lib/stream.js
+12-3Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ const {
3131
promisify: { custom: customPromisify },
3232
} = require('internal/util');
3333

34-
const operators = require('internal/streams/operators');
34+
const {
35+
streamReturningOperators,
36+
promiseReturningOperators,
37+
} = require('internal/streams/operators');
3538
const compose = require('internal/streams/compose');
3639
const { pipeline } = require('internal/streams/pipeline');
3740
const { destroyer } = require('internal/streams/destroy');
@@ -46,12 +49,18 @@ Stream.isDisturbed = utils.isDisturbed;
4649
Stream.isErrored = utils.isErrored;
4750
Stream.isReadable = utils.isReadable;
4851
Stream.Readable = require('internal/streams/readable');
49-
for (const key of ObjectKeys(operators)) {
50-
const op = operators[key];
52+
for (const key of ObjectKeys(streamReturningOperators)) {
53+
const op = streamReturningOperators[key];
5154
Stream.Readable.prototype[key] = function(...args) {
5255
return Stream.Readable.from(ReflectApply(op, this, args));
5356
};
5457
}
58+
for (const key of ObjectKeys(promiseReturningOperators)) {
59+
const op = promiseReturningOperators[key];
60+
Stream.Readable.prototype[key] = function(...args) {
61+
return ReflectApply(op, this, args);
62+
};
63+
}
5564
Stream.Writable = require('internal/streams/writable');
5665
Stream.Duplex = require('internal/streams/duplex');
5766
Stream.Transform = require('internal/streams/transform');
Collapse file
+86Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const {
5+
Readable,
6+
} = require('stream');
7+
const assert = require('assert');
8+
const { setTimeout } = require('timers/promises');
9+
10+
{
11+
// forEach works on synchronous streams with a synchronous predicate
12+
const stream = Readable.from([1, 2, 3]);
13+
const result = [1, 2, 3];
14+
(async () => {
15+
await stream.forEach((value) => assert.strictEqual(value, result.shift()));
16+
})().then(common.mustCall());
17+
}
18+
19+
{
20+
// forEach works an asynchronous streams
21+
const stream = Readable.from([1, 2, 3]).filter(async (x) => {
22+
await Promise.resolve();
23+
return true;
24+
});
25+
const result = [1, 2, 3];
26+
(async () => {
27+
await stream.forEach((value) => assert.strictEqual(value, result.shift()));
28+
})().then(common.mustCall());
29+
}
30+
31+
{
32+
// forEach works on asynchronous streams with a asynchronous forEach fn
33+
const stream = Readable.from([1, 2, 3]).filter(async (x) => {
34+
await Promise.resolve();
35+
return true;
36+
});
37+
const result = [1, 2, 3];
38+
(async () => {
39+
await stream.forEach(async (value) => {
40+
await Promise.resolve();
41+
assert.strictEqual(value, result.shift());
42+
});
43+
})().then(common.mustCall());
44+
}
45+
46+
{
47+
// Concurrency + AbortSignal
48+
const ac = new AbortController();
49+
let calls = 0;
50+
const forEachPromise =
51+
Readable.from([1, 2, 3, 4]).forEach(async (_, { signal }) => {
52+
calls++;
53+
await setTimeout(100, { signal });
54+
}, { signal: ac.signal, concurrency: 2 });
55+
// pump
56+
assert.rejects(async () => {
57+
await forEachPromise;
58+
}, {
59+
name: 'AbortError',
60+
}).then(common.mustCall());
61+
62+
setImmediate(() => {
63+
ac.abort();
64+
assert.strictEqual(calls, 2);
65+
});
66+
}
67+
68+
{
69+
// Error cases
70+
assert.rejects(async () => {
71+
await Readable.from([1]).forEach(1);
72+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
73+
assert.rejects(async () => {
74+
await Readable.from([1]).forEach((x) => x, {
75+
concurrency: 'Foo'
76+
});
77+
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
78+
assert.rejects(async () => {
79+
await Readable.from([1]).forEach((x) => x, 1);
80+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
81+
}
82+
{
83+
// Test result is a Promise
84+
const stream = Readable.from([1, 2, 3, 4, 5]).forEach((_) => true);
85+
assert.strictEqual(typeof stream.then, 'function');
86+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.