Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 09c25bb

Browse filesBrowse files
benjamingrtargos
authored andcommitted
stream: add filter method to readable
This continues the work in #40815 to make streams compatible with upcoming ECMAScript language features. It adds an experimental `filter` api to streams and tests/docs for it. See https://github.com/tc39/proposal-iterator-helpers/ Co-Authored-By: Robert Nagy <ronagy@icloud.com> PR-URL: #41354 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent 0d18a8c commit 09c25bb
Copy full SHA for 09c25bb

File tree

Expand file treeCollapse file tree

3 files changed

+172
-0
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

3 files changed

+172
-0
lines changed
Open diff view settings
Collapse file

‎doc/api/stream.md‎

Copy file name to clipboardExpand all lines: doc/api/stream.md
+49Lines changed: 49 additions & 0 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -1727,6 +1727,55 @@ for await (const result of dnsResults) {
17271727
}
17281728
```
17291729

1730+
### `readable.filter(fn[, options])`
1731+
1732+
<!-- YAML
1733+
added: REPLACEME
1734+
-->
1735+
1736+
> Stability: 1 - Experimental
1737+
1738+
* `fn` {Function|AsyncFunction} a function to filter items from stream.
1739+
* `data` {any} a chunk of data from the stream.
1740+
* `options` {Object}
1741+
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
1742+
abort the `fn` call early.
1743+
* `options` {Object}
1744+
* `concurrency` {number} the maximal concurrent invocation of `fn` to call
1745+
on the stream at once. **Default:** `1`.
1746+
* `signal` {AbortSignal} allows destroying the stream if the signal is
1747+
aborted.
1748+
* Returns: {Readable} a stream filtered with the predicate `fn`.
1749+
1750+
This method allows filtering the stream. For each item in the stream the `fn`
1751+
function will be called and if it returns a truthy value, the item will be
1752+
passed to the result stream. If the `fn` function returns a promise - that
1753+
promise will be `await`ed.
1754+
1755+
```mjs
1756+
import { Readable } from 'stream';
1757+
import { Resolver } from 'dns/promises';
1758+
1759+
// With a synchronous predicate.
1760+
for await (const item of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
1761+
console.log(item); // 3, 4
1762+
}
1763+
// With an asynchronous predicate, making at most 2 queries at a time.
1764+
const resolver = new Resolver();
1765+
const dnsResults = await Readable.from([
1766+
'nodejs.org',
1767+
'openjsf.org',
1768+
'www.linuxfoundation.org',
1769+
]).filter(async (domain) => {
1770+
const { address } = await resolver.resolve4(domain, { ttl: true });
1771+
return address.ttl > 60;
1772+
}, { concurrency: 2 });
1773+
for await (const result of dnsResults) {
1774+
// Logs domains with more than 60 seconds on the resolved dns record.
1775+
console.log(result);
1776+
}
1777+
```
1778+
17301779
### Duplex and transform streams
17311780

17321781
#### Class: `stream.Duplex`
Collapse file

‎lib/internal/streams/operators.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/operators.js
+14Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,20 @@ async function * map(fn, options) {
147147
}
148148
}
149149

150+
async function * filter(fn, options) {
151+
if (typeof fn !== 'function') {
152+
throw (new ERR_INVALID_ARG_TYPE(
153+
'fn', ['Function', 'AsyncFunction'], this));
154+
}
155+
async function filterFn(value, options) {
156+
if (await fn(value, options)) {
157+
return value;
158+
}
159+
return kEmpty;
160+
}
161+
yield* this.map(filterFn, options);
162+
}
150163
module.exports = {
151164
map,
165+
filter
152166
};
Collapse file
+109Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const {
5+
Readable,
6+
} = require('stream');
7+
const assert = require('assert');
8+
const { setTimeout } = require('timers/promises');
9+
10+
{
11+
// Filter works on synchronous streams with a synchronous predicate
12+
const stream = Readable.from([1, 2, 3, 4, 5]).filter((x) => x < 3);
13+
const result = [1, 2];
14+
(async () => {
15+
for await (const item of stream) {
16+
assert.strictEqual(item, result.shift());
17+
}
18+
})().then(common.mustCall());
19+
}
20+
21+
{
22+
// Filter works on synchronous streams with an asynchronous predicate
23+
const stream = Readable.from([1, 2, 3, 4, 5]).filter(async (x) => {
24+
await Promise.resolve();
25+
return x > 3;
26+
});
27+
const result = [4, 5];
28+
(async () => {
29+
for await (const item of stream) {
30+
assert.strictEqual(item, result.shift());
31+
}
32+
})().then(common.mustCall());
33+
}
34+
35+
{
36+
// Map works on asynchronous streams with a asynchronous mapper
37+
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
38+
await Promise.resolve();
39+
return x + x;
40+
}).filter((x) => x > 5);
41+
const result = [6, 8, 10];
42+
(async () => {
43+
for await (const item of stream) {
44+
assert.strictEqual(item, result.shift());
45+
}
46+
})().then(common.mustCall());
47+
}
48+
49+
{
50+
// Concurrency + AbortSignal
51+
const ac = new AbortController();
52+
let calls = 0;
53+
const stream = Readable.from([1, 2, 3, 4]).filter(async (_, { signal }) => {
54+
calls++;
55+
await setTimeout(100, { signal });
56+
}, { signal: ac.signal, concurrency: 2 });
57+
// pump
58+
assert.rejects(async () => {
59+
for await (const item of stream) {
60+
// nope
61+
console.log(item);
62+
}
63+
}, {
64+
name: 'AbortError',
65+
}).then(common.mustCall());
66+
67+
setImmediate(() => {
68+
ac.abort();
69+
assert.strictEqual(calls, 2);
70+
});
71+
}
72+
73+
{
74+
// Concurrency result order
75+
const stream = Readable.from([1, 2]).filter(async (item, { signal }) => {
76+
await setTimeout(10 - item, { signal });
77+
return true;
78+
}, { concurrency: 2 });
79+
80+
(async () => {
81+
const expected = [1, 2];
82+
for await (const item of stream) {
83+
assert.strictEqual(item, expected.shift());
84+
}
85+
})().then(common.mustCall());
86+
}
87+
88+
{
89+
// Error cases
90+
assert.rejects(async () => {
91+
// eslint-disable-next-line no-unused-vars
92+
for await (const unused of Readable.from([1]).filter(1));
93+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
94+
assert.rejects(async () => {
95+
// eslint-disable-next-line no-unused-vars
96+
for await (const _ of Readable.from([1]).filter((x) => x, {
97+
concurrency: 'Foo'
98+
}));
99+
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
100+
assert.rejects(async () => {
101+
// eslint-disable-next-line no-unused-vars
102+
for await (const _ of Readable.from([1]).filter((x) => x, 1));
103+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
104+
}
105+
{
106+
// Test result is a Readable
107+
const stream = Readable.from([1, 2, 3, 4, 5]).filter((x) => true);
108+
assert.strictEqual(stream.readable, true);
109+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.