Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit fd41ef3

Browse filesBrowse files
jasnelladuh95
authored andcommitted
test: add tests for experimental stream/iter implementation
Signed-off-by: James M Snell <jasnell@gmail.com> Assisted-By: Claude/Opus 4.6 PR-URL: #62066 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent e1f0d2a commit fd41ef3
Copy full SHA for fd41ef3

39 files changed

+8,924Lines changed: 8924 additions & 0 deletions
Expand file treeCollapse file tree
Open diff view settings
Collapse file

‎test/parallel/test-fs-promises-file-handle-pull.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-fs-promises-file-handle-pull.js
+440Lines changed: 440 additions & 0 deletions
Large diffs are not rendered by default.
Collapse file

‎test/parallel/test-fs-promises-file-handle-pullsync.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-fs-promises-file-handle-pullsync.js
+498Lines changed: 498 additions & 0 deletions
Large diffs are not rendered by default.
Collapse file

‎test/parallel/test-fs-promises-file-handle-writer.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-fs-promises-file-handle-writer.js
+1,126Lines changed: 1126 additions & 0 deletions
Large diffs are not rendered by default.
Collapse file
+138Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
// Flags: --experimental-stream-iter
2+
'use strict';
3+
4+
const common = require('../common');
5+
const assert = require('assert');
6+
const { broadcast, text } = require('stream/iter');
7+
8+
// =============================================================================
9+
// Backpressure policies
10+
// =============================================================================
11+
12+
async function testDropOldest() {
13+
const { writer, broadcast: bc } = broadcast({
14+
highWaterMark: 2,
15+
backpressure: 'drop-oldest',
16+
});
17+
const consumer = bc.push();
18+
19+
writer.writeSync('first');
20+
writer.writeSync('second');
21+
// This should drop 'first'
22+
writer.writeSync('third');
23+
writer.endSync();
24+
25+
const data = await text(consumer);
26+
assert.strictEqual(data, 'secondthird');
27+
}
28+
29+
async function testDropNewest() {
30+
const { writer, broadcast: bc } = broadcast({
31+
highWaterMark: 1,
32+
backpressure: 'drop-newest',
33+
});
34+
const consumer = bc.push();
35+
36+
writer.writeSync('kept');
37+
// This should be silently dropped
38+
writer.writeSync('dropped');
39+
writer.endSync();
40+
41+
const data = await text(consumer);
42+
assert.strictEqual(data, 'kept');
43+
}
44+
45+
// =============================================================================
46+
// Block backpressure
47+
// =============================================================================
48+
49+
async function testBlockBackpressure() {
50+
const { writer, broadcast: bc } = broadcast({
51+
highWaterMark: 1,
52+
backpressure: 'block',
53+
});
54+
const consumer = bc.push();
55+
writer.writeSync('a');
56+
57+
// Next write should block
58+
let writeResolved = false;
59+
const writePromise = writer.write('b').then(() => { writeResolved = true; });
60+
await new Promise(setImmediate);
61+
assert.strictEqual(writeResolved, false);
62+
63+
// Drain consumer to unblock the pending write
64+
const iter = consumer[Symbol.asyncIterator]();
65+
const first = await iter.next();
66+
assert.strictEqual(first.done, false);
67+
await new Promise(setImmediate);
68+
assert.strictEqual(writeResolved, true);
69+
70+
writer.endSync();
71+
// Drain remaining data and verify completion
72+
const second = await iter.next();
73+
assert.strictEqual(second.done, false);
74+
await writePromise;
75+
}
76+
77+
// Verify block backpressure data flows correctly end-to-end
78+
async function testBlockBackpressureContent() {
79+
const { writer, broadcast: bc } = broadcast({
80+
highWaterMark: 1,
81+
backpressure: 'block',
82+
});
83+
const consumer = bc.push();
84+
85+
writer.writeSync('a');
86+
const writePromise = writer.write('b');
87+
await new Promise(setImmediate);
88+
89+
// Read all and verify content
90+
const iter = consumer[Symbol.asyncIterator]();
91+
const first = await iter.next();
92+
assert.strictEqual(first.done, false);
93+
const firstStr = new TextDecoder().decode(first.value[0]);
94+
assert.strictEqual(firstStr, 'a');
95+
96+
await writePromise;
97+
writer.endSync();
98+
99+
const second = await iter.next();
100+
assert.strictEqual(second.done, false);
101+
const secondStr = new TextDecoder().decode(second.value[0]);
102+
assert.strictEqual(secondStr, 'b');
103+
104+
const done = await iter.next();
105+
assert.strictEqual(done.done, true);
106+
}
107+
108+
// Writev async path
109+
async function testWritevAsync() {
110+
const { writer, broadcast: bc } = broadcast({ highWaterMark: 10 });
111+
const consumer = bc.push();
112+
113+
await writer.writev(['hello', ' ', 'world']);
114+
await writer.end();
115+
116+
const data = await text(consumer);
117+
assert.strictEqual(data, 'hello world');
118+
}
119+
120+
// endSync returns the total byte count
121+
async function testEndSyncReturnValue() {
122+
const { writer, broadcast: bc } = broadcast({ highWaterMark: 10 });
123+
bc.push(); // Need a consumer to write to
124+
125+
writer.writeSync('hello'); // 5 bytes
126+
writer.writeSync(' world'); // 6 bytes
127+
const total = writer.endSync();
128+
assert.strictEqual(total, 11);
129+
}
130+
131+
Promise.all([
132+
testDropOldest(),
133+
testDropNewest(),
134+
testBlockBackpressure(),
135+
testBlockBackpressureContent(),
136+
testWritevAsync(),
137+
testEndSyncReturnValue(),
138+
]).then(common.mustCall());
Collapse file
+260Lines changed: 260 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
// Flags: --experimental-stream-iter
2+
'use strict';
3+
4+
const common = require('../common');
5+
const assert = require('assert');
6+
const { broadcast, text } = require('stream/iter');
7+
8+
// =============================================================================
9+
// Basic broadcast
10+
// =============================================================================
11+
12+
async function testBasicBroadcast() {
13+
const { writer, broadcast: bc } = broadcast();
14+
15+
// Create two consumers
16+
const consumer1 = bc.push();
17+
const consumer2 = bc.push();
18+
19+
assert.strictEqual(bc.consumerCount, 2);
20+
21+
await writer.write('hello');
22+
await writer.end();
23+
24+
const [data1, data2] = await Promise.all([
25+
text(consumer1),
26+
text(consumer2),
27+
]);
28+
29+
assert.strictEqual(data1, 'hello');
30+
assert.strictEqual(data2, 'hello');
31+
}
32+
33+
async function testMultipleWrites() {
34+
const { writer, broadcast: bc } = broadcast({ highWaterMark: 10 });
35+
36+
const consumer = bc.push();
37+
38+
await writer.write('a');
39+
await writer.write('b');
40+
await writer.write('c');
41+
await writer.end();
42+
43+
const data = await text(consumer);
44+
assert.strictEqual(data, 'abc');
45+
}
46+
47+
async function testConsumerCount() {
48+
const { broadcast: bc } = broadcast();
49+
50+
assert.strictEqual(bc.consumerCount, 0);
51+
52+
const c1 = bc.push();
53+
assert.strictEqual(bc.consumerCount, 1);
54+
55+
bc.push();
56+
assert.strictEqual(bc.consumerCount, 2);
57+
58+
bc.cancel();
59+
60+
// After cancel, consumer count drops to 0
61+
assert.strictEqual(bc.consumerCount, 0);
62+
63+
// Consumers are detached and yield nothing
64+
const batches = [];
65+
for await (const batch of c1) {
66+
batches.push(batch);
67+
}
68+
assert.strictEqual(batches.length, 0);
69+
}
70+
71+
// =============================================================================
72+
// Writer methods
73+
// =============================================================================
74+
75+
async function testWriteSync() {
76+
const { writer, broadcast: bc } = broadcast({ highWaterMark: 2 });
77+
const consumer = bc.push();
78+
79+
assert.strictEqual(writer.writeSync('a'), true);
80+
assert.strictEqual(writer.writeSync('b'), true);
81+
// Buffer full (highWaterMark=2, strict policy)
82+
assert.strictEqual(writer.writeSync('c'), false);
83+
84+
writer.endSync();
85+
86+
const data = await text(consumer);
87+
assert.strictEqual(data, 'ab');
88+
}
89+
90+
async function testWritevSync() {
91+
const { writer, broadcast: bc } = broadcast({ highWaterMark: 10 });
92+
const consumer = bc.push();
93+
94+
assert.strictEqual(writer.writevSync(['hello', ' ', 'world']), true);
95+
writer.endSync();
96+
97+
const data = await text(consumer);
98+
assert.strictEqual(data, 'hello world');
99+
}
100+
101+
async function testWriterEnd() {
102+
const { writer, broadcast: bc } = broadcast();
103+
const consumer = bc.push();
104+
105+
await writer.write('data');
106+
const totalBytes = await writer.end();
107+
assert.strictEqual(totalBytes, 4); // 'data' = 4 UTF-8 bytes
108+
109+
const data = await text(consumer);
110+
assert.strictEqual(data, 'data');
111+
}
112+
113+
async function testWriterFail() {
114+
const { writer, broadcast: bc } = broadcast();
115+
const consumer = bc.push();
116+
117+
writer.fail(new Error('test error'));
118+
119+
await assert.rejects(
120+
async () => {
121+
// eslint-disable-next-line no-unused-vars
122+
for await (const _ of consumer) {
123+
assert.fail('Should not reach here');
124+
}
125+
},
126+
{ message: 'test error' },
127+
);
128+
}
129+
130+
// =============================================================================
131+
// Cancel
132+
// =============================================================================
133+
134+
async function testCancelWithoutReason() {
135+
const { broadcast: bc } = broadcast();
136+
const consumer = bc.push();
137+
138+
bc.cancel();
139+
140+
const batches = [];
141+
for await (const batch of consumer) {
142+
batches.push(batch);
143+
}
144+
assert.strictEqual(batches.length, 0);
145+
}
146+
147+
async function testCancelWithReason() {
148+
const { broadcast: bc } = broadcast();
149+
150+
// Start a consumer that is waiting for data (promise pending)
151+
const consumer = bc.push();
152+
const resultPromise = text(consumer).catch((err) => err);
153+
154+
// Give the consumer time to enter the waiting state
155+
await new Promise((resolve) => setImmediate(resolve));
156+
157+
bc.cancel(new Error('cancelled'));
158+
159+
const result = await resultPromise;
160+
assert.ok(result instanceof Error);
161+
assert.strictEqual(result.message, 'cancelled');
162+
}
163+
164+
// =============================================================================
165+
// Writer fail detaches consumers
166+
// =============================================================================
167+
168+
async function testFailDetachesConsumers() {
169+
const { writer, broadcast: bc } = broadcast();
170+
const consumer1 = bc.push();
171+
const consumer2 = bc.push();
172+
173+
assert.strictEqual(bc.consumerCount, 2);
174+
175+
// Write some data, then fail the writer
176+
await writer.write('data');
177+
await writer.fail(new Error('writer failed'));
178+
179+
// After fail, consumers are detached
180+
assert.strictEqual(bc.consumerCount, 0);
181+
182+
// Both consumers should see the error
183+
await assert.rejects(
184+
async () => {
185+
// eslint-disable-next-line no-unused-vars
186+
for await (const _ of consumer1) {
187+
assert.fail('Should not reach here');
188+
}
189+
},
190+
{ message: 'writer failed' },
191+
);
192+
193+
await assert.rejects(
194+
async () => {
195+
// eslint-disable-next-line no-unused-vars
196+
for await (const _ of consumer2) {
197+
assert.fail('Should not reach here');
198+
}
199+
},
200+
{ message: 'writer failed' },
201+
);
202+
}
203+
204+
// =============================================================================
205+
// Writer fail idempotent
206+
// =============================================================================
207+
208+
async function testWriterFailIdempotent() {
209+
const { writer, broadcast: bc } = broadcast();
210+
const consumer = bc.push();
211+
writer.writeSync('hello');
212+
writer.fail(new Error('fail!'));
213+
// Second call is a no-op (already errored)
214+
writer.fail(new Error('fail2'));
215+
await assert.rejects(async () => {
216+
// eslint-disable-next-line no-unused-vars
217+
for await (const _ of consumer) { /* consume */ }
218+
}, { message: 'fail!' });
219+
}
220+
221+
// cancel() with falsy reason (0, "", false) should still treat as error
222+
async function testCancelWithFalsyReason() {
223+
const { broadcast: bc } = broadcast();
224+
const consumer = bc.push();
225+
const resultPromise = text(consumer).catch((err) => err);
226+
await new Promise((resolve) => setImmediate(resolve));
227+
bc.cancel(0);
228+
const result = await resultPromise;
229+
assert.strictEqual(result, 0);
230+
}
231+
232+
// Late-joining consumer should read from oldest buffered entry
233+
async function testLateJoinerSeesBufferedData() {
234+
const { writer, broadcast: bc } = broadcast({ highWaterMark: 16 });
235+
236+
// Write data before any consumer joins
237+
writer.writeSync('before-join');
238+
writer.endSync();
239+
240+
// Consumer joins after data is written
241+
const consumer = bc.push();
242+
const result = await text(consumer);
243+
assert.strictEqual(result, 'before-join');
244+
}
245+
246+
Promise.all([
247+
testBasicBroadcast(),
248+
testMultipleWrites(),
249+
testConsumerCount(),
250+
testWriteSync(),
251+
testWritevSync(),
252+
testWriterEnd(),
253+
testWriterFail(),
254+
testCancelWithoutReason(),
255+
testCancelWithReason(),
256+
testCancelWithFalsyReason(),
257+
testFailDetachesConsumers(),
258+
testWriterFailIdempotent(),
259+
testLateJoinerSeesBufferedData(),
260+
]).then(common.mustCall());

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.