Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 1ae6485

Browse filesBrowse files
Linkgoronruyadorno
authored andcommitted
stream: add iterator helper find
Continue iterator-helpers work by adding `find` to readable streams. PR-URL: #41849 Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent d641eb7 commit 1ae6485
Copy full SHA for 1ae6485

File tree

Expand file treeCollapse file tree

7 files changed

+282
-153
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

7 files changed

+282
-153
lines changed
Open diff view settings
Collapse file

‎doc/api/stream.md‎

Copy file name to clipboardExpand all lines: doc/api/stream.md
+70-19Lines changed: 70 additions & 19 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -1691,7 +1691,8 @@ added: v17.4.0
16911691

16921692
> Stability: 1 - Experimental
16931693
1694-
* `fn` {Function|AsyncFunction} a function to map over every item in the stream.
1694+
* `fn` {Function|AsyncFunction} a function to map over every chunk in the
1695+
stream.
16951696
* `data` {any} a chunk of data from the stream.
16961697
* `options` {Object}
16971698
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -1704,16 +1705,16 @@ added: v17.4.0
17041705
* Returns: {Readable} a stream mapped with the function `fn`.
17051706

17061707
This method allows mapping over the stream. The `fn` function will be called
1707-
for every item in the stream. If the `fn` function returns a promise - that
1708+
for every chunk in the stream. If the `fn` function returns a promise - that
17081709
promise will be `await`ed before being passed to the result stream.
17091710

17101711
```mjs
17111712
import { Readable } from 'stream';
17121713
import { Resolver } from 'dns/promises';
17131714

17141715
// With a synchronous mapper.
1715-
for await (const item of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) {
1716-
console.log(item); // 2, 4, 6, 8
1716+
for await (const chunk of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) {
1717+
console.log(chunk); // 2, 4, 6, 8
17171718
}
17181719
// With an asynchronous mapper, making at most 2 queries at a time.
17191720
const resolver = new Resolver();
@@ -1735,7 +1736,7 @@ added: v17.4.0
17351736

17361737
> Stability: 1 - Experimental
17371738
1738-
* `fn` {Function|AsyncFunction} a function to filter items from stream.
1739+
* `fn` {Function|AsyncFunction} a function to filter chunks from the stream.
17391740
* `data` {any} a chunk of data from the stream.
17401741
* `options` {Object}
17411742
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -1747,8 +1748,8 @@ added: v17.4.0
17471748
aborted.
17481749
* Returns: {Readable} a stream filtered with the predicate `fn`.
17491750

1750-
This method allows filtering the stream. For each item in the stream the `fn`
1751-
function will be called and if it returns a truthy value, the item will be
1751+
This method allows filtering the stream. For each chunk in the stream the `fn`
1752+
function will be called and if it returns a truthy value, the chunk will be
17521753
passed to the result stream. If the `fn` function returns a promise - that
17531754
promise will be `await`ed.
17541755

@@ -1757,8 +1758,8 @@ import { Readable } from 'stream';
17571758
import { Resolver } from 'dns/promises';
17581759

17591760
// With a synchronous predicate.
1760-
for await (const item of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
1761-
console.log(item); // 3, 4
1761+
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
1762+
console.log(chunk); // 3, 4
17621763
}
17631764
// With an asynchronous predicate, making at most 2 queries at a time.
17641765
const resolver = new Resolver();
@@ -1784,7 +1785,7 @@ added: REPLACEME
17841785

17851786
> Stability: 1 - Experimental
17861787
1787-
* `fn` {Function|AsyncFunction} a function to call on each item of the stream.
1788+
* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream.
17881789
* `data` {any} a chunk of data from the stream.
17891790
* `options` {Object}
17901791
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -1796,12 +1797,12 @@ added: REPLACEME
17961797
aborted.
17971798
* Returns: {Promise} a promise for when the stream has finished.
17981799

1799-
This method allows iterating a stream. For each item in the stream the
1800+
This method allows iterating a stream. For each chunk in the stream the
18001801
`fn` function will be called. If the `fn` function returns a promise - that
18011802
promise will be `await`ed.
18021803

18031804
This method is different from `for await...of` loops in that it can optionally
1804-
process items concurrently. In addition, a `forEach` iteration can only be
1805+
process chunks concurrently. In addition, a `forEach` iteration can only be
18051806
stopped by having passed a `signal` option and aborting the related
18061807
`AbortController` while `for await...of` can be stopped with `break` or
18071808
`return`. In either case the stream will be destroyed.
@@ -1815,8 +1816,8 @@ import { Readable } from 'stream';
18151816
import { Resolver } from 'dns/promises';
18161817

18171818
// With a synchronous predicate.
1818-
for await (const item of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
1819-
console.log(item); // 3, 4
1819+
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
1820+
console.log(chunk); // 3, 4
18201821
}
18211822
// With an asynchronous predicate, making at most 2 queries at a time.
18221823
const resolver = new Resolver();
@@ -1881,7 +1882,7 @@ added: REPLACEME
18811882

18821883
> Stability: 1 - Experimental
18831884
1884-
* `fn` {Function|AsyncFunction} a function to call on each item of the stream.
1885+
* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream.
18851886
* `data` {any} a chunk of data from the stream.
18861887
* `options` {Object}
18871888
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -1922,6 +1923,56 @@ console.log(anyBigFile); // `true` if any file in the list is bigger than 1MB
19221923
console.log('done'); // Stream has finished
19231924
```
19241925

