Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit fa60eb8

Browse filesBrowse files
mcollinacodebytere
authored andcommitted
stream: correctly pause and resume after once('readable')
Fixes: #24281 PR-URL: #24366 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Franziska Hinkelmann <franziska.hinkelmann@gmail.com>
1 parent f0602f8 commit fa60eb8
Copy full SHA for fa60eb8

File tree

Expand file treeCollapse file tree

2 files changed

+41
-3
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

2 files changed

+41
-3
lines changed
Open diff view settings
Collapse file

‎lib/_stream_readable.js‎

Copy file name to clipboardExpand all lines: lib/_stream_readable.js
+12-3Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ function ReadableState(options, stream, isDuplex) {
113113
this.emittedReadable = false;
114114
this.readableListening = false;
115115
this.resumeScheduled = false;
116+
this.paused = true;
116117

117118
// Should close be emitted on destroy. Defaults to true.
118119
this.emitClose = options.emitClose !== false;
@@ -858,10 +859,16 @@ Readable.prototype.removeAllListeners = function(ev) {
858859
};
859860

860861
function updateReadableListening(self) {
861-
self._readableState.readableListening = self.listenerCount('readable') > 0;
862+
const state = self._readableState;
863+
state.readableListening = self.listenerCount('readable') > 0;
862864

863-
// crude way to check if we should resume
864-
if (self.listenerCount('data') > 0) {
865+
if (state.resumeScheduled && !state.paused) {
866+
// flowing needs to be set to true now, otherwise
867+
// the upcoming resume will not flow.
868+
state.flowing = true;
869+
870+
// crude way to check if we should resume
871+
} else if (self.listenerCount('data') > 0) {
865872
self.resume();
866873
}
867874
}
@@ -883,6 +890,7 @@ Readable.prototype.resume = function() {
883890
state.flowing = !state.readableListening;
884891
resume(this, state);
885892
}
893+
state.paused = false;
886894
return this;
887895
};
888896

@@ -913,6 +921,7 @@ Readable.prototype.pause = function() {
913921
this._readableState.flowing = false;
914922
this.emit('pause');
915923
}
924+
this._readableState.paused = true;
916925
return this;
917926
};
918927

Collapse file
+29Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const { Readable } = require('stream');
5+
6+
// This test verifies that a stream could be resumed after
7+
// removing the readable event in the same tick
8+
9+
check(new Readable({
10+
objectMode: true,
11+
highWaterMark: 1,
12+
read() {
13+
if (!this.first) {
14+
this.push('hello');
15+
this.first = true;
16+
return;
17+
}
18+
19+
this.push(null);
20+
}
21+
}));
22+
23+
function check(s) {
24+
const readableListener = common.mustNotCall();
25+
s.on('readable', readableListener);
26+
s.on('end', common.mustCall());
27+
s.removeListener('readable', readableListener);
28+
s.resume();
29+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.