Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit e2a2a3f

Browse filesBrowse files
ronagtargos
authored andcommitted
stream: use lazy registration for drain for fast destinations
PR-URL: #29095 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com>
1 parent 178caa5 commit e2a2a3f
Copy full SHA for e2a2a3f

File tree

Expand file treeCollapse file tree

3 files changed

+36
-15
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

3 files changed

+36
-15
lines changed
Open diff view settings
Collapse file

‎lib/_stream_readable.js‎

Copy file name to clipboardExpand all lines: lib/_stream_readable.js
+13-8Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -681,20 +681,17 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
681681
dest.end();
682682
}
683683

684-
// When the dest drains, it reduces the awaitDrain counter
685-
// on the source. This would be more elegant with a .once()
686-
// handler in flow(), but adding and removing repeatedly is
687-
// too slow.
688-
const ondrain = pipeOnDrain(src);
689-
dest.on('drain', ondrain);
684+
let ondrain;
690685

691686
var cleanedUp = false;
692687
function cleanup() {
693688
debug('cleanup');
694689
// Cleanup event handlers once the pipe is broken
695690
dest.removeListener('close', onclose);
696691
dest.removeListener('finish', onfinish);
697-
dest.removeListener('drain', ondrain);
692+
if (ondrain) {
693+
dest.removeListener('drain', ondrain);
694+
}
698695
dest.removeListener('error', onerror);
699696
dest.removeListener('unpipe', onunpipe);
700697
src.removeListener('end', onend);
@@ -708,7 +705,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
708705
// flowing again.
709706
// So, if this is awaiting a drain, then we just call it now.
710707
// If we don't know, then assume that we are waiting for one.
711-
if (state.awaitDrain &&
708+
if (ondrain && state.awaitDrain &&
712709
(!dest._writableState || dest._writableState.needDrain))
713710
ondrain();
714711
}
@@ -729,6 +726,14 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
729726
debug('false write response, pause', state.awaitDrain);
730727
state.awaitDrain++;
731728
}
729+
if (!ondrain) {
730+
// When the dest drains, it reduces the awaitDrain counter
731+
// on the source. This would be more elegant with a .once()
732+
// handler in flow(), but adding and removing repeatedly is
733+
// too slow.
734+
ondrain = pipeOnDrain(src);
735+
dest.on('drain', ondrain);
736+
}
732737
src.pause();
733738
}
734739
}
Collapse file

‎test/parallel/test-stream-pipe-flow.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-stream-pipe-flow.js
+23Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
'use strict';
22
const common = require('../common');
3+
const assert = require('assert');
34
const { Readable, Writable, PassThrough } = require('stream');
45

56
{
@@ -65,3 +66,25 @@ const { Readable, Writable, PassThrough } = require('stream');
6566
wrapper.resume();
6667
wrapper.on('end', common.mustCall());
6768
}
69+
70+
{
71+
// Only register drain if there is backpressure.
72+
const rs = new Readable({ read() {} });
73+
74+
const pt = rs
75+
.pipe(new PassThrough({ objectMode: true, highWaterMark: 2 }));
76+
assert.strictEqual(pt.listenerCount('drain'), 0);
77+
pt.on('finish', () => {
78+
assert.strictEqual(pt.listenerCount('drain'), 0);
79+
});
80+
81+
rs.push('asd');
82+
assert.strictEqual(pt.listenerCount('drain'), 0);
83+
84+
process.nextTick(() => {
85+
rs.push('asd');
86+
assert.strictEqual(pt.listenerCount('drain'), 0);
87+
rs.push(null);
88+
assert.strictEqual(pt.listenerCount('drain'), 0);
89+
});
90+
}
Collapse file

‎test/parallel/test-stream2-readable-legacy-drain.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-stream2-readable-legacy-drain.js
-7Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,4 @@ function drain() {
5252

5353
w.end = common.mustCall();
5454

55-
// Just for kicks, let's mess with the drain count.
56-
// This verifies that even if it gets negative in the
57-
// pipe() cleanup function, we'll still function properly.
58-
r.on('readable', function() {
59-
w.emit('drain');
60-
});
61-
6255
r.pipe(w);

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.