Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 46ec74d

Browse filesBrowse files
benjamingrronag
authored andcommitted
stream: support flatMap
Support the `flatMap` method from the iterator helper TC39 proposal on readable streams. Co-Authored-By: Robert Nagy <ronagy@icloud.com> PR-URL: #41612 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent ae7df17 commit 46ec74d
Copy full SHA for 46ec74d

File tree

Expand file treeCollapse file tree

3 files changed

+188
-0
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

3 files changed

+188
-0
lines changed
Open diff view settings
Collapse file

‎doc/api/stream.md‎

Copy file name to clipboardExpand all lines: doc/api/stream.md
+49Lines changed: 49 additions & 0 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -1973,6 +1973,55 @@ console.log(allBigFiles);
19731973
console.log('done'); // Stream has finished
19741974
```
19751975

1976+
### `readable.flatMap(fn[, options])`
1977+
1978+
<!-- YAML
1979+
added: REPLACEME
1980+
-->
1981+
1982+
> Stability: 1 - Experimental
1983+
1984+
* `fn` {Function|AsyncGeneratorFunction|AsyncFunction} a function to map over
1985+
every item in the stream.
1986+
* `data` {any} a chunk of data from the stream.
1987+
* `options` {Object}
1988+
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
1989+
abort the `fn` call early.
1990+
* `options` {Object}
1991+
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
1992+
on the stream at once. **Default:** `1`.
1993+
* `signal` {AbortSignal} allows destroying the stream if the signal is
1994+
aborted.
1995+
* Returns: {Readable} a stream flat-mapped with the function `fn`.
1996+
1997+
This method returns a new stream by applying the given callback to each
1998+
chunk of the stream and then flattening the result.
1999+
2000+
It is possible to return a stream or another iterable or async iterable from
2001+
`fn` and the result streams will be merged (flattened) into the returned
2002+
stream.
2003+
2004+
```mjs
2005+
import { Readable } from 'stream';
2006+
import { createReadStream } from 'fs';
2007+
2008+
// With a synchronous mapper.
2009+
for await (const item of Readable.from([1, 2, 3, 4]).flatMap((x) => [x, x])) {
2010+
console.log(item); // 1, 1, 2, 2, 3, 3, 4, 4
2011+
}
2012+
// With an asynchronous mapper, combine the contents of 4 files
2013+
const concatResult = Readable.from([
2014+
'./1.mjs',
2015+
'./2.mjs',
2016+
'./3.mjs',
2017+
'./4.mjs',
2018+
]).flatMap((fileName) => createReadStream(fileName));
2019+
for await (const result of concatResult) {
2020+
// This will contain the contents (all chunks) of all 4 files
2021+
console.log(result);
2022+
}
2023+
```
2024+
19762025
### Duplex and transform streams
19772026

19782027
#### Class: `stream.Duplex`
Collapse file

‎lib/internal/streams/operators.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/operators.js
+8Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,8 +229,16 @@ async function toArray(options) {
229229
}
230230
return result;
231231
}
232+
233+
async function* flatMap(fn, options) {
234+
for await (const val of this.map(fn, options)) {
235+
yield* val;
236+
}
237+
}
238+
232239
module.exports.streamReturningOperators = {
233240
filter,
241+
flatMap,
234242
map,
235243
};
236244

Collapse file
+131Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const fixtures = require('../common/fixtures');
5+
const {
6+
Readable,
7+
} = require('stream');
8+
const assert = require('assert');
9+
const { setTimeout } = require('timers/promises');
10+
const { createReadStream } = require('fs');
11+
12+
function oneTo5() {
13+
return Readable.from([1, 2, 3, 4, 5]);
14+
}
15+
16+
{
17+
// flatMap works on synchronous streams with a synchronous mapper
18+
(async () => {
19+
assert.deepStrictEqual(
20+
await oneTo5().flatMap((x) => [x + x]).toArray(),
21+
[2, 4, 6, 8, 10]
22+
);
23+
assert.deepStrictEqual(
24+
await oneTo5().flatMap(() => []).toArray(),
25+
[]
26+
);
27+
assert.deepStrictEqual(
28+
await oneTo5().flatMap((x) => [x, x]).toArray(),
29+
[1, 1, 2, 2, 3, 3, 4, 4, 5, 5]
30+
);
31+
})().then(common.mustCall());
32+
}
33+
34+
35+
{
36+
// flatMap works on sync/async streams with an asynchronous mapper
37+
(async () => {
38+
assert.deepStrictEqual(
39+
await oneTo5().flatMap(async (x) => [x, x]).toArray(),
40+
[1, 1, 2, 2, 3, 3, 4, 4, 5, 5]
41+
);
42+
const asyncOneTo5 = oneTo5().map(async (x) => x);
43+
assert.deepStrictEqual(
44+
await asyncOneTo5.flatMap(async (x) => [x, x]).toArray(),
45+
[1, 1, 2, 2, 3, 3, 4, 4, 5, 5]
46+
);
47+
})().then(common.mustCall());
48+
}
49+
{
50+
// flatMap works on a stream where mapping returns a stream
51+
(async () => {
52+
const result = await oneTo5().flatMap(async (x) => {
53+
return Readable.from([x, x]);
54+
}).toArray();
55+
assert.deepStrictEqual(result, [1, 1, 2, 2, 3, 3, 4, 4, 5, 5]);
56+
})().then(common.mustCall());
57+
// flatMap works on an objectMode stream where mappign returns a stream
58+
(async () => {
59+
const result = await oneTo5().flatMap(() => {
60+
return createReadStream(fixtures.path('x.txt'));
61+
}).toArray();
62+
// The resultant stream is in object mode so toArray shouldn't flatten
63+
assert.strictEqual(result.length, 5);
64+
assert.deepStrictEqual(
65+
Buffer.concat(result).toString(),
66+
'xyz\n'.repeat(5)
67+
);
68+
69+
})().then(common.mustCall());
70+
71+
}
72+
73+
{
74+
// Concurrency + AbortSignal
75+
const ac = new AbortController();
76+
const stream = oneTo5().flatMap(common.mustNotCall(async (_, { signal }) => {
77+
await setTimeout(100, { signal });
78+
}), { signal: ac.signal, concurrency: 2 });
79+
// pump
80+
assert.rejects(async () => {
81+
for await (const item of stream) {
82+
// nope
83+
console.log(item);
84+
}
85+
}, {
86+
name: 'AbortError',
87+
}).then(common.mustCall());
88+
89+
queueMicrotask(() => {
90+
ac.abort();
91+
});
92+
}
93+
94+
{
95+
// Already aborted AbortSignal
96+
const stream = oneTo5().flatMap(common.mustNotCall(async (_, { signal }) => {
97+
await setTimeout(100, { signal });
98+
}), { signal: AbortSignal.abort() });
99+
// pump
100+
assert.rejects(async () => {
101+
for await (const item of stream) {
102+
// nope
103+
console.log(item);
104+
}
105+
}, {
106+
name: 'AbortError',
107+
}).then(common.mustCall());
108+
}
109+
110+
{
111+
// Error cases
112+
assert.rejects(async () => {
113+
// eslint-disable-next-line no-unused-vars
114+
for await (const unused of Readable.from([1]).flatMap(1));
115+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
116+
assert.rejects(async () => {
117+
// eslint-disable-next-line no-unused-vars
118+
for await (const _ of Readable.from([1]).flatMap((x) => x, {
119+
concurrency: 'Foo'
120+
}));
121+
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
122+
assert.rejects(async () => {
123+
// eslint-disable-next-line no-unused-vars
124+
for await (const _ of Readable.from([1]).flatMap((x) => x, 1));
125+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
126+
}
127+
{
128+
// Test result is a Readable
129+
const stream = oneTo5().flatMap((x) => x);
130+
assert.strictEqual(stream.readable, true);
131+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.