Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit d7317f4

Browse filesBrowse files
jasnelladuh95
authored andcommitted
stream: add stream/iter to classic stream adapters
Signed-off-by: James M Snell <jasnell@gmail.com> Assisted-by: Opencode/Opus 4.6 PR-URL: #62469 Reviewed-By: Robert Nagy <ronagy@icloud.com>
1 parent 9a3ac66 commit d7317f4
Copy full SHA for d7317f4

17 files changed

+3,848-14Lines changed: 3848 additions & 14 deletions
Expand file treeCollapse file tree
Open diff view settings
Collapse file

‎doc/api/errors.md‎

Copy file name to clipboardExpand all lines: doc/api/errors.md
+7Lines changed: 7 additions & 0 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -2882,6 +2882,13 @@ An attempt was made to call [`stream.pipe()`][] on a [`Writable`][] stream.
28822882
A stream method was called that cannot complete because the stream was
28832883
destroyed using `stream.destroy()`.
28842884

2885+
<a id="ERR_STREAM_ITER_MISSING_FLAG"></a>
2886+
2887+
### `ERR_STREAM_ITER_MISSING_FLAG`
2888+
2889+
A stream/iter API was used without the `--experimental-stream-iter` CLI flag
2890+
enabled.
2891+
28852892
<a id="ERR_STREAM_NULL_VALUES"></a>
28862893

28872894
### `ERR_STREAM_NULL_VALUES`
Collapse file

‎doc/api/stream.md‎

Copy file name to clipboardExpand all lines: doc/api/stream.md
+59Lines changed: 59 additions & 0 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -1998,6 +1998,61 @@ option. In the code example above, data will be in a single chunk if the file
19981998
has less then 64 KiB of data because no `highWaterMark` option is provided to
19991999
[`fs.createReadStream()`][].
20002000

2001+
##### `readable[Symbol.for('Stream.toAsyncStreamable')]()`
2002+
2003+
<!-- YAML
2004+
added: REPLACEME
2005+
-->
2006+
2007+
> Stability: 1 - Experimental
2008+
2009+
* Returns: {AsyncIterable} An `AsyncIterable<Uint8Array[]>` that yields
2010+
batched chunks from the stream.
2011+
2012+
When the `--experimental-stream-iter` flag is enabled, `Readable` streams
2013+
implement the [`Stream.toAsyncStreamable`][] protocol, enabling efficient
2014+
consumption by the [`stream/iter`][] API.
2015+
2016+
This provides a batched async iterator that drains the stream's internal
2017+
buffer into `Uint8Array[]` batches, amortizing the per-chunk Promise overhead
2018+
of the standard `Symbol.asyncIterator` path. For byte-mode streams, chunks
2019+
are yielded directly as `Buffer` instances (which are `Uint8Array` subclasses).
2020+
For object-mode or encoded streams, each chunk is normalized to `Uint8Array`
2021+
before batching.
2022+
2023+
The returned iterator is tagged as a validated source, so [`from()`][stream-iter-from]
2024+
passes it through without additional normalization.
2025+
2026+
```mjs
2027+
import { Readable } from 'node:stream';
2028+
import { text, from } from 'node:stream/iter';
2029+
2030+
const readable = new Readable({
2031+
read() { this.push('hello'); this.push(null); },
2032+
});
2033+
2034+
// Readable is automatically consumed via toAsyncStreamable
2035+
console.log(await text(from(readable))); // 'hello'
2036+
```
2037+
2038+
```cjs
2039+
const { Readable } = require('node:stream');
2040+
const { text, from } = require('node:stream/iter');
2041+
2042+
async function run() {
2043+
const readable = new Readable({
2044+
read() { this.push('hello'); this.push(null); },
2045+
});
2046+
2047+
console.log(await text(from(readable))); // 'hello'
2048+
}
2049+
2050+
run().catch(console.error);
2051+
```
2052+
2053+
Without the `--experimental-stream-iter` flag, calling this method throws
2054+
[`ERR_STREAM_ITER_MISSING_FLAG`][].
2055+
20012056
##### `readable[Symbol.asyncDispose]()`
20022057

