Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 8d3b0b7

Browse filesBrowse files
CGQAQtargos
authored andcommitted
stream: use ByteLengthQueuingStrategy when not in objectMode
Fixes: #46347 PR-URL: #48847 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent 15db3ef commit 8d3b0b7
Copy full SHA for 8d3b0b7

File tree

Expand file treeCollapse file tree

2 files changed

+64
-5
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

2 files changed

+64
-5
lines changed
Open diff view settings
Collapse file

‎lib/internal/webstreams/adapters.js‎

Copy file name to clipboardExpand all lines: lib/internal/webstreams/adapters.js
+2-5Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ const {
3131

3232
const {
3333
CountQueuingStrategy,
34+
ByteLengthQueuingStrategy,
3435
} = require('internal/webstreams/queuingstrategies');
3536

3637
const {
@@ -452,11 +453,7 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj
452453
return new CountQueuingStrategy({ highWaterMark });
453454
}
454455

455-
// When not running in objectMode explicitly, we just fall
456-
// back to a minimal strategy that just specifies the highWaterMark
457-
// and no size algorithm. Using a ByteLengthQueuingStrategy here
458-
// is unnecessary.
459-
return { highWaterMark };
456+
return new ByteLengthQueuingStrategy({ highWaterMark });
460457
};
461458

462459
const strategy = evaluateStrategyOrFallback(options?.strategy);
Collapse file
+62Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
'use strict';
2+
const common = require('../common');
3+
if (!common.hasCrypto) { common.skip('missing crypto'); }
4+
5+
const { Readable } = require('stream');
6+
const process = require('process');
7+
const { randomBytes } = require('crypto');
8+
const assert = require('assert');
9+
10+
// Based on: https://github.com/nodejs/node/issues/46347#issuecomment-1413886707
11+
// edit: make it cross-platform as /dev/urandom is not available on Windows
12+
{
13+
let currentMemoryUsage = process.memoryUsage().arrayBuffers;
14+
15+
// We initialize a stream, but not start consuming it
16+
const randomNodeStream = new Readable({
17+
read(size) {
18+
randomBytes(size, (err, buffer) => {
19+
if (err) {
20+
// If an error occurs, emit an 'error' event
21+
this.emit('error', err);
22+
return;
23+
}
24+
25+
// Push the random bytes to the stream
26+
this.push(buffer);
27+
});
28+
}
29+
});
30+
// after 2 seconds, it'll get converted to web stream
31+
let randomWebStream;
32+
33+
// We check memory usage every second
34+
// since it's a stream, it shouldn't be higher than the chunk size
35+
const reportMemoryUsage = () => {
36+
const { arrayBuffers } = process.memoryUsage();
37+
currentMemoryUsage = arrayBuffers;
38+
39+
assert(currentMemoryUsage <= 256 * 1024 * 1024);
40+
};
41+
setInterval(reportMemoryUsage, 1000);
42+
43+
// after 1 second we use Readable.toWeb
44+
// memory usage should stay pretty much the same since it's still a stream
45+
setTimeout(() => {
46+
randomWebStream = Readable.toWeb(randomNodeStream);
47+
}, 1000);
48+
49+
// after 2 seconds we start consuming the stream
50+
// memory usage will grow, but the old chunks should be garbage-collected pretty quickly
51+
setTimeout(async () => {
52+
// eslint-disable-next-line no-unused-vars
53+
for await (const _ of randomWebStream) {
54+
// Do nothing, just let the stream flow
55+
}
56+
}, 2000);
57+
58+
setTimeout(() => {
59+
// Test considered passed if we don't crash
60+
process.exit(0);
61+
}, 5000);
62+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.