Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 527e214

Browse filesBrowse files
rickyesronag
authored andcommitted
stream: add promises version to utility functions
PR-URL: #33991 Fixes: #33582 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Denys Otrishko <shishugi@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Trivikram Kamat <trivikr.dev@gmail.com> Reviewed-By: Robert Nagy <ronagy@icloud.com>
1 parent 6ae1b9c commit 527e214
Copy full SHA for 527e214

File tree

Expand file treeCollapse file tree

5 files changed

+193
-8
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

5 files changed

+193
-8
lines changed
Open diff view settings
Collapse file

‎doc/api/stream.md‎

Copy file name to clipboardExpand all lines: doc/api/stream.md
+14-8Lines changed: 14 additions & 8 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,13 @@ Additionally, this module includes the utility functions
4848
[`stream.pipeline()`][], [`stream.finished()`][] and
4949
[`stream.Readable.from()`][].
5050

51+
### Streams Promises API
52+
53+
The `stream/promises` API provides an alternative set of asynchronous utility
54+
functions for streams that return `Promise` objects rather than using
55+
callbacks. The API is accessible via `require('stream/promises')`
56+
or `require('stream').promises`.
57+
5158
### Object mode
5259

5360
All streams created by Node.js APIs operate exclusively on strings and `Buffer`
@@ -1597,10 +1604,10 @@ Especially useful in error handling scenarios where a stream is destroyed
15971604
prematurely (like an aborted HTTP request), and will not emit `'end'`
15981605
or `'finish'`.
15991606

1600-
The `finished` API is promisify-able as well;
1607+
The `finished` API provides promise version:
16011608

16021609
```js
1603-
const finished = util.promisify(stream.finished);
1610+
const { finished } = require('stream/promises');
16041611

16051612
const rs = fs.createReadStream('archive.tar');
16061613

@@ -1684,10 +1691,10 @@ pipeline(
16841691
);
16851692
```
16861693

1687-
The `pipeline` API is promisify-able as well:
1694+
The `pipeline` API provides promise version:
16881695

16891696
```js
1690-
const pipeline = util.promisify(stream.pipeline);
1697+
const { pipeline } = require('stream/promises');
16911698

16921699
async function run() {
16931700
await pipeline(
@@ -1704,7 +1711,7 @@ run().catch(console.error);
17041711
The `pipeline` API also supports async generators:
17051712

17061713
```js
1707-
const pipeline = util.promisify(stream.pipeline);
1714+
const { pipeline } = require('stream/promises');
17081715
const fs = require('fs');
17091716

