Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 311050e

Browse filesBrowse files
benjamingrruyadorno
authored andcommitted
stream: add asIndexedPairs
Add the asIndexedPairs method for readable streams. PR-URL: #41681 Refs: https://github.com/tc39/proposal-iterator-helpers#asindexedpairs Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent ae34900 commit 311050e
Copy full SHA for 311050e

File tree

Expand file treeCollapse file tree

3 files changed

+82
-0
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

3 files changed

+82
-0
lines changed
Open diff view settings
Collapse file

‎doc/api/stream.md‎

Copy file name to clipboardExpand all lines: doc/api/stream.md
+24Lines changed: 24 additions & 0 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -2064,6 +2064,30 @@ import { Readable } from 'stream';
20642064
await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2]
20652065
```
20662066

2067+
### `readable.asIndexedPairs([options])`
2068+
2069+
<!-- YAML
2070+
added: REPLACEME
2071+
-->
2072+
2073+
> Stability: 1 - Experimental
2074+
2075+
* `options` {Object}
2076+
* `signal` {AbortSignal} allows destroying the stream if the signal is
2077+
aborted.
2078+
* Returns: {Readable} a stream of indexed pairs.
2079+
2080+
This method returns a new stream with chunks of the underlying stream paired
2081+
with a counter in the form `[index, chunk]`. The first index value is 0 and it
2082+
increases by 1 for each chunk produced.
2083+
2084+
```mjs
2085+
import { Readable } from 'stream';
2086+
2087+
const pairs = await Readable.from(['a', 'b', 'c']).asIndexedPairs().toArray();
2088+
console.log(pairs); // [[0, 'a'], [1, 'b'], [2, 'c']]
2089+
```
2090+
20672091
### Duplex and transform streams
20682092

20692093
#### Class: `stream.Duplex`
Collapse file

‎lib/internal/streams/operators.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/operators.js
+11Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,16 @@ async function * map(fn, options) {
157157
}
158158
}
159159

160+
async function* asIndexedPairs(options) {
161+
let index = 0;
162+
for await (const val of this) {
163+
if (options?.signal?.aborted) {
164+
throw new AbortError({ cause: options.signal.reason });
165+
}
166+
yield [index++, val];
167+
}
168+
}
169+
160170
async function some(fn, options) {
161171
// https://tc39.es/proposal-iterator-helpers/#sec-iteratorprototype.some
162172
// Note that some does short circuit but also closes the iterator if it does
@@ -286,6 +296,7 @@ function take(number, options) {
286296
}
287297

288298
module.exports.streamReturningOperators = {
299+
asIndexedPairs,
289300
drop,
290301
filter,
291302
flatMap,
Collapse file
+47Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import '../common/index.mjs';
2+
import { Readable } from 'stream';
3+
import { deepStrictEqual, rejects } from 'assert';
4+
5+
{
6+
// asIndexedPairs with a synchronous stream
7+
const pairs = await Readable.from([1, 2, 3]).asIndexedPairs().toArray();
8+
deepStrictEqual(pairs, [[0, 1], [1, 2], [2, 3]]);
9+
const empty = await Readable.from([]).asIndexedPairs().toArray();
10+
deepStrictEqual(empty, []);
11+
}
12+
13+
{
14+
// asIndexedPairs works an asynchronous streams
15+
const asyncFrom = (...args) => Readable.from(...args).map(async (x) => x);
16+
const pairs = await asyncFrom([1, 2, 3]).asIndexedPairs().toArray();
17+
deepStrictEqual(pairs, [[0, 1], [1, 2], [2, 3]]);
18+
const empty = await asyncFrom([]).asIndexedPairs().toArray();
19+
deepStrictEqual(empty, []);
20+
}
21+
22+
{
23+
// Does not enumerate an infinite stream
24+
const infinite = () => Readable.from(async function* () {
25+
while (true) yield 1;
26+
}());
27+
const pairs = await infinite().asIndexedPairs().take(3).toArray();
28+
deepStrictEqual(pairs, [[0, 1], [1, 1], [2, 1]]);
29+
const empty = await infinite().asIndexedPairs().take(0).toArray();
30+
deepStrictEqual(empty, []);
31+
}
32+
33+
{
34+
// AbortSignal
35+
await rejects(async () => {
36+
const ac = new AbortController();
37+
const { signal } = ac;
38+
const p = Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray();
39+
ac.abort();
40+
await p;
41+
}, { name: 'AbortError' });
42+
43+
await rejects(async () => {
44+
const signal = AbortSignal.abort();
45+
await Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray();
46+
}, /AbortError/);
47+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.