Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 569767e

Browse filesBrowse files
mcollinacramforce
authored andcommitted
stream: add fast paths for webstreams read and pipeTo
Add internal fast paths to improve webstreams performance without changing the public API or breaking spec compliance. 1. ReadableStreamDefaultReader.read() fast path: When data is already buffered in the controller's queue, return PromiseResolve() directly without creating a DefaultReadRequest object. This is spec-compliant because read() returns a Promise, and resolved promises still run callbacks in the microtask queue. 2. pipeTo() batch read fast path: When data is buffered, batch reads directly from the controller queue up to highWaterMark without creating PipeToReadableStreamReadRequest objects per chunk. Respects backpressure by checking desiredSize after each write. Benchmark results: - pipeTo: ~11% faster (***) - buffered read(): ~17-20% faster (***) Co-Authored-By: Malte Ubl <malte@vercel.com> PR-URL: #61807 Reviewed-By: Stephen Belanger <admin@stephenbelanger.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Gürgün Dayıoğlu <hey@gurgun.day> Reviewed-By: Ethan Arrowood <ethan@arrowood.dev>
1 parent 240b512 commit 569767e
Copy full SHA for 569767e

2 files changed

+132Lines changed: 132 additions & 0 deletions

File tree

Expand file treeCollapse file tree
Open diff view settings
Filter options
Expand file treeCollapse file tree
Open diff view settings
Collapse file
+54Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
'use strict';
2+
const common = require('../common.js');
3+
const { ReadableStream } = require('node:stream/web');
4+
5+
// Benchmark for reading from a pre-buffered ReadableStream.
6+
// This measures the fast path optimization where data is already
7+
// queued in the controller, avoiding DefaultReadRequest allocation.
8+
9+
const bench = common.createBenchmark(main, {
10+
n: [1e5],
11+
bufferSize: [1, 10, 100, 1000],
12+
});
13+
14+
async function main({ n, bufferSize }) {
15+
let enqueued = 0;
16+
17+
const rs = new ReadableStream({
18+
start(controller) {
19+
// Pre-fill the buffer
20+
for (let i = 0; i < bufferSize; i++) {
21+
controller.enqueue('a');
22+
enqueued++;
23+
}
24+
},
25+
pull(controller) {
26+
// Refill buffer when pulled
27+
const toEnqueue = Math.min(bufferSize, n - enqueued);
28+
for (let i = 0; i < toEnqueue; i++) {
29+
controller.enqueue('a');
30+
enqueued++;
31+
}
32+
if (enqueued >= n) {
33+
controller.close();
34+
}
35+
},
36+
}, {
37+
// Use buffer size as high water mark to allow pre-buffering
38+
highWaterMark: bufferSize,
39+
});
40+
41+
const reader = rs.getReader();
42+
let x = null;
43+
let reads = 0;
44+
45+
bench.start();
46+
while (reads < n) {
47+
const { value, done } = await reader.read();
48+
if (done) break;
49+
x = value;
50+
reads++;
51+
}
52+
bench.end(reads);
53+
console.assert(x);
54+
}
Collapse file

‎lib/internal/webstreams/readablestream.js‎

Copy file name to clipboardExpand all lines: lib/internal/webstreams/readablestream.js
+78Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -860,6 +860,31 @@ class ReadableStreamDefaultReader {
860860
new ERR_INVALID_STATE.TypeError(
861861
'The reader is not attached to a stream'));
862862
}
863+
864+
const stream = this[kState].stream;
865+
const controller = stream[kState].controller;
866+
867+
// Fast path: if data is already buffered in a default controller,
868+
// return a resolved promise immediately without creating a read request.
869+
// This is spec-compliant because read() returns a Promise, and
870+
// Promise.resolve() callbacks still run in the microtask queue.
871+
if (stream[kState].state === 'readable' &&
872+
isReadableStreamDefaultController(controller) &&
873+
controller[kState].queue.length > 0) {
874+
stream[kState].disturbed = true;
875+
const chunk = dequeueValue(controller);
876+
877+
if (controller[kState].closeRequested && !controller[kState].queue.length) {
878+
readableStreamDefaultControllerClearAlgorithms(controller);
879+
readableStreamClose(stream);
880+
} else {
881+
readableStreamDefaultControllerCallPullIfNeeded(controller);
882+
}
883+
884+
return PromiseResolve({ value: chunk, done: false });
885+
}
886+
887+
// Slow path: create request and go through normal flow
863888
const readRequest = new DefaultReadRequest();
864889
readableStreamDefaultReaderRead(this, readRequest);
865890
return readRequest.promise;
@@ -1286,6 +1311,8 @@ const isReadableStream =
12861311
isBrandCheck('ReadableStream');
12871312
const isReadableByteStreamController =
12881313
isBrandCheck('ReadableByteStreamController');
1314+
const isReadableStreamDefaultController =
1315+
isBrandCheck('ReadableStreamDefaultController');
12891316
const isReadableStreamBYOBRequest =
12901317
isBrandCheck('ReadableStreamBYOBRequest');
12911318
const isReadableStreamDefaultReader =
@@ -1510,6 +1537,57 @@ function readableStreamPipeTo(
15101537

15111538
await writer[kState].ready.promise;
15121539

1540+
const controller = source[kState].controller;
1541+
1542+
// Fast path: batch reads when data is buffered in a default controller.
1543+
// This avoids creating PipeToReadableStreamReadRequest objects and
1544+
// reduces promise allocation overhead.
1545+
if (source[kState].state === 'readable' &&
1546+
isReadableStreamDefaultController(controller) &&
1547+
controller[kState].queue.length > 0) {
1548+
1549+
while (controller[kState].queue.length > 0) {
1550+
if (shuttingDown) return true;
1551+
1552+
source[kState].disturbed = true;
1553+
const chunk = dequeueValue(controller);
1554+
1555+
if (controller[kState].closeRequested && !controller[kState].queue.length) {
1556+
readableStreamDefaultControllerClearAlgorithms(controller);
1557+
readableStreamClose(source);
1558+
}
1559+
1560+
// Write the chunk - we're already in a separate microtask from enqueue
1561+
// because we awaited writer[kState].ready.promise above
1562+
state.currentWrite = writableStreamDefaultWriterWrite(writer, chunk);
1563+
setPromiseHandled(state.currentWrite);
1564+
1565+
// Check backpressure after each write
1566+
if (dest[kState].state === 'writable') {
1567+
const desiredSize = writer.desiredSize;
1568+
if (desiredSize !== null && desiredSize <= 0) {
1569+
// Backpressure - stop batch and wait for ready
1570+
break;
1571+
}
1572+
}
1573+
}
1574+
1575+
// Trigger pull if needed after batch
1576+
if (source[kState].state === 'readable' &&
1577+
!controller[kState].closeRequested) {
1578+
readableStreamDefaultControllerCallPullIfNeeded(controller);
1579+
}
1580+
1581+
// Check if stream closed during batch
1582+
if (source[kState].state === 'closed') {
1583+
return true;
1584+
}
1585+
1586+
// Yield to microtask queue between batches to allow events/signals to fire
1587+
return false;
1588+
}
1589+
1590+
// Slow path: use read request for async reads
15131591
const promise = PromiseWithResolvers();
15141592
// eslint-disable-next-line no-use-before-define
15151593
readableStreamDefaultReaderRead(reader, new PipeToReadableStreamReadRequest(writer, state, promise));

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.