Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 07c7f19

Browse filesBrowse files
calvinmetcalfjasnell
authored andcommitted
stream: add final method
Adds the ability to for write streams to have an _final method which acts similarly to the _flush method that transform streams have but is called before the finish event is emitted and if asynchronous delays the stream from finishing. The `final` option may also be passed in order to set it. PR-URL: #12828 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Refael Ackermann <refack@gmail.com>
1 parent 87cef63 commit 07c7f19
Copy full SHA for 07c7f19
Expand file treeCollapse file tree

8 files changed

+317
-32
lines changed
Open diff view settings
Collapse file

‎doc/api/stream.md‎

Copy file name to clipboardExpand all lines: doc/api/stream.md
+25-3Lines changed: 25 additions & 3 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -1198,7 +1198,8 @@ on the type of stream being created, as detailed in the chart below:
11981198
<p>[Writable](#stream_class_stream_writable)</p>
11991199
</td>
12001200
<td>
1201-
<p><code>[_write][stream-_write]</code>, <code>[_writev][stream-_writev]</code></p>
1201+
<p><code>[_write][stream-_write]</code>, <code>[_writev][stream-_writev]</code>,
1202+
<code>[_final][stream-_final]</code></p>
12021203
</td>
12031204
</tr>
12041205
<tr>
@@ -1209,7 +1210,8 @@ on the type of stream being created, as detailed in the chart below:
12091210
<p>[Duplex](#stream_class_stream_duplex)</p>
12101211
</td>
12111212
<td>
1212-
<p><code>[_read][stream-_read]</code>, <code>[_write][stream-_write]</code>, <code>[_writev][stream-_writev]</code></p>
1213+
<p><code>[_read][stream-_read]</code>, <code>[_write][stream-_write]</code>, <code>[_writev][stream-_writev]</code>,
1214+
<code>[_final][stream-_final]</code></p>
12131215
</td>
12141216
</tr>
12151217
<tr>
@@ -1220,7 +1222,8 @@ on the type of stream being created, as detailed in the chart below:
12201222
<p>[Transform](#stream_class_stream_transform)</p>
12211223
</td>
12221224
<td>
1223-
<p><code>[_transform][stream-_transform]</code>, <code>[_flush][stream-_flush]</code></p>
1225+
<p><code>[_transform][stream-_transform]</code>, <code>[_flush][stream-_flush]</code>,
1226+
<code>[_final][stream-_final]</code></p>
12241227
</td>
12251228
</tr>
12261229
</table>
@@ -1279,6 +1282,8 @@ constructor and implement the `writable._write()` method. The
12791282
[`stream._writev()`][stream-_writev] method.
12801283
* `destroy` {Function} Implementation for the
12811284
[`stream._destroy()`][writable-_destroy] method.
1285+
* `final` {Function} Implementation for the
1286+
[`stream._final()`][stream-_final] method.
12821287

12831288
For example:
12841289

@@ -1398,6 +1403,22 @@ added: REPLACEME
13981403
* `callback` {Function} A callback function that takes an optional error argument
13991404
which is invoked when the writable is destroyed.
14001405

1406+
#### writable.\_final(callback)
1407+
<!-- YAML
1408+
added: REPLACEME
1409+
-->
1410+
1411+
* `callback` {Function} Call this function (optionally with an error
1412+
argument) when you are done writing any remaining data.
1413+
1414+
Note: `_final()` **must not** be called directly. It MAY be implemented
1415+
by child classes, and if so, will be called by the internal Writable
1416+
class methods only.
1417+
1418+
This optional function will be called before the stream closes, delaying the
1419+
`finish` event until `callback` is called. This is useful to close resources
1420+
or write buffered data before a stream ends.
1421+
14011422
#### Errors While Writing
14021423

14031424
It is recommended that errors occurring during the processing of the
@@ -2115,6 +2136,7 @@ readable buffer so there is nothing for a user to consume.
21152136
[stream-_transform]: #stream_transform_transform_chunk_encoding_callback
21162137
[stream-_write]: #stream_writable_write_chunk_encoding_callback_1
21172138
[stream-_writev]: #stream_writable_writev_chunks_callback
2139+
[stream-_final]: #stream_writable_final_callback
21182140
[stream-end]: #stream_writable_end_chunk_encoding_callback
21192141
[stream-pause]: #stream_readable_pause
21202142
[stream-push]: #stream_readable_push_chunk_encoding
Collapse file

‎lib/_stream_writable.js‎

Copy file name to clipboardExpand all lines: lib/_stream_writable.js
+29-6Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@ function WritableState(options, stream) {
5858
// cast to ints.
5959
this.highWaterMark = Math.floor(this.highWaterMark);
6060

61+
// if _final has been called
62+
this.finalCalled = false;
63+
64+
// if _final has been called
65+
this.finalCalled = false;
66+
6167
// drain event flag.
6268
this.needDrain = false;
6369
// at the start of calling end()
@@ -199,6 +205,9 @@ function Writable(options) {
199205

200206
if (typeof options.destroy === 'function')
201207
this._destroy = options.destroy;
208+
209+
if (typeof options.final === 'function')
210+
this._final = options.final;
202211
}
203212

204213
Stream.call(this);
@@ -520,23 +529,37 @@ function needFinish(state) {
520529
!state.finished &&
521530
!state.writing);
522531
}
523-
524-
function prefinish(stream, state) {
525-
if (!state.prefinished) {
532+
function callFinal(stream, state) {
533+
stream._final((err) => {
534+
state.pendingcb--;
535+
if (err) {
536+
stream.emit('error', err);
537+
}
526538
state.prefinished = true;
527539
stream.emit('prefinish');
540+
finishMaybe(stream, state);
541+
});
542+
}
543+
function prefinish(stream, state) {
544+
if (!state.prefinished && !state.finalCalled) {
545+
if (typeof stream._final === 'function') {
546+
state.pendingcb++;
547+
state.finalCalled = true;
548+
process.nextTick(callFinal, stream, state);
549+
} else {
550+
state.prefinished = true;
551+
stream.emit('prefinish');
552+
}
528553
}
529554
}
530555

531556
function finishMaybe(stream, state) {
532557
var need = needFinish(state);
533558
if (need) {
559+
prefinish(stream, state);
534560
if (state.pendingcb === 0) {
535-
prefinish(stream, state);
536561
state.finished = true;
537562
stream.emit('finish');
538-
} else {
539-
prefinish(stream, state);
540563
}
541564
}
542565
return need;
Collapse file
+3-11Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,11 @@
11
'use strict';
2-
require('../common');
3-
const assert = require('assert');
2+
const common = require('../common');
43

54
const Readable = require('stream').Readable;
65

7-
let _readCalled = false;
8-
function _read(n) {
9-
_readCalled = true;
6+
const _read = common.mustCall(function _read(n) {
107
this.push(null);
11-
}
8+
});
129

1310
const r = new Readable({ read: _read });
1411
r.resume();
15-
16-
process.on('exit', function() {
17-
assert.strictEqual(r._read, _read);
18-
assert(_readCalled);
19-
});
Collapse file
+12-12Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,25 @@
11
'use strict';
2-
require('../common');
2+
const common = require('../common');
33
const assert = require('assert');
44

55
const Transform = require('stream').Transform;
66

7-
let _transformCalled = false;
8-
function _transform(d, e, n) {
9-
_transformCalled = true;
7+
const _transform = common.mustCall(function _transform(d, e, n) {
108
n();
11-
}
9+
});
1210

13-
let _flushCalled = false;
14-
function _flush(n) {
15-
_flushCalled = true;
11+
const _final = common.mustCall(function _final(n) {
1612
n();
17-
}
13+
});
14+
15+
const _flush = common.mustCall(function _flush(n) {
16+
n();
17+
});
1818

1919
const t = new Transform({
2020
transform: _transform,
21-
flush: _flush
21+
flush: _flush,
22+
final: _final
2223
});
2324

2425
const t2 = new Transform({});
@@ -34,6 +35,5 @@ assert.throws(() => {
3435
process.on('exit', () => {
3536
assert.strictEqual(t._transform, _transform);
3637
assert.strictEqual(t._flush, _flush);
37-
assert.strictEqual(_transformCalled, true);
38-
assert.strictEqual(_flushCalled, true);
38+
assert.strictEqual(t._final, _final);
3939
});
Collapse file
+100Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
5+
const stream = require('stream');
6+
let state = 0;
7+
8+
/*
9+
What you do
10+
var stream = new tream.Transform({
11+
transform: function transformCallback(chunk, _, next) {
12+
// part 1
13+
this.push(chunk);
14+
//part 2
15+
next();
16+
},
17+
final: function endCallback(done) {
18+
// part 1
19+
process.nextTick(function () {
20+
// part 2
21+
done();
22+
});
23+
},
24+
flush: function flushCallback(done) {
25+
// part 1
26+
process.nextTick(function () {
27+
// part 2
28+
done();
29+
});
30+
}
31+
});
32+
t.on('data', dataListener);
33+
t.on('end', endListener);
34+
t.on('finish', finishListener);
35+
t.write(1);
36+
t.write(4);
37+
t.end(7, endMethodCallback);
38+
39+
The order things are called
40+
41+
1. transformCallback part 1
42+
2. dataListener
43+
3. transformCallback part 2
44+
4. transformCallback part 1
45+
5. dataListener
46+
6. transformCallback part 2
47+
7. transformCallback part 1
48+
8. dataListener
49+
9. transformCallback part 2
50+
10. finalCallback part 1
51+
11. finalCallback part 2
52+
12. flushCallback part 1
53+
13. finishListener
54+
14. endMethodCallback
55+
15. flushCallback part 2
56+
16. endListener
57+
*/
58+
59+
const t = new stream.Transform({
60+
objectMode: true,
61+
transform: common.mustCall(function(chunk, _, next) {
62+
assert.strictEqual(++state, chunk, 'transformCallback part 1');
63+
this.push(state);
64+
assert.strictEqual(++state, chunk + 2, 'transformCallback part 2');
65+
process.nextTick(next);
66+
}, 3),
67+
final: common.mustCall(function(done) {
68+
state++;
69+
assert.strictEqual(state, 10, 'finalCallback part 1');
70+
state++;
71+
assert.strictEqual(state, 11, 'finalCallback part 2');
72+
done();
73+
}, 1),
74+
flush: common.mustCall(function(done) {
75+
state++;
76+
assert.strictEqual(state, 12, 'flushCallback part 1');
77+
process.nextTick(function() {
78+
state++;
79+
assert.strictEqual(state, 15, 'flushCallback part 2');
80+
done();
81+
});
82+
}, 1)
83+
});
84+
t.on('finish', common.mustCall(function() {
85+
state++;
86+
assert.strictEqual(state, 13, 'finishListener');
87+
}, 1));
88+
t.on('end', common.mustCall(function() {
89+
state++;
90+
assert.strictEqual(state, 16, 'end event');
91+
}, 1));
92+
t.on('data', common.mustCall(function(d) {
93+
assert.strictEqual(++state, d + 1, 'dataListener');
94+
}, 3));
95+
t.write(1);
96+
t.write(4);
97+
t.end(7, common.mustCall(function() {
98+
state++;
99+
assert.strictEqual(state, 14, 'endMethodCallback');
100+
}, 1));

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.