Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit fadb214

Browse filesBrowse files
authored
stream: readable read one buffer at a time
Instead of wasting cycles concatenating buffers, just return each one by one. Similar (but not exact) old behavior can be achieved by using `readable.read(readable.readableLength)` instead of `readable.read()`. In some edge cases it might be necessary to do a `readable.read(0)` first. PR: #60441 PR-URL: #60441 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: Gürgün Dayıoğlu <hey@gurgun.day> Reviewed-By: Yagiz Nizipli <yagiz@nizipli.com>
1 parent a5ae2b2 commit fadb214
Copy full SHA for fadb214

17 files changed

+82-47Lines changed: 82 additions & 47 deletions
Expand file treeCollapse file tree
Open diff view settings
Collapse file

‎lib/internal/streams/readable.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/readable.js
+5Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -636,9 +636,14 @@ function howMuchToRead(n, state) {
636636
if ((state[kState] & kObjectMode) !== 0)
637637
return 1;
638638
if (NumberIsNaN(n)) {
639+
// Fast path for buffers.
640+
if ((state[kState] & kDecoder) === 0 && state.length)
641+
return state.buffer[state.bufferIndex].length;
642+
639643
// Only flow one buffer at a time.
640644
if ((state[kState] & kFlowing) !== 0 && state.length)
641645
return state.buffer[state.bufferIndex].length;
646+
642647
return state.length;
643648
}
644649
if (n <= state.length)
Collapse file

‎test/common/heap.js‎

Copy file name to clipboardExpand all lines: test/common/heap.js
+2-1Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ const { getHeapSnapshot } = require('v8');
2121

2222
function createJSHeapSnapshot(stream = getHeapSnapshot()) {
2323
stream.pause();
24-
const dump = JSON.parse(stream.read());
24+
stream.read(0);
25+
const dump = JSON.parse(stream.read(stream.readableLength));
2526
const meta = dump.snapshot.meta;
2627

2728
const nodes =
Collapse file

‎test/parallel/test-crypto-cipheriv-decipheriv.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-crypto-cipheriv-decipheriv.js
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,11 @@ function testCipher1(key, iv) {
3131
// quite small, so there's no harm.
3232
const cStream = crypto.createCipheriv('des-ede3-cbc', key, iv);
3333
cStream.end(plaintext);
34-
ciph = cStream.read();
34+
ciph = cStream.read(cStream.readableLength);
3535

3636
const dStream = crypto.createDecipheriv('des-ede3-cbc', key, iv);
3737
dStream.end(ciph);
38-
txt = dStream.read().toString('utf8');
38+
txt = dStream.read(dStream.readableLength).toString('utf8');
3939

4040
assert.strictEqual(txt, plaintext,
4141
`streaming cipher with key ${key} and iv ${iv}`);
Collapse file

‎test/parallel/test-runner-run.mjs‎

Copy file name to clipboardExpand all lines: test/parallel/test-runner-run.mjs
+16-18Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import * as fixtures from '../common/fixtures.mjs';
33
import { join } from 'node:path';
44
import { describe, it, run } from 'node:test';
55
import { dot, spec, tap } from 'node:test/reporters';
6+
import consumers from 'node:stream/consumers';
67
import assert from 'node:assert';
78
import util from 'node:util';
89

@@ -111,34 +112,31 @@ describe('require(\'node:test\').run', { concurrency: true }, () => {
111112
describe('should be piped with spec reporter', () => {
112113
it('new spec', async () => {
113114
const specReporter = new spec();
114-
const result = await run({
115+
const result = await consumers.text(run({
115116
files: [join(testFixtures, 'default-behavior/test/random.cjs')]
116-
}).compose(specReporter).toArray();
117-
const stringResults = result.map((bfr) => bfr.toString());
118-
assert.match(stringResults[0], /this should pass/);
119-
assert.match(stringResults[1], /tests 1/);
120-
assert.match(stringResults[1], /pass 1/);
117+
}).compose(specReporter));
118+
assert.match(result, /this should pass/);
119+
assert.match(result, /tests 1/);
120+
assert.match(result, /pass 1/);
121121
});
122122

123123
it('spec()', async () => {
124124
const specReporter = spec();
125-
const result = await run({
125+
const result = await consumers.text(run({
126126
files: [join(testFixtures, 'default-behavior/test/random.cjs')]
127-
}).compose(specReporter).toArray();
128-
const stringResults = result.map((bfr) => bfr.toString());
129-
assert.match(stringResults[0], /this should pass/);
130-
assert.match(stringResults[1], /tests 1/);
131-
assert.match(stringResults[1], /pass 1/);
127+
}).compose(specReporter));
128+
assert.match(result, /this should pass/);
129+
assert.match(result, /tests 1/);
130+
assert.match(result, /pass 1/);
132131
});
133132

134133
it('spec', async () => {
135-
const result = await run({
134+
const result = await consumers.text(run({
136135
files: [join(testFixtures, 'default-behavior/test/random.cjs')]
137-
}).compose(spec).toArray();
138-
const stringResults = result.map((bfr) => bfr.toString());
139-
assert.match(stringResults[0], /this should pass/);
140-
assert.match(stringResults[1], /tests 1/);
141-
assert.match(stringResults[1], /pass 1/);
136+
}).compose(spec));
137+
assert.match(result, /this should pass/);
138+
assert.match(result, /tests 1/);
139+
assert.match(result, /pass 1/);
142140
});
143141
});
144142

Collapse file

‎test/parallel/test-stream-compose.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-stream-compose.js
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,7 @@ const assert = require('assert');
490490

491491
newStream.end();
492492

493-
assert.deepStrictEqual(await newStream.toArray(), [Buffer.from('Steve RogersOn your left')]);
493+
assert.deepStrictEqual(await newStream.toArray(), [Buffer.from('Steve Rogers'), Buffer.from('On your left')]);
494494
})().then(common.mustCall());
495495
}
496496

Collapse file

‎test/parallel/test-stream-push-strings.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-stream-push-strings.js
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ ms.on('readable', function() {
5959
results.push(String(chunk));
6060
});
6161

62-
const expect = [ 'first chunksecond to last chunk', 'last chunk' ];
62+
const expect = [ 'first chunk', 'second to last chunk', 'last chunk' ];
6363
process.on('exit', function() {
6464
assert.strictEqual(ms._chunks, -1);
6565
assert.deepStrictEqual(results, expect);
Collapse file

‎test/parallel/test-stream-readable-emittedReadable.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-stream-readable-emittedReadable.js
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ const readable = new Readable({
1010
// Initialized to false.
1111
assert.strictEqual(readable._readableState.emittedReadable, false);
1212

13-
const expected = [Buffer.from('foobar'), Buffer.from('quo'), null];
13+
const expected = [Buffer.from('foo'), Buffer.from('bar'), Buffer.from('quo'), null];
1414
readable.on('readable', common.mustCall(() => {
1515
// emittedReadable should be true when the readable event is emitted
1616
assert.strictEqual(readable._readableState.emittedReadable, true);
Collapse file

‎test/parallel/test-stream-readable-infinite-read.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-stream-readable-infinite-read.js
+3-11Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,24 +10,16 @@ const readable = new Readable({
1010
highWaterMark: 16 * 1024,
1111
read: common.mustCall(function() {
1212
this.push(buf);
13-
}, 31)
13+
}, 12)
1414
});
1515

1616
let i = 0;
1717

1818
readable.on('readable', common.mustCall(function() {
1919
if (i++ === 10) {
20-
// We will just terminate now.
21-
process.removeAllListeners('readable');
20+
readable.removeAllListeners('readable');
2221
return;
2322
}
2423

25-
const data = readable.read();
26-
// TODO(mcollina): there is something odd in the highWaterMark logic
27-
// investigate.
28-
if (i === 1) {
29-
assert.strictEqual(data.length, 8192 * 2);
30-
} else {
31-
assert.strictEqual(data.length, 8192 * 3);
32-
}
24+
assert.strictEqual(readable.read().length, 8192);
3325
}, 11));
Collapse file

‎test/parallel/test-stream-readable-needReadable.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-stream-readable-needReadable.js
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ const asyncReadable = new Readable({
3232
});
3333

3434
asyncReadable.on('readable', common.mustCall(() => {
35-
if (asyncReadable.read() !== null) {
35+
if (asyncReadable.read(asyncReadable.readableLength) !== null) {
3636
// After each read(), the buffer is empty.
3737
// If the stream doesn't end now,
3838
// then we need to notify the reader on future changes.
Collapse file
+20Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
'use strict';
2+
require('../common');
3+
const assert = require('assert');
4+
5+
const { Readable } = require('stream');
6+
7+
// Read one buffer at a time and don't waste cycles allocating
8+
// and copying into a new larger buffer.
9+
{
10+
const r = new Readable({
11+
read() {}
12+
});
13+
const buffers = [Buffer.allocUnsafe(5), Buffer.allocUnsafe(10)];
14+
for (const buf of buffers) {
15+
r.push(buf);
16+
}
17+
for (const buf of buffers) {
18+
assert.strictEqual(r.read(), buf);
19+
}
20+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.