Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit bfff8cb

Browse filesBrowse files
jasnelladuh95
authored andcommitted
benchmark: add benchmarks for experimental stream/iter
Signed-off-by: James M Snell <jasnell@gmail.com> Assisted-By: Claude/Opus 4.6 PR-URL: #62066 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent fd41ef3 commit bfff8cb
Copy full SHA for bfff8cb

9 files changed

+1,158Lines changed: 1158 additions & 0 deletions
Expand file treeCollapse file tree
Open diff view settings
Collapse file
+103Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// Benchmark: pipeToSync with sync compression transforms.
2+
// Measures fully synchronous file-to-file pipeline (no threadpool, no promises).
3+
'use strict';
4+
5+
const common = require('../common.js');
6+
const fs = require('fs');
7+
const { openSync, closeSync, writeSync, unlinkSync } = fs;
8+
9+
const tmpdir = require('../../test/common/tmpdir');
10+
tmpdir.refresh();
11+
const srcFile = tmpdir.resolve(`.removeme-sync-bench-src-${process.pid}`);
12+
const dstFile = tmpdir.resolve(`.removeme-sync-bench-dst-${process.pid}`);
13+
14+
const bench = common.createBenchmark(main, {
15+
compression: ['gzip', 'deflate', 'brotli', 'zstd'],
16+
filesize: [1024 * 1024, 16 * 1024 * 1024, 64 * 1024 * 1024],
17+
n: [5],
18+
}, {
19+
flags: ['--experimental-stream-iter'],
20+
});
21+
22+
function main({ compression, filesize, n }) {
23+
// Create the fixture file with repeating lowercase ASCII
24+
const chunk = Buffer.alloc(Math.min(filesize, 64 * 1024), 'abcdefghij');
25+
const fd = openSync(srcFile, 'w');
26+
let remaining = filesize;
27+
while (remaining > 0) {
28+
const toWrite = Math.min(remaining, chunk.length);
29+
writeSync(fd, chunk, 0, toWrite);
30+
remaining -= toWrite;
31+
}
32+
closeSync(fd);
33+
34+
const { pipeToSync } = require('stream/iter');
35+
const {
36+
compressGzipSync,
37+
compressDeflateSync,
38+
compressBrotliSync,
39+
compressZstdSync,
40+
} = require('zlib/iter');
41+
const { open } = fs.promises;
42+
43+
const compressFactory = {
44+
gzip: compressGzipSync,
45+
deflate: compressDeflateSync,
46+
brotli: compressBrotliSync,
47+
zstd: compressZstdSync,
48+
}[compression];
49+
50+
// Stateless uppercase transform (sync)
51+
const upper = (chunks) => {
52+
if (chunks === null) return null;
53+
const out = new Array(chunks.length);
54+
for (let j = 0; j < chunks.length; j++) {
55+
const src = chunks[j];
56+
const buf = Buffer.allocUnsafe(src.length);
57+
for (let i = 0; i < src.length; i++) {
58+
const b = src[i];
59+
buf[i] = (b >= 0x61 && b <= 0x7a) ? b - 0x20 : b;
60+
}
61+
out[j] = buf;
62+
}
63+
return out;
64+
};
65+
66+
// Use a synchronous wrapper since pipeToSync is fully sync.
67+
// We need FileHandle for pullSync/writer, so open async then run sync.
68+
(async () => {
69+
const srcFh = await open(srcFile, 'r');
70+
const dstFh = await open(dstFile, 'w');
71+
72+
// Warm up
73+
runSync(srcFh, dstFh, upper, compressFactory, pipeToSync);
74+
75+
// Reset file positions for the benchmark
76+
await srcFh.close();
77+
await dstFh.close();
78+
79+
bench.start();
80+
let totalBytes = 0;
81+
for (let i = 0; i < n; i++) {
82+
const src = await open(srcFile, 'r');
83+
const dst = await open(dstFile, 'w');
84+
totalBytes += runSync(src, dst, upper, compressFactory, pipeToSync);
85+
await src.close();
86+
await dst.close();
87+
}
88+
bench.end(totalBytes / (1024 * 1024));
89+
90+
cleanup();
91+
})();
92+
}
93+
94+
function runSync(srcFh, dstFh, upper, compressFactory, pipeToSync) {
95+
const w = dstFh.writer();
96+
pipeToSync(srcFh.pullSync(upper, compressFactory()), w);
97+
return w.endSync();
98+
}
99+
100+
function cleanup() {
101+
try { unlinkSync(srcFile); } catch { /* Ignore */ }
102+
try { unlinkSync(dstFile); } catch { /* Ignore */ }
103+
}
Collapse file
+201Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
// Compare FileHandle.createReadStream() vs readableWebStream() vs pull()
2+
// reading a large file through two transforms: uppercase then compress.
3+
'use strict';
4+
5+
const common = require('../common.js');
6+
const fs = require('fs');
7+
const zlib = require('zlib');
8+
const { Transform, Writable, pipeline } = require('stream');
9+
10+
const tmpdir = require('../../test/common/tmpdir');
11+
tmpdir.refresh();
12+
const filename = tmpdir.resolve(`.removeme-benchmark-garbage-${process.pid}`);
13+
14+
const bench = common.createBenchmark(main, {
15+
api: ['classic', 'webstream', 'pull'],
16+
compression: ['gzip', 'deflate', 'brotli', 'zstd'],
17+
filesize: [1024 * 1024, 16 * 1024 * 1024, 64 * 1024 * 1024],
18+
n: [5],
19+
}, {
20+
flags: ['--experimental-stream-iter'],
21+
// Classic and webstream only support gzip (native zlib / CompressionStream).
22+
// Brotli, deflate, zstd are pull-only via stream/iter transforms.
23+
combinationFilter({ api, compression }) {
24+
if (api === 'classic' && compression !== 'gzip') return false;
25+
if (api === 'webstream' && compression !== 'gzip') return false;
26+
return true;
27+
},
28+
});
29+
30+
function main({ api, compression, filesize, n }) {
31+
// Create the fixture file with repeating lowercase ASCII
32+
const chunk = Buffer.alloc(Math.min(filesize, 64 * 1024), 'abcdefghij');
33+
const fd = fs.openSync(filename, 'w');
34+
let remaining = filesize;
35+
while (remaining > 0) {
36+
const toWrite = Math.min(remaining, chunk.length);
37+
fs.writeSync(fd, chunk, 0, toWrite);
38+
remaining -= toWrite;
39+
}
40+
fs.closeSync(fd);
41+
42+
if (api === 'classic') {
43+
benchClassic(n, filesize).then(() => cleanup());
44+
} else if (api === 'webstream') {
45+
benchWebStream(n, filesize).then(() => cleanup());
46+
} else {
47+
benchPull(n, filesize, compression).then(() => cleanup());
48+
}
49+
}
50+
51+
function cleanup() {
52+
try { fs.unlinkSync(filename); } catch { /* ignore */ }
53+
}
54+
55+
// Stateless uppercase transform (shared by all paths)
56+
function uppercaseChunk(chunk) {
57+
const buf = Buffer.allocUnsafe(chunk.length);
58+
for (let i = 0; i < chunk.length; i++) {
59+
const b = chunk[i];
60+
buf[i] = (b >= 0x61 && b <= 0x7a) ? b - 0x20 : b;
61+
}
62+
return buf;
63+
}
64+
65+
// ---------------------------------------------------------------------------
66+
// Classic streams path: createReadStream -> Transform (upper) -> createGzip
67+
// ---------------------------------------------------------------------------
68+
async function benchClassic(n, filesize) {
69+
await runClassic();
70+
71+
bench.start();
72+
let totalBytes = 0;
73+
for (let i = 0; i < n; i++) {
74+
totalBytes += await runClassic();
75+
}
76+
bench.end(totalBytes / (1024 * 1024));
77+
}
78+
79+
function runClassic() {
80+
return new Promise((resolve, reject) => {
81+
const rs = fs.createReadStream(filename);
82+
83+
const upper = new Transform({
84+
transform(chunk, encoding, callback) {
85+
callback(null, uppercaseChunk(chunk));
86+
},
87+
});
88+
89+
const gz = zlib.createGzip();
90+
91+
let totalBytes = 0;
92+
const sink = new Writable({
93+
write(chunk, encoding, callback) {
94+
totalBytes += chunk.length;
95+
callback();
96+
},
97+
});
98+
99+
pipeline(rs, upper, gz, sink, (err) => {
100+
if (err) reject(err);
101+
else resolve(totalBytes);
102+
});
103+
});
104+
}
105+
106+
// ---------------------------------------------------------------------------
107+
// WebStream path: readableWebStream -> TransformStream (upper) -> CompressionStream
108+
// ---------------------------------------------------------------------------
109+
async function benchWebStream(n, filesize) {
110+
await runWebStream();
111+
112+
bench.start();
113+
let totalBytes = 0;
114+
for (let i = 0; i < n; i++) {
115+
totalBytes += await runWebStream();
116+
}
117+
bench.end(totalBytes / (1024 * 1024));
118+
}
119+
120+
async function runWebStream() {
121+
const fh = await fs.promises.open(filename, 'r');
122+
try {
123+
const rs = fh.readableWebStream();
124+
125+
const upper = new TransformStream({
126+
transform(chunk, controller) {
127+
const buf = new Uint8Array(chunk.length);
128+
for (let i = 0; i < chunk.length; i++) {
129+
const b = chunk[i];
130+
buf[i] = (b >= 0x61 && b <= 0x7a) ? b - 0x20 : b;
131+
}
132+
controller.enqueue(buf);
133+
},
134+
});
135+
136+
const compress = new CompressionStream('gzip');
137+
const output = rs.pipeThrough(upper).pipeThrough(compress);
138+
const reader = output.getReader();
139+
140+
let totalBytes = 0;
141+
while (true) {
142+
const { done, value } = await reader.read();
143+
if (done) break;
144+
totalBytes += value.byteLength;
145+
}
146+
return totalBytes;
147+
} finally {
148+
await fh.close();
149+
}
150+
}
151+
152+
// ---------------------------------------------------------------------------
153+
// Pull/iter path: pull() with uppercase transform + selected compression
154+
// ---------------------------------------------------------------------------
155+
async function benchPull(n, filesize, compression) {
156+
const iter = require('zlib/iter');
157+
158+
const compressFactory = {
159+
gzip: iter.compressGzip,
160+
deflate: iter.compressDeflate,
161+
brotli: iter.compressBrotli,
162+
zstd: iter.compressZstd,
163+
}[compression];
164+
165+
// Warm up
166+
await runPull(compressFactory);
167+
168+
bench.start();
169+
let totalBytes = 0;
170+
for (let i = 0; i < n; i++) {
171+
totalBytes += await runPull(compressFactory);
172+
}
173+
bench.end(totalBytes / (1024 * 1024));
174+
}
175+
176+
async function runPull(compressFactory) {
177+
const fh = await fs.promises.open(filename, 'r');
178+
try {
179+
// Stateless transform: uppercase each chunk in the batch
180+
const upper = (chunks) => {
181+
if (chunks === null) return null;
182+
const out = new Array(chunks.length);
183+
for (let j = 0; j < chunks.length; j++) {
184+
out[j] = uppercaseChunk(chunks[j]);
185+
}
186+
return out;
187+
};
188+
189+
const readable = fh.pull(upper, compressFactory());
190+
191+
let totalBytes = 0;
192+
for await (const chunks of readable) {
193+
for (let i = 0; i < chunks.length; i++) {
194+
totalBytes += chunks[i].byteLength;
195+
}
196+
}
197+
return totalBytes;
198+
} finally {
199+
await fh.close();
200+
}
201+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.