Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit fb8cc72

Browse filesBrowse files
committed
stream: construct
Provide a standardized way of asynchronously creating and initializing resources before performing any work. Refs: #29314 PR-URL: #29656 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Denys Otrishko <shishugi@gmail.com>
1 parent 9949a2e commit fb8cc72
Copy full SHA for fb8cc72

File tree

Expand file treeCollapse file tree

5 files changed

+544
-20
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

5 files changed

+544
-20
lines changed
Open diff view settings
Collapse file

‎doc/api/stream.md‎

Copy file name to clipboardExpand all lines: doc/api/stream.md
+164-7Lines changed: 164 additions & 7 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -550,8 +550,7 @@ added: v9.3.0
550550

551551
* {number}
552552

553-
Return the value of `highWaterMark` passed when constructing this
554-
`Writable`.
553+
Return the value of `highWaterMark` passed when creating this `Writable`.
555554

556555
##### `writable.writableLength`
557556
<!-- YAML
@@ -1193,8 +1192,7 @@ added: v9.3.0
11931192

11941193
* {number}
11951194

1196-
Returns the value of `highWaterMark` passed when constructing this
1197-
`Readable`.
1195+
Returns the value of `highWaterMark` passed when creating this `Readable`.
11981196

11991197
##### `readable.readableLength`
12001198
<!-- YAML
@@ -1792,7 +1790,7 @@ expectations.
17921790
added: v1.2.0
17931791
-->
17941792

1795-
For many simple cases, it is possible to construct a stream without relying on
1793+
For many simple cases, it is possible to create a stream without relying on
17961794
inheritance. This can be accomplished by directly creating instances of the
17971795
`stream.Writable`, `stream.Readable`, `stream.Duplex` or `stream.Transform`
17981796
objects and passing appropriate methods as constructor options.
@@ -1801,8 +1799,14 @@ objects and passing appropriate methods as constructor options.
18011799
const { Writable } = require('stream');
18021800

18031801
const myWritable = new Writable({
1802+
construct(callback) {
1803+
// Initialize state and load resources...
1804+
},
18041805
write(chunk, encoding, callback) {
18051806
// ...
1807+
},
1808+
destroy() {
1809+
// Free resources...
18061810
}
18071811
});
18081812
```
@@ -1861,6 +1865,8 @@ changes:
18611865
[`stream._destroy()`][writable-_destroy] method.
18621866
* `final` {Function} Implementation for the
18631867
[`stream._final()`][stream-_final] method.
1868+
* `construct` {Function} Implementation for the
1869+
[`stream._construct()`][writable-_construct] method.
18641870
* `autoDestroy` {boolean} Whether this stream should automatically call
18651871
`.destroy()` on itself after ending. **Default:** `true`.
18661872

@@ -1906,6 +1912,56 @@ const myWritable = new Writable({
19061912
});
19071913
```
19081914

1915+
#### `writable._construct(callback)`
1916+
<!-- YAML
1917+
added: REPLACEME
1918+
-->
1919+
1920+
* `callback` {Function} Call this function (optionally with an error
1921+
argument) when the stream has finished initializing.
1922+
1923+
The `_construct()` method MUST NOT be called directly. It may be implemented
1924+
by child classes, and if so, will be called by the internal `Writable`
1925+
class methods only.
1926+
1927+
This optional function will be called in a tick after the stream constructor
1928+
has returned, delaying any `_write`, `_final` and `_destroy` calls until
1929+
`callback` is called. This is useful to initialize state or asynchronously
1930+
initialize resources before the stream can be used.
1931+
1932+
```js
1933+
const { Writable } = require('stream');
1934+
const fs = require('fs');
1935+
1936+
class WriteStream extends Writable {
1937+
constructor(filename) {
1938+
super();
1939+
this.filename = filename;
1940+
this.fd = fd;
1941+
}
1942+
_construct(callback) {
1943+
fs.open(this.filename, (fd, err) => {
1944+
if (err) {
1945+
callback(err);
1946+
} else {
1947+
this.fd = fd;
1948+
callback();
1949+
}
1950+
});
1951+
}
1952+
_write(chunk, encoding, callback) {
1953+
fs.write(this.fd, chunk, callback);
1954+
}
1955+
_destroy(err, callback) {
1956+
if (this.fd) {
1957+
fs.close(this.fd, (er) => callback(er || err));
1958+
} else {
1959+
callback(err);
1960+
}
1961+
}
1962+
}
1963+
```
1964+
19091965
#### `writable._write(chunk, encoding, callback)`
19101966
<!-- YAML
19111967
changes:
@@ -2130,6 +2186,8 @@ changes:
21302186
method.
21312187
* `destroy` {Function} Implementation for the
21322188
[`stream._destroy()`][readable-_destroy] method.
2189+
* `construct` {Function} Implementation for the
2190+
[`stream._construct()`][readable-_construct] method.
21332191
* `autoDestroy` {boolean} Whether this stream should automatically call
21342192
`.destroy()` on itself after ending. **Default:** `true`.
21352193