1926+
### `readable.find(fn[, options])`
1927+
1928+
<!-- YAML
1929+
added: REPLACEME
1930+
-->
1931+
1932+
> Stability: 1 - Experimental
1933+
1934+
* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream.
1935+
* `data` {any} a chunk of data from the stream.
1936+
* `options` {Object}
1937+
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
1938+
abort the `fn` call early.
1939+
* `options` {Object}
1940+
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
1941+
on the stream at once. **Default:** `1`.
1942+
* `signal` {AbortSignal} allows destroying the stream if the signal is
1943+
aborted.
1944+
* Returns: {Promise} a promise evaluating to the first chunk for which `fn`
1945+
evaluated with a truthy value, or `undefined` if no element was found.
1946+
1947+
This method is similar to `Array.prototype.find` and calls `fn` on each chunk
1948+
in the stream to find a chunk with a truthy value for `fn`. Once an `fn` call's
1949+
awaited return value is truthy, the stream is destroyed and the promise is
1950+
fulfilled with value for which `fn` returned a truthy value. If all of the
1951+
`fn` calls on the chunks return a falsy value, the promise is fulfilled with
1952+
`undefined`.
1953+
1954+
```mjs
1955+
import { Readable } from 'stream';
1956+
import { stat } from 'fs/promises';
1957+
1958+
// With a synchronous predicate.
1959+
await Readable.from([1, 2, 3, 4]).find((x) => x > 2); // 3
1960+
await Readable.from([1, 2, 3, 4]).find((x) => x > 0); // 1
1961+
await Readable.from([1, 2, 3, 4]).find((x) => x > 10); // undefined
1962+
1963+
// With an asynchronous predicate, making at most 2 file checks at a time.
1964+
const foundBigFile = await Readable.from([
1965+
'file1',
1966+
'file2',
1967+
'file3',
1968+
]).find(async (fileName) => {
1969+
const stats = await stat(fileName);
1970+
return stat.size > 1024 * 1024;
1971+
}, { concurrency: 2 });
1972+
console.log(foundBigFile); // File name of large file, if any file in the list is bigger than 1MB
1973+
console.log('done'); // Stream has finished
1974+
```
1975+
19251976
### `readable.every(fn[, options])`
19261977

19271978
<!-- YAML
@@ -1930,7 +1981,7 @@ added: REPLACEME
19301981

19311982
> Stability: 1 - Experimental
19321983
1933-
* `fn` {Function|AsyncFunction} a function to call on each item of the stream.
1984+
* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream.
19341985
* `data` {any} a chunk of data from the stream.
19351986
* `options` {Object}
19361987
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -1980,7 +2031,7 @@ added: REPLACEME
19802031
> Stability: 1 - Experimental
19812032
19822033
* `fn` {Function|AsyncGeneratorFunction|AsyncFunction} a function to map over
1983-
every item in the stream.
2034+
every chunk in the stream.
19842035
* `data` {any} a chunk of data from the stream.
19852036
* `options` {Object}
19862037
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -2004,8 +2055,8 @@ import { Readable } from 'stream';
20042055
import { createReadStream } from 'fs';
20052056