20032058
<!-- YAML
@@ -4997,8 +5052,10 @@ contain multi-byte characters.
49975052
[`'finish'`]: #event-finish
49985053
[`'readable'`]: #event-readable
49995054
[`Duplex`]: #class-streamduplex
5055+
[`ERR_STREAM_ITER_MISSING_FLAG`]: errors.md#err_stream_iter_missing_flag
50005056
[`EventEmitter`]: events.md#class-eventemitter
50015057
[`Readable`]: #class-streamreadable
5058+
[`Stream.toAsyncStreamable`]: stream_iter.md#streamtoasyncstreamable
50025059
[`Symbol.hasInstance`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Symbol/hasInstance
50035060
[`Transform`]: #class-streamtransform
50045061
[`Writable`]: #class-streamwritable
@@ -5024,6 +5081,7 @@ contain multi-byte characters.
50245081
[`stream.uncork()`]: #writableuncork
50255082
[`stream.unpipe()`]: #readableunpipedestination
50265083
[`stream.wrap()`]: #readablewrapstream
5084+
[`stream/iter`]: stream_iter.md
50275085
[`writable._final()`]: #writable_finalcallback
50285086
[`writable._write()`]: #writable_writechunk-encoding-callback
50295087
[`writable._writev()`]: #writable_writevchunks-callback
@@ -5052,6 +5110,7 @@ contain multi-byte characters.
50525110
[stream-end]: #writableendchunk-encoding-callback
50535111
[stream-finished]: #streamfinishedstream-options-callback
50545112
[stream-finished-promise]: #streamfinishedstream-options
5113+
[stream-iter-from]: stream_iter.md#frominput
50555114
[stream-pause]: #readablepause
50565115
[stream-pipeline]: #streampipelinesource-transforms-destination-callback
50575116
[stream-pipeline-promise]: #streampipelinesource-transforms-destination-options
Collapse file

‎doc/api/stream_iter.md‎

Copy file name to clipboardExpand all lines: doc/api/stream_iter.md
+257Lines changed: 257 additions & 0 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -1424,6 +1424,258 @@ Compression and decompression transforms for use with `pull()`, `pullSync()`,
14241424
`pipeTo()`, and `pipeToSync()` are available via the [`node:zlib/iter`][]
14251425
module. See the [`node:zlib/iter` documentation][] for details.
14261426

1427+
## Classic stream interop
1428+
1429+
These utility functions bridge between classic
1430+
[`stream.Readable`][]/[`stream.Writable`][] streams and the `stream/iter`
1431+
API.
1432+
1433+
Both `fromReadable()` and `fromWritable()` accept duck-typed objects -- they
1434+
do not require the input to extend `stream.Readable` or `stream.Writable`
1435+
directly. The minimum contract is described below for each function.
1436+
1437+
### `fromReadable(readable)`
1438+
1439+
<!-- YAML
1440+
added: REPLACEME
1441+
-->
1442+
1443+
> Stability: 1 - Experimental
1444+
1445+
* `readable` {stream.Readable|Object} A classic Readable stream or any object
1446+
with `read()` and `on()` methods.
1447+
* Returns: {AsyncIterable\<Uint8Array\[]>} A stream/iter async iterable source.
1448+
1449+
Converts a classic Readable stream (or duck-typed equivalent) into a
1450+
stream/iter async iterable source that can be passed to [`from()`][],
1451+
[`pull()`][], [`text()`][], etc.
1452+
1453+
If the object implements the [`toAsyncStreamable`][] protocol (as
1454+
`stream.Readable` does), that protocol is used. Otherwise, the function
1455+
duck-types on `read()` and `on()` (EventEmitter) and wraps the stream with
1456+
a batched async iterator.
1457+
1458+
The result is cached per instance -- calling `fromReadable()` twice with the
1459+
same stream returns the same iterable.
1460+
1461+
For object-mode or encoded Readable streams, chunks are automatically
1462+
normalized to `Uint8Array`.
1463+
1464+
```mjs
1465+
import { Readable } from 'node:stream';
1466+
import { fromReadable, text } from 'node:stream/iter';
1467+
1468+
const readable = new Readable({
1469+
read() { this.push('hello world'); this.push(null); },
1470+
});
1471+
1472+
const result = await text(fromReadable(readable));
1473+
console.log(result); // 'hello world'
1474+
```
1475+
1476+
```cjs
1477+
const { Readable } = require('node:stream');
1478+
const { fromReadable, text } = require('node:stream/iter');
1479+
1480+
const readable = new Readable({
1481+
read() { this.push('hello world'); this.push(null); },
1482+
});
1483+
1484+
async function run() {
1485+
const result = await text(fromReadable(readable));
1486+
console.log(result); // 'hello world'
1487+
}
1488+
run();
1489+
```
1490+
1491+
### `fromWritable(writable[, options])`
1492+
1493+
<!-- YAML
1494+
added: REPLACEME
1495+
-->
1496+
1497+
> Stability: 1 - Experimental
1498+
1499+
* `writable` {stream.Writable|Object} A classic Writable stream or any object
1500+
with `write()` and `on()` methods.
1501+
* `options` {Object}
1502+
* `backpressure` {string} Backpressure policy. **Default:** `'strict'`.
1503+
* `'strict'` -- writes are rejected when the buffer is full. Catches
1504+
callers that ignore backpressure.
1505+
* `'block'` -- writes wait for drain when the buffer is full. Recommended
1506+
for use with [`pipeTo()`][].
1507+
* `'drop-newest'` -- writes are silently discarded when the buffer is full.
1508+
* `'drop-oldest'` -- **not supported**. Throws `ERR_INVALID_ARG_VALUE`.
1509+
* Returns: {Object} A stream/iter Writer adapter.
1510+
1511+
Creates a stream/iter Writer adapter from a classic Writable stream (or
1512+
duck-typed equivalent). The adapter can be passed to [`pipeTo()`][] as a
1513+
destination.
1514+
1515+
Since all writes on a classic Writable are fundamentally asynchronous,
1516+
the synchronous Writer methods (`writeSync`, `writevSync`, `endSync`) always
1517+
return `false` or `-1`, deferring to the async path. The per-write
1518+
`options.signal` parameter from the Writer interface is also ignored.
1519+
1520+
The result is cached per instance -- calling `fromWritable()` twice with the
1521+
same stream returns the same Writer.
1522+
1523+
For duck-typed streams that do not expose `writableHighWaterMark`,
1524+
`writableLength`, or similar properties, sensible defaults are used.
1525+
Object-mode writables (if detectable) are rejected since the Writer
1526+
interface is bytes-only.
1527+
1528+
```mjs
1529+
import { Writable } from 'node:stream';
1530+
import { from, fromWritable, pipeTo } from 'node:stream/iter';
1531+
1532+
const writable = new Writable({
1533+
write(chunk, encoding, cb) { console.log(chunk.toString()); cb(); },
1534+
});
1535+
1536+
await pipeTo(from('hello world'),
1537+
fromWritable(writable, { backpressure: 'block' }));
1538+
```
1539+
1540+
```cjs
1541+
const { Writable } = require('node:stream');
1542+
const { from, fromWritable, pipeTo } = require('node:stream/iter');
1543+
1544+
async function run() {
1545+
const writable = new Writable({
1546+
write(chunk, encoding, cb) { console.log(chunk.toString()); cb(); },
1547+
});
1548+
1549+
await pipeTo(from('hello world'),
1550+
fromWritable(writable, { backpressure: 'block' }));
1551+
}
1552+
run();
1553+
```
1554+
1555+
### `toReadable(source[, options])`
1556+
1557+
<!-- YAML
1558+
added: REPLACEME
1559+
-->
1560+
1561+
> Stability: 1 - Experimental
1562+
1563+
* `source` {AsyncIterable} An `AsyncIterable<Uint8Array[]>` source, such as
1564+
the return value of [`pull()`][] or [`from()`][].
1565+
* `options` {Object}
1566+
* `highWaterMark` {number} The internal buffer size in bytes before
1567+
backpressure is applied. **Default:** `65536` (64 KB).
1568+
* `signal` {AbortSignal} An optional signal to abort the readable.
1569+
* Returns: {stream.Readable}
1570+
1571+
Creates a byte-mode [`stream.Readable`][] from an `AsyncIterable<Uint8Array[]>`
1572+
(the native batch format used by the stream/iter API). Each `Uint8Array` in a
1573+
yielded batch is pushed as a separate chunk into the Readable.
1574+
1575+
```mjs
1576+
import { createWriteStream } from 'node:fs';
1577+
import { from, pull, toReadable } from 'node:stream/iter';
1578+
import { compressGzip } from 'node:zlib/iter';
1579+
1580+
const source = pull(from('hello world'), compressGzip());
1581+
const readable = toReadable(source);
1582+
1583+
readable.pipe(createWriteStream('output.gz'));
1584+
```
1585+
1586+
```cjs
1587+
const { createWriteStream } = require('node:fs');
1588+
const { from, pull, toReadable } = require('node:stream/iter');
1589+
const { compressGzip } = require('node:zlib/iter');
1590+
1591+
const source = pull(from('hello world'), compressGzip());
1592+
const readable = toReadable(source);
1593+
1594+
readable.pipe(createWriteStream('output.gz'));
1595+
```
1596+
1597+
### `toReadableSync(source[, options])`
1598+
1599+
<!-- YAML
1600+
added: REPLACEME
1601+
-->
1602+
1603+
> Stability: 1 - Experimental
1604+
1605+
* `source` {Iterable} An `Iterable<Uint8Array[]>` source, such as the
1606+
return value of [`pullSync()`][] or [`fromSync()`][].
1607+
* `options` {Object}
1608+
* `highWaterMark` {number} The internal buffer size in bytes before
1609+
backpressure is applied. **Default:** `65536` (64 KB).
1610+
* Returns: {stream.Readable}
1611+
1612+
Creates a byte-mode [`stream.Readable`][] from a synchronous
1613+
`Iterable<Uint8Array[]>`. The `_read()` method pulls from the iterator
1614+
synchronously, so data is available immediately via `readable.read()`.
1615+
1616+
```mjs
1617+
import { fromSync, toReadableSync } from 'node:stream/iter';
1618+
1619+
const source = fromSync('hello world');
1620+
const readable = toReadableSync(source);
1621+
1622+
console.log(readable.read().toString()); // 'hello world'
1623+
```
1624+
1625+
```cjs
1626+
const { fromSync, toReadableSync } = require('node:stream/iter');
1627+
1628+
const source = fromSync('hello world');
1629+
const readable = toReadableSync(source);
1630+
1631+
console.log(readable.read().toString()); // 'hello world'
1632+
```
1633+
1634+
### `toWritable(writer)`
1635+
1636+
<!-- YAML
1637+
added: REPLACEME
1638+
-->
1639+
1640+
> Stability: 1 - Experimental
1641+
1642+
* `writer` {Object} A stream/iter Writer. Only the `write()` method is
1643+
required; `end()`, `fail()`, `writeSync()`, `writevSync()`, `endSync()`,
1644+
and `writev()` are optional.
1645+
* Returns: {stream.Writable}
1646+
1647+
Creates a classic [`stream.Writable`][] backed by a stream/iter Writer.
1648+
1649+
Each `_write()` / `_writev()` call attempts the Writer's synchronous method
1650+
first (`writeSync` / `writevSync`), falling back to the async method if the
1651+
sync path returns `false` or throws. Similarly, `_final()` tries `endSync()`
1652+
before `end()`. When the sync path succeeds, the callback is deferred via
1653+
`queueMicrotask` to preserve the async resolution contract.
1654+
1655+
The Writable's `highWaterMark` is set to `Number.MAX_SAFE_INTEGER` to
1656+
effectively disable its internal buffering, allowing the underlying Writer
1657+
to manage backpressure directly.
1658+
1659+
```mjs
1660+
import { push, toWritable } from 'node:stream/iter';
1661+
1662+
const { writer, readable } = push();
1663+
const writable = toWritable(writer);
1664+
1665+
writable.write('hello');
1666+
writable.end();
1667+
```
1668+
1669+
```cjs
1670+
const { push, toWritable } = require('node:stream/iter');
1671+
1672+
const { writer, readable } = push();
1673+
const writable = toWritable(writer);
1674+
1675+
writable.write('hello');
1676+
writable.end();
1677+
```
1678+
14271679
## Protocol symbols
14281680

14291681
These well-known symbols allow third-party objects to participate in the
@@ -1816,10 +2068,15 @@ console.log(textSync(stream)); // 'hello world'
18162068
[`arrayBuffer()`]: #arraybuffersource-options
18172069
[`bytes()`]: #bytessource-options
18182070
[`from()`]: #frominput
2071+
[`fromSync()`]: #fromsyncinput
18192072
[`node:zlib/iter`]: zlib_iter.md
18202073
[`node:zlib/iter` documentation]: zlib_iter.md
18212074
[`pipeTo()`]: #pipetosource-transforms-writer-options
18222075
[`pull()`]: #pullsource-transforms-options
2076+
[`pullSync()`]: #pullsyncsource-transforms-options
18232077
[`share()`]: #sharesource-options
2078+
[`stream.Readable`]: stream.md#class-streamreadable
2079+
[`stream.Writable`]: stream.md#class-streamwritable
18242080
[`tap()`]: #tapcallback
18252081
[`text()`]: #textsource-options
2082+
[`toAsyncStreamable`]: #streamtoasyncstreamable
Collapse file

‎lib/internal/errors.js‎

Copy file name to clipboardExpand all lines: lib/internal/errors.js
+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1770,6 +1770,8 @@ E('ERR_STREAM_ALREADY_FINISHED',
17701770
Error);
17711771
E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error);
17721772
E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed', Error);
1773+
E('ERR_STREAM_ITER_MISSING_FLAG',
1774+
'The stream/iter API requires the --experimental-stream-iter flag', TypeError);
17731775
E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError);
17741776
E('ERR_STREAM_PREMATURE_CLOSE', 'Premature close', Error);
17751777
E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error);

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.