@@ -2172,6 +2230,63 @@ const myReadable = new Readable({
21722230
});
21732231
```
21742232

2233+
#### `readable._construct(callback)`
2234+
<!-- YAML
2235+
added: REPLACEME
2236+
-->
2237+
2238+
* `callback` {Function} Call this function (optionally with an error
2239+
argument) when the stream has finished initializing.
2240+
2241+
The `_construct()` method MUST NOT be called directly. It may be implemented
2242+
by child classes, and if so, will be called by the internal `Readable`
2243+
class methods only.
2244+
2245+
This optional function will be called by the stream constructor,
2246+
delaying any `_read` and `_destroy` calls until `callback` is called. This is
2247+
useful to initialize state or asynchronously initialize resources before the
2248+
stream can be used.
2249+
2250+
```js
2251+
const { Readable } = require('stream');
2252+
const fs = require('fs');
2253+
2254+
class ReadStream extends Readable {
2255+
constructor(filename) {
2256+
super();
2257+
this.filename = filename;
2258+
this.fd = null;
2259+
}
2260+
_construct(callback) {
2261+
fs.open(this.filename, (fd, err) => {
2262+
if (err) {
2263+
callback(err);
2264+
} else {
2265+
this.fd = fd;
2266+
callback();
2267+
}
2268+
});
2269+
}
2270+
_read(n) {
2271+
const buf = Buffer.alloc(n);
2272+
fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
2273+
if (err) {
2274+
this.destroy(err);
2275+
} else {
2276+
this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null);
2277+
}
2278+
});
2279+
}
2280+
_destroy(err, callback) {
2281+
if (this.fd) {
2282+
fs.close(this.fd, (er) => callback(er || err));
2283+
} else {
2284+
callback(err);
2285+
}
2286+
}
2287+
}
2288+
```
2289+
21752290
#### `readable._read(size)`
21762291
<!-- YAML
21772292
added: v0.9.4
@@ -2427,6 +2542,46 @@ const myDuplex = new Duplex({
24272542
});
24282543
```
24292544

2545+
When using pipeline:
2546+
2547+
```js
2548+
const { Transform, pipeline } = require('stream');
2549+
const fs = require('fs');
2550+
2551+
pipeline(
2552+
fs.createReadStream('object.json')
2553+
.setEncoding('utf-8'),
2554+
new Transform({
2555+
decodeStrings: false, // Accept string input rather than Buffers
2556+
construct(callback) {
2557+
this.data = '';
2558+
callback();
2559+
},
2560+
transform(chunk, encoding, callback) {
2561+
this.data += chunk;
2562+
callback();
2563+
},
2564+
flush(callback) {
2565+
try {
2566+
// Make sure is valid json.
2567+
JSON.parse(this.data);
2568+
this.push(this.data);
2569+
} catch (err) {
2570+
callback(err);
2571+
}
2572+
}
2573+
}),
2574+
fs.createWriteStream('valid-object.json'),
2575+
(err) => {
2576+
if (err) {
2577+
console.error('failed', err);
2578+
} else {
2579+
console.log('completed');
2580+
}
2581+
}
2582+
);
2583+
```
2584+
24302585
#### An Example Duplex Stream
24312586

24322587
The following illustrates a simple example of a `Duplex` stream that wraps a
@@ -2706,8 +2861,8 @@ unhandled post-destroy errors.
27062861

27072862
#### Creating Readable Streams with Async Generators
27082863

2709-
We can construct a Node.js Readable Stream from an asynchronous generator
2710-
using the `Readable.from()` utility method:
2864+
A Node.js Readable Stream can be created from an asynchronous generator using
2865+
the `Readable.from()` utility method:
27112866

