Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit b02bf41

Browse filesBrowse files
benjamingraduh95
authored andcommitted
test: add stream map tests
Add more tests to check and enforce the behavior of the map method. Co-Authored-By: Antoine du Hamel <duhamelantoine1995@gmail.com> PR-URL: #41642 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Antoine du Hamel <duhamelantoine1995@gmail.com>
1 parent 3f4ce07 commit b02bf41
Copy full SHA for b02bf41

File tree

Expand file treeCollapse file tree

1 file changed

+105
-16
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

1 file changed

+105
-16
lines changed
Open diff view settings
Collapse file

‎test/parallel/test-stream-map.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-stream-map.js
+105-16Lines changed: 105 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,14 @@ const {
55
Readable,
66
} = require('stream');
77
const assert = require('assert');
8+
const { once } = require('events');
89
const { setTimeout } = require('timers/promises');
910

1011
{
1112
// Map works on synchronous streams with a synchronous mapper
1213
const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x);
13-
const result = [2, 4, 6, 8, 10];
1414
(async () => {
15-
for await (const item of stream) {
16-
assert.strictEqual(item, result.shift());
17-
}
15+
assert.deepStrictEqual(await stream.toArray(), [2, 4, 6, 8, 10]);
1816
})().then(common.mustCall());
1917
}
2018

@@ -24,7 +22,49 @@ const { setTimeout } = require('timers/promises');
2422
await Promise.resolve();
2523
return x + x;
2624
});
27-
const result = [2, 4, 6, 8, 10];
25+
(async () => {
26+
assert.deepStrictEqual(await stream.toArray(), [2, 4, 6, 8, 10]);
27+
})().then(common.mustCall());
28+
}
29+
30+
{
31+
// Map works on asynchronous streams with a asynchronous mapper
32+
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
33+
return x + x;
34+
}).map((x) => x + x);
35+
(async () => {
36+
assert.deepStrictEqual(await stream.toArray(), [4, 8, 12, 16, 20]);
37+
})().then(common.mustCall());
38+
}
39+
40+
{
41+
// Map works on an infinite stream
42+
const stream = Readable.from(async function* () {
43+
while (true) yield 1;
44+
}()).map(common.mustCall(async (x) => {
45+
return x + x;
46+
}, 5));
47+
(async () => {
48+
let i = 1;
49+
for await (const item of stream) {
50+
assert.strictEqual(item, 2);
51+
if (++i === 5) break;
52+
}
53+
})().then(common.mustCall());
54+
}
55+
56+
{
57+
// Map works on non-objectMode streams
58+
const stream = new Readable({
59+
read() {
60+
this.push(Uint8Array.from([1]));
61+
this.push(Uint8Array.from([2]));
62+
this.push(null);
63+
}
64+
}).map(async ([x]) => {
65+
return x + x;
66+
}).map((x) => x + x);
67+
const result = [4, 8];
2868
(async () => {
2969
for await (const item of stream) {
3070
assert.strictEqual(item, result.shift());
@@ -33,39 +73,88 @@ const { setTimeout } = require('timers/promises');
3373
}
3474

3575
{
36-
// Map works on asynchronous streams with a asynchronous mapper
37-
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
76+
// Does not care about data events
77+
const source = new Readable({
78+
read() {
79+
this.push(Uint8Array.from([1]));
80+
this.push(Uint8Array.from([2]));
81+
this.push(null);
82+
}
83+
});
84+
setImmediate(() => stream.emit('data', Uint8Array.from([1])));
85+
const stream = source.map(async ([x]) => {
3886
return x + x;
3987
}).map((x) => x + x);
40-
const result = [4, 8, 12, 16, 20];
88+
const result = [4, 8];
4189
(async () => {
4290
for await (const item of stream) {
4391
assert.strictEqual(item, result.shift());
4492
}
4593
})().then(common.mustCall());
4694
}
4795

96+
{
97+
// Emitting an error during `map`
98+
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
99+
if (x === 3) {
100+
stream.emit('error', new Error('boom'));
101+
}
102+
return x + x;
103+
});
104+
assert.rejects(
105+
stream.map((x) => x + x).toArray(),
106+
/boom/,
107+
).then(common.mustCall());
108+
}
109+
110+
{
111+
// Throwing an error during `map` (sync)
112+
const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => {
113+
if (x === 3) {
114+
throw new Error('boom');
115+
}
116+
return x + x;
117+
});
118+
assert.rejects(
119+
stream.map((x) => x + x).toArray(),
120+
/boom/,
121+
).then(common.mustCall());
122+
}
123+
124+
125+
{
126+
// Throwing an error during `map` (async)
127+
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
128+
if (x === 3) {
129+
throw new Error('boom');
130+
}
131+
return x + x;
132+
});
133+
assert.rejects(
134+
stream.map((x) => x + x).toArray(),
135+
/boom/,
136+
).then(common.mustCall());
137+
}
138+
48139
{
49140
// Concurrency + AbortSignal
50141
const ac = new AbortController();
51-
let calls = 0;
52-
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (_, { signal }) => {
53-
calls++;
54-
await setTimeout(100, { signal });
55-
}, { signal: ac.signal, concurrency: 2 });
142+
const range = Readable.from([1, 2, 3, 4, 5]);
143+
const stream = range.map(common.mustCall(async (_, { signal }) => {
144+
await once(signal, 'abort');
145+
throw signal.reason;
146+
}, 2), { signal: ac.signal, concurrency: 2 });
56147
// pump
57148
assert.rejects(async () => {
58149
for await (const item of stream) {
59-
// nope
60-
console.log(item);
150+
assert.fail('should not reach here, got ' + item);
61151
}
62152
}, {
63153
name: 'AbortError',
64154
}).then(common.mustCall());
65155

66156
setImmediate(() => {
67157
ac.abort();
68-
assert.strictEqual(calls, 2);
69158
});
70159
}
71160

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.