Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 62e1a68

Browse filesBrowse files
benjamingrronag
authored andcommitted
stream: add toArray
Add the toArray method from the TC39 iterator helper proposal to Readable streams. This also enables a common-use case of converting a stream to an array. Co-Authored-By: Robert Nagy <ronagy@icloud.com> PR-URL: #41553 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent 6de8e51 commit 62e1a68
Copy full SHA for 62e1a68

File tree

Expand file treeCollapse file tree

3 files changed

+136
-0
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

3 files changed

+136
-0
lines changed
Open diff view settings
Collapse file

‎doc/api/stream.md‎

Copy file name to clipboardExpand all lines: doc/api/stream.md
+40Lines changed: 40 additions & 0 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -1835,6 +1835,46 @@ await dnsResults.forEach((result) => {
18351835
console.log('done'); // Stream has finished
18361836
```
18371837

1838+
### `readable.toArray([options])`
1839+
1840+
<!-- YAML
1841+
added: REPLACEME
1842+
-->
1843+
1844+
> Stability: 1 - Experimental
1845+
1846+
* `options` {Object}
1847+
* `signal` {AbortSignal} allows cancelling the toArray operation if the
1848+
signal is aborted.
1849+
* Returns: {Promise} a promise containing an array (if the stream is in
1850+
object mode) or Buffer with the contents of the stream.
1851+
1852+
This method allows easily obtaining the contents of a stream. If the
1853+
stream is in [object mode][object-mode] an array of its contents is returned.
1854+
If the stream is not in object mode a Buffer containing its data is returned.
1855+
1856+
As this method reads the entire stream into memory, it negates the benefits of
1857+
streams. It's intended for interoperability and convenience, not as the primary
1858+
way to consume streams.
1859+
1860+
```mjs
1861+
import { Readable } from 'stream';
1862+
import { Resolver } from 'dns/promises';
1863+
1864+
await Readable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4]
1865+
1866+
// Make dns queries concurrently using .map and collect
1867+
// the results into an aray using toArray
1868+
const dnsResults = await Readable.from([
1869+
'nodejs.org',
1870+
'openjsf.org',
1871+
'www.linuxfoundation.org',
1872+
]).map(async (domain) => {
1873+
const { address } = await resolver.resolve4(domain, { ttl: true });
1874+
return address;
1875+
}, { concurrency: 2 }).toArray();
1876+
```
1877+
18381878
### Duplex and transform streams
18391879

18401880
#### Class: `stream.Duplex`
Collapse file

‎lib/internal/streams/operators.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/operators.js
+17Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
'use strict';
22

33
const { AbortController } = require('internal/abort_controller');
4+
const { Buffer } = require('buffer');
5+
46
const {
57
codes: {
68
ERR_INVALID_ARG_TYPE,
@@ -10,6 +12,7 @@ const {
1012
const { validateInteger } = require('internal/validators');
1113

1214
const {
15+
ArrayPrototypePush,
1316
MathFloor,
1417
Promise,
1518
PromiseReject,
@@ -174,11 +177,25 @@ async function * filter(fn, options) {
174177
yield* this.map(filterFn, options);
175178
}
176179

180+
async function toArray(options) {
181+
const result = [];
182+
for await (const val of this) {
183+
if (options?.signal?.aborted) {
184+
throw new AbortError({ cause: options.signal.reason });
185+
}
186+
ArrayPrototypePush(result, val);
187+
}
188+
if (!this.readableObjectMode) {
189+
return Buffer.concat(result);
190+
}
191+
return result;
192+
}
177193
module.exports.streamReturningOperators = {
178194
filter,
179195
map,
180196
};
181197

182198
module.exports.promiseReturningOperators = {
183199
forEach,
200+
toArray,
184201
};
Collapse file
+79Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const {
5+
Readable,
6+
} = require('stream');
7+
const assert = require('assert');
8+
9+
{
10+
// Works on a synchronous stream
11+
(async () => {
12+
const tests = [
13+
[],
14+
[1],
15+
[1, 2, 3],
16+
Array(100).fill().map((_, i) => i),
17+
];
18+
for (const test of tests) {
19+
const stream = Readable.from(test);
20+
const result = await stream.toArray();
21+
assert.deepStrictEqual(result, test);
22+
}
23+
})().then(common.mustCall());
24+
}
25+
26+
{
27+
// Works on a non-object-mode stream and flattens it
28+
(async () => {
29+
const stream = Readable.from(
30+
[Buffer.from([1, 2, 3]), Buffer.from([4, 5, 6])]
31+
, { objectMode: false });
32+
const result = await stream.toArray();
33+
assert.strictEqual(Buffer.isBuffer(result), true);
34+
assert.deepStrictEqual(Array.from(result), [1, 2, 3, 4, 5, 6]);
35+
})().then(common.mustCall());
36+
}
37+
38+
{
39+
// Works on an asynchronous stream
40+
(async () => {
41+
const tests = [
42+
[],
43+
[1],
44+
[1, 2, 3],
45+
Array(100).fill().map((_, i) => i),
46+
];
47+
for (const test of tests) {
48+
const stream = Readable.from(test).map((x) => Promise.resolve(x));
49+
const result = await stream.toArray();
50+
assert.deepStrictEqual(result, test);
51+
}
52+
})().then(common.mustCall());
53+
}
54+
55+
{
56+
// Support for AbortSignal
57+
const ac = new AbortController();
58+
let stream;
59+
assert.rejects(async () => {
60+
stream = Readable.from([1, 2, 3]).map(async (x) => {
61+
if (x === 3) {
62+
await new Promise(() => {}); // Explicitly do not pass signal here
63+
}
64+
return Promise.resolve(x);
65+
});
66+
await stream.toArray({ signal: ac.signal });
67+
}, {
68+
name: 'AbortError',
69+
}).then(common.mustCall(() => {
70+
// Only stops toArray, does not destory the stream
71+
assert(stream.destroyed, false);
72+
}));
73+
ac.abort();
74+
}
75+
{
76+
// Test result is a Promise
77+
const result = Readable.from([1, 2, 3, 4, 5]).toArray();
78+
assert.strictEqual(result instanceof Promise, true);
79+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.