20062057
// With a synchronous mapper.
2007-
for await (const item of Readable.from([1, 2, 3, 4]).flatMap((x) => [x, x])) {
2008-
console.log(item); // 1, 1, 2, 2, 3, 3, 4, 4
2058+
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap((x) => [x, x])) {
2059+
console.log(chunk); // 1, 1, 2, 2, 3, 3, 4, 4
20092060
}
20102061
// With an asynchronous mapper, combine the contents of 4 files
20112062
const concatResult = Readable.from([
Collapse file

‎lib/internal/streams/operators.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/operators.js
+14-28Lines changed: 14 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -186,31 +186,9 @@ function asIndexedPairs(options = undefined) {
186186
}
187187

188188
async function some(fn, options) {
189-
if (options != null && typeof options !== 'object') {
190-
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
191-
}
192-
if (options?.signal != null) {
193-
validateAbortSignal(options.signal, 'options.signal');
194-
}
195-
196-
// https://tc39.es/proposal-iterator-helpers/#sec-iteratorprototype.some
197-
// Note that some does short circuit but also closes the iterator if it does
198-
const ac = new AbortController();
199-
if (options?.signal) {
200-
if (options.signal.aborted) {
201-
ac.abort();
202-
}
203-
options.signal.addEventListener('abort', () => ac.abort(), {
204-
[kWeakHandler]: this,
205-
once: true,
206-
});
207-
}
208-
const mapped = this.map(fn, { ...options, signal: ac.signal });
209-
for await (const result of mapped) {
210-
if (result) {
211-
ac.abort();
212-
return true;
213-
}
189+
// eslint-disable-next-line no-unused-vars
190+
for await (const unused of filter.call(this, fn, options)) {
191+
return true;
214192
}
215193
return false;
216194
}
@@ -226,6 +204,13 @@ async function every(fn, options) {
226204
}, options));
227205
}
228206

207+
async function find(fn, options) {
208+
for await (const result of filter.call(this, fn, options)) {
209+
return result;
210+
}
211+
return undefined;
212+
}
213+
229214
async function forEach(fn, options) {
230215
if (typeof fn !== 'function') {
231216
throw new ERR_INVALID_ARG_TYPE(
@@ -236,7 +221,7 @@ async function forEach(fn, options) {
236221
return kEmpty;
237222
}
238223
// eslint-disable-next-line no-unused-vars
239-
for await (const unused of this.map(forEachFn, options));
224+
for await (const unused of map.call(this, forEachFn, options));
240225
}
241226

242227
function filter(fn, options) {
@@ -250,7 +235,7 @@ function filter(fn, options) {
250235
}
251236
return kEmpty;
252237
}
253-
return this.map(filterFn, options);
238+
return map.call(this, filterFn, options);
254239
}
255240

256241
// Specific to provide better error to reduce since the argument is only
@@ -329,7 +314,7 @@ async function toArray(options) {
329314
}
330315

331316
function flatMap(fn, options) {
332-
const values = this.map(fn, options);
317+
const values = map.call(this, fn, options);
333318
return async function* flatMap() {
334319
for await (const val of values) {
335320
yield* val;
@@ -415,4 +400,5 @@ module.exports.promiseReturningOperators = {
415400
reduce,
416401
toArray,
417402
some,
403+
find,
418404
};
Collapse file

‎test/parallel/test-stream-filter.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-stream-filter.js
+8Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,3 +98,11 @@ const { setTimeout } = require('timers/promises');
9898
const stream = Readable.from([1, 2, 3, 4, 5]).filter((x) => true);
9999
assert.strictEqual(stream.readable, true);
100100
}
101+
{
102+
const stream = Readable.from([1, 2, 3, 4, 5]);
103+
Object.defineProperty(stream, 'map', {
104+
value: common.mustNotCall(() => {}),
105+
});
106+
// Check that map isn't getting called.
107+
stream.filter(() => true);
108+
}
Collapse file

‎test/parallel/test-stream-flatMap.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-stream-flatMap.js
+8Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,3 +121,11 @@ function oneTo5() {
121121
const stream = oneTo5().flatMap((x) => x);
122122
assert.strictEqual(stream.readable, true);
123123
}
124+
{
125+
const stream = oneTo5();
126+
Object.defineProperty(stream, 'map', {
127+
value: common.mustNotCall(() => {}),
128+
});
129+
// Check that map isn't getting called.
130+
stream.flatMap(() => true);
131+
}
Collapse file

‎test/parallel/test-stream-forEach.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-stream-forEach.js
+8Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,3 +84,11 @@ const { setTimeout } = require('timers/promises');
8484
const stream = Readable.from([1, 2, 3, 4, 5]).forEach((_) => true);
8585
assert.strictEqual(typeof stream.then, 'function');
8686
}
87+
{
88+
const stream = Readable.from([1, 2, 3, 4, 5]);
89+
Object.defineProperty(stream, 'map', {
90+
value: common.mustNotCall(() => {}),
91+
});
92+
// Check that map isn't getting called.
93+
stream.forEach(() => true);
94+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.