17101717
async function run() {
@@ -2927,9 +2934,9 @@ handling of backpressure and errors. [`stream.pipeline()`][] abstracts away
29272934
the handling of backpressure and backpressure-related errors:
29282935

29292936
```js
2930-
const { pipeline } = require('stream');
2931-
const util = require('util');
29322937
const fs = require('fs');
2938+
const { pipeline } = require('stream');
2939+
const { pipeline: pipelinePromise } = require('stream/promises');
29332940

29342941
const writable = fs.createWriteStream('./file');
29352942

@@ -2943,7 +2950,6 @@ pipeline(iterator, writable, (err, value) => {
29432950
});
29442951

29452952
// Promise Pattern
2946-
const pipelinePromise = util.promisify(pipeline);
29472953
pipelinePromise(iterator, writable)
29482954
.then((value) => {
29492955
console.log(value, 'value returned');
Collapse file

‎lib/stream.js‎

Copy file name to clipboardExpand all lines: lib/stream.js
+36Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,21 @@
2121

2222
'use strict';
2323

24+
const {
25+
ObjectDefineProperty,
26+
} = primordials;
27+
28+
const {
29+
promisify: { custom: customPromisify },
30+
} = require('internal/util');
31+
2432
const pipeline = require('internal/streams/pipeline');
2533
const eos = require('internal/streams/end-of-stream');
2634
const internalBuffer = require('internal/buffer');
2735

36+
// Lazy loaded
37+
let promises = null;
38+
2839
// Note: export Stream before Readable/Writable/Duplex/...
2940
// to avoid a cross-reference(require) issues
3041
const Stream = module.exports = require('internal/streams/legacy');
@@ -38,6 +49,31 @@ Stream.PassThrough = require('_stream_passthrough');
3849
Stream.pipeline = pipeline;
3950
Stream.finished = eos;
4051

52+
ObjectDefineProperty(Stream, 'promises', {
53+
configurable: true,
54+
enumerable: true,
55+
get() {
56+
if (promises === null) promises = require('stream/promises');
57+
return promises;
58+
}
59+
});
60+
61+
ObjectDefineProperty(pipeline, customPromisify, {
62+
enumerable: true,
63+
get() {
64+
if (promises === null) promises = require('stream/promises');
65+
return promises.pipeline;
66+
}
67+
});
68+
69+
ObjectDefineProperty(eos, customPromisify, {
70+
enumerable: true,
71+
get() {
72+
if (promises === null) promises = require('stream/promises');
73+
return promises.finished;
74+
}
75+
});
76+
4177
// Backwards-compat with node 0.4.x
4278
Stream.Stream = Stream;
4379

Collapse file

‎lib/stream/promises.js‎

Copy file name to clipboard
+39Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
'use strict';
2+
3+
const {
4+
Promise,
5+
} = primordials;
6+
7+
let pl;
8+
let eos;
9+
10+
function pipeline(...streams) {
11+
if (!pl) pl = require('internal/streams/pipeline');
12+
return new Promise((resolve, reject) => {
13+
pl(...streams, (err, value) => {
14+
if (err) {
15+
reject(err);
16+
} else {
17+
resolve(value);
18+
}
19+
});
20+
});
21+
}
22+
23+
function finished(stream, opts) {
24+
if (!eos) eos = require('internal/streams/end-of-stream');
25+
return new Promise((resolve, reject) => {
26+
eos(stream, opts, (err) => {
27+
if (err) {
28+
reject(err);
29+
} else {
30+
resolve();
31+
}
32+
});
33+
});
34+
}
35+
36+
module.exports = {
37+
finished,
38+
pipeline,
39+
};
Collapse file

‎node.gyp‎

Copy file name to clipboardExpand all lines: node.gyp
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
'lib/readline.js',
7979
'lib/repl.js',
8080
'lib/stream.js',
81+
'lib/stream/promises.js',
8182
'lib/_stream_readable.js',
8283
'lib/_stream_writable.js',
8384
'lib/_stream_duplex.js',
Collapse file
+103Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const stream = require('stream');
5+
const {
6+
Readable,
7+
Writable,
8+
promises,
9+
} = stream;
10+
const {
11+
finished,
12+
pipeline,
13+
} = require('stream/promises');
14+
const fs = require('fs');
15+
const assert = require('assert');
16+
const { promisify } = require('util');
17+
18+
assert.strictEqual(promises.pipeline, pipeline);
19+
assert.strictEqual(promises.finished, finished);
20+
assert.strictEqual(pipeline, promisify(stream.pipeline));
21+
assert.strictEqual(finished, promisify(stream.finished));
22+
23+
// pipeline success
24+
{
25+
let finished = false;
26+
const processed = [];
27+
const expected = [
28+
Buffer.from('a'),
29+
Buffer.from('b'),
30+
Buffer.from('c')
31+
];
32+
33+
const read = new Readable({
34+
read() { }
35+
});
36+
37+
const write = new Writable({
38+
write(data, enc, cb) {
39+
processed.push(data);
40+
cb();
41+
}
42+
});
43+
44+
write.on('finish', () => {
45+
finished = true;
46+
});
47+
48+
for (let i = 0; i < expected.length; i++) {
49+
read.push(expected[i]);
50+
}
51+
read.push(null);
52+
53+
pipeline(read, write).then(common.mustCall((value) => {
54+
assert.ok(finished);
55+
assert.deepStrictEqual(processed, expected);
56+
}));
57+
}
58+
59+
// pipeline error
60+
{
61+
const read = new Readable({
62+
read() { }
63+
});
64+
65+
const write = new Writable({
66+
write(data, enc, cb) {
67+
cb();
68+
}
69+
});
70+
71+
read.push('data');
72+
setImmediate(() => read.destroy());
73+
74+
pipeline(read, write).catch(common.mustCall((err) => {
75+
assert.ok(err, 'should have an error');
76+
}));
77+
}
78+
79+
// finished success
80+
{
81+
async function run() {
82+
const rs = fs.createReadStream(__filename);
83+
84+
let ended = false;
85+
rs.resume();
86+
rs.on('end', () => {
87+
ended = true;
88+
});
89+
await finished(rs);
90+
assert(ended);
91+
}
92+
93+
run().then(common.mustCall());
94+
}
95+
96+
// finished error
97+
{
98+
const rs = fs.createReadStream('file-does-not-exist');
99+
100+
assert.rejects(finished(rs), {
101+
code: 'ENOENT'
102+
}).then(common.mustCall());
103+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.