Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 5eb93f1

Browse filesBrowse files
marco-ippolitojuanarbol
authored andcommitted
doc: add stream/promises pipeline and finished to doc
PR-URL: #45832 Fixes: #45821 Reviewed-By: Paolo Insogna <paolo@cowtech.it> Reviewed-By: Antoine du Hamel <duhamelantoine1995@gmail.com>
1 parent 4aaec07 commit 5eb93f1
Copy full SHA for 5eb93f1

File tree

Expand file treeCollapse file tree

1 file changed

+227
-107
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

1 file changed

+227
-107
lines changed
Open diff view settings
Collapse file

‎doc/api/stream.md‎

Copy file name to clipboardExpand all lines: doc/api/stream.md
+227-107Lines changed: 227 additions & 107 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,227 @@ functions for streams that return `Promise` objects rather than using
5959
callbacks. The API is accessible via `require('node:stream/promises')`
6060
or `require('node:stream').promises`.
6161

62+
### `stream.pipeline(source[, ...transforms], destination[, options])`
63+
64+
### `stream.pipeline(streams[, options])`
65+
66+
<!-- YAML
67+
added: v15.0.0
68+
-->
69+
70+
* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]}
71+
* `source` {Stream|Iterable|AsyncIterable|Function}
72+
* Returns: {Promise|AsyncIterable}
73+
* `...transforms` {Stream|Function}
74+
* `source` {AsyncIterable}
75+
* Returns: {Promise|AsyncIterable}
76+
* `destination` {Stream|Function}
77+
* `source` {AsyncIterable}
78+
* Returns: {Promise|AsyncIterable}
79+
* `options` {Object}
80+
* `signal` {AbortSignal}
81+
* `end` {boolean}
82+
* Returns: {Promise} Fulfills when the pipeline is complete.
83+
84+
```cjs
85+
const { pipeline } = require('node:stream/promises');
86+
const fs = require('node:fs');
87+
const zlib = require('node:zlib');
88+
89+
async function run() {
90+
await pipeline(
91+
fs.createReadStream('archive.tar'),
92+
zlib.createGzip(),
93+
fs.createWriteStream('archive.tar.gz'),
94+
);
95+
console.log('Pipeline succeeded.');
96+
}
97+
98+
run().catch(console.error);
99+
```
100+
101+
```mjs
102+
import { pipeline } from 'node:stream/promises';
103+
import { createReadStream, createWriteStream } from 'node:fs';
104+
import { createGzip } from 'node:zlib';
105+
106+
await pipeline(
107+
createReadStream('archive.tar'),
108+
createGzip(),
109+
createWriteStream('archive.tar.gz'),
110+
);
111+
console.log('Pipeline succeeded.');
112+
```
113+
114+
To use an `AbortSignal`, pass it inside an options object, as the last argument.
115+
When the signal is aborted, `destroy` will be called on the underlying pipeline,
116+
with an `AbortError`.
117+
118+
```cjs
119+
const { pipeline } = require('node:stream/promises');
120+
const fs = require('node:fs');
121+
const zlib = require('node:zlib');
122+
123+
async function run() {
124+
const ac = new AbortController();
125+
const signal = ac.signal;
126+
127+
setImmediate(() => ac.abort());
128+
await pipeline(
129+
fs.createReadStream('archive.tar'),
130+
zlib.createGzip(),
131+
fs.createWriteStream('archive.tar.gz'),
132+
{ signal },
133+
);
134+
}
135+
136+
run().catch(console.error); // AbortError
137+
```
138+
139+
```mjs
140+
import { pipeline } from 'node:stream/promises';
141+
import { createReadStream, createWriteStream } from 'node:fs';
142+
import { createGzip } from 'node:zlib';
143+
144+
const ac = new AbortController();
145+
const { signal } = ac;
146+
setImmediate(() => ac.abort());
147+
try {
148+
await pipeline(
149+
createReadStream('archive.tar'),
150+
createGzip(),
151+
createWriteStream('archive.tar.gz'),
152+
{ signal },
153+
);
154+
} catch (err) {
155+
console.error(err); // AbortError
156+
}
157+
```
158+
159+
The `pipeline` API also supports async generators:
160+
161+
```cjs
162+
const { pipeline } = require('node:stream/promises');
163+
const fs = require('node:fs');
164+
165+
async function run() {
166+
await pipeline(
167+
fs.createReadStream('lowercase.txt'),
168+
async function* (source, { signal }) {
169+
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
170+
for await (const chunk of source) {
171+
yield await processChunk(chunk, { signal });
172+
}
173+
},
174+
fs.createWriteStream('uppercase.txt'),
175+
);
176+
console.log('Pipeline succeeded.');
177+
}
178+
179+
run().catch(console.error);
180+
```
181+
182+
```mjs
183+
import { pipeline } from 'node:stream/promises';
184+
import { createReadStream, createWriteStream } from 'node:fs';
185+
186+
await pipeline(
187+
createReadStream('lowercase.txt'),
188+
async function* (source, { signal }) {
189+
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
190+
for await (const chunk of source) {
191+
yield await processChunk(chunk, { signal });
192+
}
193+
},
194+
createWriteStream('uppercase.txt'),
195+
);
196+
console.log('Pipeline succeeded.');
197+
```
198+
199+
Remember to handle the `signal` argument passed into the async generator.
200+
Especially in the case where the async generator is the source for the
201+
pipeline (i.e. first argument) or the pipeline will never complete.
202+
203+
```cjs
204+
const { pipeline } = require('node:stream/promises');
205+
const fs = require('node:fs');
206+
207+
async function run() {
208+
await pipeline(
209+
async function* ({ signal }) {
210+
await someLongRunningfn({ signal });
211+
yield 'asd';
212+
},
213+
fs.createWriteStream('uppercase.txt'),
214+
);
215+
console.log('Pipeline succeeded.');
216+
}
217+
218+
run().catch(console.error);
219+
```
220+
221+
```mjs
222+
import { pipeline } from 'node:stream/promises';
223+
import fs from 'node:fs';
224+
await pipeline(
225+
async function* ({ signal }) {
226+
await someLongRunningfn({ signal });
227+
yield 'asd';
228+
},
229+
fs.createWriteStream('uppercase.txt'),
230+
);
231+
console.log('Pipeline succeeded.');
232+
```
233+
234+
The `pipeline` API provides [callback version][stream-pipeline]:
235+
236+
### `stream.finished(stream[, options])`
237+
238+
<!-- YAML
239+
added: v15.0.0
240+
-->
241+
242+
* `stream` {Stream}
243+
* `options` {Object}
244+
* `error` {boolean|undefined}
245+
* `readable` {boolean|undefined}
246+
* `writable` {boolean|undefined}
247+
* `signal`: {AbortSignal|undefined}
248+
* Returns: {Promise} Fulfills when the stream is no
249+
longer readable or writable.
250+
251+
```cjs
252+
const { finished } = require('node:stream/promises');
253+
const fs = require('node:fs');
254+
255+
const rs = fs.createReadStream('archive.tar');
256+
257+
async function run() {
258+
await finished(rs);
259+
console.log('Stream is done reading.');
260+
}
261+
262+
run().catch(console.error);
263+
rs.resume(); // Drain the stream.
264+
```
265+
266+
```mjs
267+
import { finished } from 'node:stream/promises';
268+
import { createReadStream } from 'node:fs';
269+
270+
const rs = createReadStream('archive.tar');
271+
272+
async function run() {
273+
await finished(rs);
274+
console.log('Stream is done reading.');
275+
}
276+
277+
run().catch(console.error);
278+
rs.resume(); // Drain the stream.
279+
```
280+
281+
The `finished` API provides [callback version][stream-finished]:
282+
62283
### Object mode
63284