27122867
```js
27132868
const { Readable } = require('stream');
@@ -2960,6 +3115,7 @@ contain multi-byte characters.
29603115
[http-incoming-message]: http.html#http_class_http_incomingmessage
29613116
[hwm-gotcha]: #stream_highwatermark_discrepancy_after_calling_readable_setencoding
29623117
[object-mode]: #stream_object_mode
3118+
[readable-_construct]: #stream_readable_construct_callback
29633119
[readable-_destroy]: #stream_readable_destroy_err_callback
29643120
[readable-destroy]: #stream_readable_destroy_error
29653121
[stream-_final]: #stream_writable_final_callback
@@ -2976,6 +3132,7 @@ contain multi-byte characters.
29763132
[stream-uncork]: #stream_writable_uncork
29773133
[stream-write]: #stream_writable_write_chunk_encoding_callback
29783134
[Stream Three States]: #stream_three_states
3135+
[writable-_construct]: #stream_writable_construct_callback
29793136
[writable-_destroy]: #stream_writable_destroy_err_callback
29803137
[writable-destroy]: #stream_writable_destroy_error
29813138
[writable-new]: #stream_constructor_new_stream_writable_options
Collapse file

‎lib/_stream_readable.js‎

Copy file name to clipboardExpand all lines: lib/_stream_readable.js
+19-5Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,12 @@ function ReadableState(options, stream, isDuplex) {
118118
this.endEmitted = false;
119119
this.reading = false;
120120

121+
// Stream is still being constructed and cannot be
122+
// destroyed until construction finished or failed.
123+
// Async construction is opt in, therefore we start as
124+
// constructed.
125+
this.constructed = true;
126+
121127
// A flag to be able to tell if the event 'readable'/'data' is emitted
122128
// immediately, or on a later tick. We set this to true at first, because
123129
// any actions that shouldn't happen until "later" should generally also
@@ -197,9 +203,16 @@ function Readable(options) {
197203

198204
if (typeof options.destroy === 'function')
199205
this._destroy = options.destroy;
206+
207+
if (typeof options.construct === 'function')
208+
this._construct = options.construct;
200209
}
201210

202211
Stream.call(this, options);
212+
213+
destroyImpl.construct(this, () => {
214+
maybeReadMore(this, this._readableState);
215+
});
203216
}
204217

205218
Readable.prototype.destroy = destroyImpl.destroy;
@@ -461,11 +474,12 @@ Readable.prototype.read = function(n) {
461474
}
462475

463476
// However, if we've ended, then there's no point, if we're already
464-
// reading, then it's unnecessary, and if we're destroyed or errored,
465-
// then it's not allowed.
466-
if (state.ended || state.reading || state.destroyed || state.errored) {
477+
// reading, then it's unnecessary, if we're constructing we have to wait,
478+
// and if we're destroyed or errored, then it's not allowed,
479+
if (state.ended || state.reading || state.destroyed || state.errored ||
480+
!state.constructed) {
467481
doRead = false;
468-
debug('reading or ended', doRead);
482+
debug('reading, ended or constructing', doRead);
469483
} else if (doRead) {
470484
debug('do read');
471485
state.reading = true;
@@ -587,7 +601,7 @@ function emitReadable_(stream) {
587601
// However, if we're not ended, or reading, and the length < hwm,
588602
// then go ahead and try to read some more preemptively.
589603
function maybeReadMore(stream, state) {
590-
if (!state.readingMore) {
604+
if (!state.readingMore && state.constructed) {
591605
state.readingMore = true;
592606
process.nextTick(maybeReadMore_, stream, state);
593607
}
Collapse file

‎lib/_stream_writable.js‎

Copy file name to clipboardExpand all lines: lib/_stream_writable.js
+25-2Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,12 @@ function WritableState(options, stream, isDuplex) {
155155
// this must be 0 before 'finish' can be emitted.
156156
this.pendingcb = 0;
157157

158+
// Stream is still being constructed and cannot be
159+
// destroyed until construction finished or failed.
160+
// Async construction is opt in, therefore we start as
161+
// constructed.
162+
this.constructed = true;
163+
158164
// Emit prefinish if the only thing we're waiting for is _write cbs
159165
// This is relevant for synchronous Transform streams.
160166
this.prefinished = false;
@@ -249,9 +255,22 @@ function Writable(options) {
249255

250256
if (typeof options.final === 'function')
251257
this._final = options.final;
258+
259+
if (typeof options.construct === 'function')
260+
this._construct = options.construct;
252261
}
253262

254263
Stream.call(this, options);
264+
265+
destroyImpl.construct(this, () => {
266+
const state = this._writableState;
267+
268+
if (!state.writing) {
269+
clearBuffer(this, state);
270+
}
271+
272+
finishMaybe(this, state);
273+
});
255274
}
256275

257276
// Otherwise people can pipe Writable streams, which is just wrong.
@@ -342,7 +361,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
342361

343362
state.length += len;
344363

345-
if (state.writing || state.corked || state.errored) {
364+
if (state.writing || state.corked || state.errored || !state.constructed) {
346365
state.buffered.push({ chunk, encoding, callback });
347366
if (state.allBuffers && encoding !== 'buffer') {
348367
state.allBuffers = false;
@@ -492,7 +511,10 @@ function errorBuffer(state, err) {
492511

493512
// If there's something in the buffer waiting, then process it.
494513
function clearBuffer(stream, state) {
495-
if (state.corked || state.bufferProcessing || state.destroyed) {
514+
if (state.corked ||
515+
state.bufferProcessing ||
516+
state.destroyed ||
517+
!state.constructed) {
496518
return;
497519
}
498520

@@ -600,6 +622,7 @@ Writable.prototype.end = function(chunk, encoding, cb) {
600622

601623
function needFinish(state) {
602624
return (state.ending &&
625+
state.constructed &&
603626
state.length === 0 &&
604627
!state.errored &&
605628
state.buffered.length === 0 &&

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.