Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 54b36e4

Browse filesBrowse files
committed
fs: reimplement read and write streams using stream.construct
Refs: #23133 PR-URL: #29656 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Denys Otrishko <shishugi@gmail.com>
1 parent fb8cc72 commit 54b36e4
Copy full SHA for 54b36e4

File tree

Expand file treeCollapse file tree

6 files changed

+321
-144
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

6 files changed

+321
-144
lines changed
Open diff view settings
Collapse file

‎doc/api/stream.md‎

Copy file name to clipboardExpand all lines: doc/api/stream.md
+4-4Lines changed: 4 additions & 4 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -2242,10 +2242,10 @@ The `_construct()` method MUST NOT be called directly. It may be implemented
22422242
by child classes, and if so, will be called by the internal `Readable`
22432243
class methods only.
22442244

2245-
This optional function will be called by the stream constructor,
2246-
delaying any `_read` and `_destroy` calls until `callback` is called. This is
2247-
useful to initialize state or asynchronously initialize resources before the
2248-
stream can be used.
2245+
This optional function will be scheduled in the next tick by the stream
2246+
constructor, delaying any `_read` and `_destroy` calls until `callback` is
2247+
called. This is useful to initialize state or asynchronously initialize
2248+
resources before the stream can be used.
22492249

22502250
```js
22512251
const { Readable } = require('stream');
Collapse file

‎lib/internal/fs/streams.js‎

Copy file name to clipboardExpand all lines: lib/internal/fs/streams.js
+97-133Lines changed: 97 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ const {
1010

1111
const {
1212
ERR_INVALID_ARG_TYPE,
13-
ERR_OUT_OF_RANGE,
14-
ERR_STREAM_DESTROYED
13+
ERR_OUT_OF_RANGE
1514
} = require('internal/errors').codes;
1615
const { deprecate } = require('internal/util');
1716
const { validateInteger } = require('internal/validators');
17+
const { errorOrDestroy } = require('internal/streams/destroy');
1818
const fs = require('fs');
1919
const { Buffer } = require('buffer');
2020
const {
@@ -49,6 +49,57 @@ function roundUpToMultipleOf8(n) {
4949
return (n + 7) & ~7; // Align to 8 byte boundary.
5050
}
5151

52+
function _construct(callback) {
53+
const stream = this;
54+
if (typeof stream.fd === 'number') {
55+
callback();
56+
return;
57+
}
58+
59+
if (stream.open !== openWriteFs && stream.open !== openReadFs) {
60+
// Backwards compat for monkey patching open().
61+
const orgEmit = stream.emit;
62+
stream.emit = function(...args) {
63+
if (args[0] === 'open') {
64+
this.emit = orgEmit;
65+
callback();
66+
orgEmit.apply(this, args);
67+
} else if (args[0] === 'error') {
68+
this.emit = orgEmit;
69+
callback(args[1]);
70+
} else {
71+
orgEmit.apply(this, args);
72+
}
73+
};
74+
stream.open();
75+
} else {
76+
stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => {
77+
if (er) {
78+
callback(er);
79+
} else {
80+
stream.fd = fd;
81+
callback();
82+
stream.emit('open', stream.fd);
83+
stream.emit('ready');
84+
}
85+
});
86+
}
87+
}
88+
89+
function close(stream, err, cb) {
90+
if (!stream.fd) {
91+
// TODO(ronag)
92+
// stream.closed = true;
93+
cb(err);
94+
} else {
95+
stream[kFs].close(stream.fd, (er) => {
96+
stream.closed = true;
97+
cb(er || err);
98+
});
99+
stream.fd = null;
100+
}
101+
}
102+
52103
function ReadStream(path, options) {
53104
if (!(this instanceof ReadStream))
54105
return new ReadStream(path, options);
@@ -79,7 +130,8 @@ function ReadStream(path, options) {
79130
this[kFs].close);
80131
}
81132

82-
Readable.call(this, options);
133+
options.autoDestroy = options.autoClose === undefined ?
134+
true : options.autoClose;
83135

84136
// Path will be ignored when fd is specified, so it can be falsy
85137
this.path = toPathIfFileURL(path);
@@ -89,7 +141,6 @@ function ReadStream(path, options) {
89141

90142
this.start = options.start;
91143
this.end = options.end;
92-
this.autoClose = options.autoClose === undefined ? true : options.autoClose;
93144
this.pos = undefined;
94145
this.bytesRead = 0;
95146
this.closed = false;
@@ -115,56 +166,28 @@ function ReadStream(path, options) {
115166
}
116167
}
117168

118-
if (typeof this.fd !== 'number')
119-
_openReadFs(this);
120-
121-
this.on('end', function() {
122-
if (this.autoClose) {
123-
this.destroy();
124-
}
125-
});
169+
Readable.call(this, options);
126170
}
127171
ObjectSetPrototypeOf(ReadStream.prototype, Readable.prototype);
128172
ObjectSetPrototypeOf(ReadStream, Readable);
129173

174+
ObjectDefineProperty(ReadStream.prototype, 'autoClose', {
175+
get() {
176+
return this._readableState.autoDestroy;
177+
},
178+
set(val) {
179+
this._readableState.autoDestroy = val;
180+
}
181+
});
182+
130183
const openReadFs = deprecate(function() {
131-
_openReadFs(this);
184+
// Noop.
132185
}, 'ReadStream.prototype.open() is deprecated', 'DEP0135');
133186
ReadStream.prototype.open = openReadFs;
134187

135-
function _openReadFs(stream) {
136-
// Backwards compat for overriden open.
137-
if (stream.open !== openReadFs) {
138-
stream.open();
139-
return;
140-
}
141-
142-
stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => {
143-
if (er) {
144-
if (stream.autoClose) {
145-
stream.destroy();
146-
}
147-
stream.emit('error', er);
148-
return;
149-
}
150-
151-
stream.fd = fd;
152-
stream.emit('open', fd);
153-
stream.emit('ready');
154-
// Start the flow of data.
155-
stream.read();
156-
});
157-
}
188+
ReadStream.prototype._construct = _construct;
158189

159190
ReadStream.prototype._read = function(n) {
160-
if (typeof this.fd !== 'number') {
161-
return this.once('open', function() {
162-
this._read(n);
163-
});
164-
}
165-
166-
if (this.destroyed) return;
167-
168191
if (!pool || pool.length - pool.used < kMinPoolSpace) {
169192
// Discard the old pool.
170193
allocNewPool(this.readableHighWaterMark);
@@ -189,17 +212,14 @@ ReadStream.prototype._read = function(n) {
189212

190213
// the actual read.
191214
this[kIsPerformingIO] = true;
192-
this[kFs].read(
193-
this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
215+
this[kFs]
216+
.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
194217
this[kIsPerformingIO] = false;
195218
// Tell ._destroy() that it's safe to close the fd now.
196219
if (this.destroyed) return this.emit(kIoDone, er);
197220

198221
if (er) {
199-
if (this.autoClose) {
200-
this.destroy();
201-
}
202-
this.emit('error', er);
222+
errorOrDestroy(this, er);
203223
} else {
204224
let b = null;
205225
// Now that we know how much data we have actually read, re-wind the
@@ -235,28 +255,13 @@ ReadStream.prototype._read = function(n) {
235255
};
236256

237257
ReadStream.prototype._destroy = function(err, cb) {
238-
if (typeof this.fd !== 'number') {
239-
this.once('open', closeFsStream.bind(null, this, cb, err));
240-
return;
241-
}
242-
243258
if (this[kIsPerformingIO]) {
244-
this.once(kIoDone, (er) => closeFsStream(this, cb, err || er));
245-
return;
259+
this.once(kIoDone, (er) => close(this, err || er, cb));
260+
} else {
261+
close(this, err, cb);
246262
}
247-
248-
closeFsStream(this, cb, err);
249263
};
250264

251-
function closeFsStream(stream, cb, err) {
252-
stream[kFs].close(stream.fd, (er) => {
253-
stream.closed = true;
254-
cb(er || err);
255-
});
256-
257-
stream.fd = null;
258-
}
259-
260265
ReadStream.prototype.close = function(cb) {
261266
if (typeof cb === 'function') finished(this, cb);
262267
this.destroy();
@@ -276,11 +281,6 @@ function WriteStream(path, options) {
276281
// Only buffers are supported.
277282
options.decodeStrings = true;
278283

279-
if (options.autoDestroy === undefined) {
280-
options.autoDestroy = options.autoClose === undefined ?
281-
true : (options.autoClose || false);
282-
}
283-
284284
this[kFs] = options.fs || fs;
285285
if (typeof this[kFs].open !== 'function') {
286286
throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function',
@@ -315,7 +315,8 @@ function WriteStream(path, options) {
315315
this._writev = null;
316316
}
317317

318-
Writable.call(this, options);
318+
options.autoDestroy = options.autoClose === undefined ?
319+
true : options.autoClose;
319320

320321
// Path will be ignored when fd is specified, so it can be falsy
321322
this.path = toPathIfFileURL(path);
@@ -324,7 +325,6 @@ function WriteStream(path, options) {
324325
this.mode = options.mode === undefined ? 0o666 : options.mode;
325326

326327
this.start = options.start;
327-
this.autoClose = options.autoDestroy;
328328
this.pos = undefined;
329329
this.bytesWritten = 0;
330330
this.closed = false;
@@ -336,74 +336,44 @@ function WriteStream(path, options) {
336336
this.pos = this.start;
337337
}
338338

339+
Writable.call(this, options);
340+
339341
if (options.encoding)
340342
this.setDefaultEncoding(options.encoding);
341-
342-
if (typeof this.fd !== 'number')
343-
_openWriteFs(this);
344343
}
345344
ObjectSetPrototypeOf(WriteStream.prototype, Writable.prototype);
346345
ObjectSetPrototypeOf(WriteStream, Writable);
347346

348-
WriteStream.prototype._final = function(callback) {
349-
if (typeof this.fd !== 'number') {
350-
return this.once('open', function() {
351-
this._final(callback);
352-
});
347+
ObjectDefineProperty(WriteStream.prototype, 'autoClose', {
348+
get() {
349+
return this._writableState.autoDestroy;
350+
},
351+
set(val) {
352+
this._writableState.autoDestroy = val;
353353
}
354-
355-
callback();
356-
};
354+
});
357355

358356
const openWriteFs = deprecate(function() {
359-
_openWriteFs(this);
357+
// Noop.
360358
}, 'WriteStream.prototype.open() is deprecated', 'DEP0135');
361359
WriteStream.prototype.open = openWriteFs;
362360

363-
function _openWriteFs(stream) {
364-
// Backwards compat for overriden open.
365-
if (stream.open !== openWriteFs) {
366-
stream.open();
367-
return;
368-
}
369-
370-
stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => {
371-
if (er) {
372-
if (stream.autoClose) {
373-
stream.destroy();
374-
}
375-
stream.emit('error', er);
376-
return;
377-
}
378-
379-
stream.fd = fd;
380-
stream.emit('open', fd);
381-
stream.emit('ready');
382-
});
383-
}
384-
361+
WriteStream.prototype._construct = _construct;
385362

386363
WriteStream.prototype._write = function(data, encoding, cb) {
387-
if (typeof this.fd !== 'number') {
388-
return this.once('open', function() {
389-
this._write(data, encoding, cb);
390-
});
391-
}
392-
393-
if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));
394-
395364
this[kIsPerformingIO] = true;
396365
this[kFs].write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
397366
this[kIsPerformingIO] = false;
398-
// Tell ._destroy() that it's safe to close the fd now.
399367
if (this.destroyed) {
368+
// Tell ._destroy() that it's safe to close the fd now.
400369
cb(er);
401370
return this.emit(kIoDone, er);
402371
}
403372

404373
if (er) {
405374
return cb(er);
406375
}
376+
407377
this.bytesWritten += bytes;
408378
cb();
409379
});
@@ -412,16 +382,7 @@ WriteStream.prototype._write = function(data, encoding, cb) {
412382
this.pos += data.length;
413383
};
414384

415-
416385
WriteStream.prototype._writev = function(data, cb) {
417-
if (typeof this.fd !== 'number') {
418-
return this.once('open', function() {
419-
this._writev(data, cb);
420-
});
421-
}
422-
423-
if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));
424-
425386
const len = data.length;
426387
const chunks = new Array(len);
427388
let size = 0;
@@ -436,18 +397,16 @@ WriteStream.prototype._writev = function(data, cb) {
436397
this[kIsPerformingIO] = true;
437398
this[kFs].writev(this.fd, chunks, this.pos, (er, bytes) => {
438399
this[kIsPerformingIO] = false;
439-
// Tell ._destroy() that it's safe to close the fd now.
440400
if (this.destroyed) {
401+
// Tell ._destroy() that it's safe to close the fd now.
441402
cb(er);
442403
return this.emit(kIoDone, er);
443404
}
444405

445406
if (er) {
446-
if (this.autoClose) {
447-
this.destroy(er);
448-
}
449407
return cb(er);
450408
}
409+
451410
this.bytesWritten += bytes;
452411
cb();
453412
});
@@ -456,8 +415,13 @@ WriteStream.prototype._writev = function(data, cb) {
456415
this.pos += size;
457416
};
458417

459-
460-
WriteStream.prototype._destroy = ReadStream.prototype._destroy;
418+
WriteStream.prototype._destroy = function(err, cb) {
419+
if (this[kIsPerformingIO]) {
420+
this.once(kIoDone, (er) => close(this, err || er, cb));
421+
} else {
422+
close(this, err, cb);
423+
}
424+
};
461425
WriteStream.prototype.close = function(cb) {
462426
if (cb) {
463427
if (this.closed) {

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.