64285
All streams created by Node.js APIs operate exclusively on strings and `Buffer`
@@ -2425,22 +2646,7 @@ Especially useful in error handling scenarios where a stream is destroyed
24252646
prematurely (like an aborted HTTP request), and will not emit `'end'`
24262647
or `'finish'`.
24272648

2428-
The `finished` API provides promise version:
2429-
2430-
```js
2431-
const { finished } = require('node:stream/promises');
2432-
const fs = require('node:fs');
2433-
2434-
const rs = fs.createReadStream('archive.tar');
2435-
2436-
async function run() {
2437-
await finished(rs);
2438-
console.log('Stream is done reading.');
2439-
}
2440-
2441-
run().catch(console.error);
2442-
rs.resume(); // Drain the stream.
2443-
```
2649+
The `finished` API provides [promise version][stream-finished-promise].
24442650

24452651
`stream.finished()` leaves dangling event listeners (in particular
24462652
`'error'`, `'end'`, `'finish'` and `'close'`) after `callback` has been
@@ -2520,97 +2726,7 @@ pipeline(
25202726
);
25212727
```
25222728

2523-
The `pipeline` API provides a promise version, which can also
2524-
receive an options argument as the last parameter with a
2525-
`signal` {AbortSignal} property. When the signal is aborted,
2526-
`destroy` will be called on the underlying pipeline, with an
2527-
`AbortError`.
2528-
2529-
```js
2530-
const { pipeline } = require('node:stream/promises');
2531-
const fs = require('node:fs');
2532-
const zlib = require('node:zlib');
2533-
2534-
async function run() {
2535-
await pipeline(
2536-
fs.createReadStream('archive.tar'),
2537-
zlib.createGzip(),
2538-
fs.createWriteStream('archive.tar.gz'),
2539-
);
2540-
console.log('Pipeline succeeded.');
2541-
}
2542-
2543-
run().catch(console.error);
2544-
```
2545-
2546-
To use an `AbortSignal`, pass it inside an options object,
2547-
as the last argument:
2548-
2549-
```js
2550-
const { pipeline } = require('node:stream/promises');
2551-
const fs = require('node:fs');
2552-
const zlib = require('node:zlib');
2553-
2554-
async function run() {
2555-
const ac = new AbortController();
2556-
const signal = ac.signal;
2557-
2558-
setTimeout(() => ac.abort(), 1);
2559-
await pipeline(
2560-
fs.createReadStream('archive.tar'),
2561-
zlib.createGzip(),
2562-
fs.createWriteStream('archive.tar.gz'),
2563-
{ signal },
2564-
);
2565-
}
2566-
2567-
run().catch(console.error); // AbortError
2568-
```
2569-
2570-
The `pipeline` API also supports async generators:
2571-
2572-
```js
2573-
const { pipeline } = require('node:stream/promises');
2574-
const fs = require('node:fs');
2575-
2576-
async function run() {
2577-
await pipeline(
2578-
fs.createReadStream('lowercase.txt'),
2579-
async function* (source, { signal }) {
2580-
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
2581-
for await (const chunk of source) {
2582-
yield await processChunk(chunk, { signal });
2583-
}
2584-
},
2585-
fs.createWriteStream('uppercase.txt'),
2586-
);
2587-
console.log('Pipeline succeeded.');
2588-
}
2589-
2590-
run().catch(console.error);
2591-
```
2592-
2593-
Remember to handle the `signal` argument passed into the async generator.
2594-
Especially in the case where the async generator is the source for the
2595-
pipeline (i.e. first argument) or the pipeline will never complete.
2596-
2597-
```js
2598-
const { pipeline } = require('node:stream/promises');
2599-
const fs = require('node:fs');
2600-
2601-
async function run() {
2602-
await pipeline(
2603-
async function* ({ signal }) {
2604-
await someLongRunningfn({ signal });
2605-
yield 'asd';
2606-
},
2607-
fs.createWriteStream('uppercase.txt'),
2608-
);
2609-
console.log('Pipeline succeeded.');
2610-
}
2611-
2612-
run().catch(console.error);
2613-
```
2729+
The `pipeline` API provides a [promise version][stream-pipeline-promise].
26142730

26152731
`stream.pipeline()` will call `stream.destroy(err)` on all streams except:
26162732

@@ -4544,7 +4660,11 @@ contain multi-byte characters.
45444660
[stream-_write]: #writable_writechunk-encoding-callback
45454661
[stream-_writev]: #writable_writevchunks-callback
45464662
[stream-end]: #writableendchunk-encoding-callback
4663+
[stream-finished]: #streamfinishedstream-options-callback
4664+
[stream-finished-promise]: #streamfinishedstream-options
45474665
[stream-pause]: #readablepause
4666+
[stream-pipeline]: #streampipelinesource-transforms-destination-callback
4667+
[stream-pipeline-promise]: #streampipelinesource-transforms-destination-options
45484668
[stream-push]: #readablepushchunk-encoding
45494669
[stream-read]: #readablereadsize
45504670
[stream-resume]: #readableresume

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.