Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit b73ec46

Browse filesBrowse files
addaleaxMyles Borins
authored andcommitted
stream: reset awaitDrain after manual .resume()
Reset the `readableState.awaitDrain` counter after manual calls to `.resume()`. What might happen otherwise is that a slow consumer at the end of the pipe could end up stalling the piping in the following scenario: 1. The writable stream indicates that its buffer is full. 2. This leads the readable stream to `pause()` and increase its `awaitDrain` counter, which will be decreased by the writable’s next `drain` event. 3. Something calls `.resume()` manually. 4. The readable continues to pipe to the writable, but once again the writable stream indicates that the buffer is full. 5. The `awaitDrain` counter is thus increased again, but since it has now been increased twice for a single piping destination, the next `drain` event will not be able to reset `awaitDrain` to zero. 6. The pipe is stalled and no data is passed along anymore. The solution in this commit is to reset the `awaitDrain` counter to zero when `resume()` is called. Fixes: #7159 PR-URL: #7160 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent 98469ad commit b73ec46
Copy full SHA for b73ec46

File tree

Expand file treeCollapse file tree

2 files changed

+55
-0
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

2 files changed

+55
-0
lines changed
Open diff view settings
Collapse file

‎lib/_stream_readable.js‎

Copy file name to clipboardExpand all lines: lib/_stream_readable.js
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -727,6 +727,7 @@ function resume_(stream, state) {
727727
}
728728

729729
state.resumeScheduled = false;
730+
state.awaitDrain = 0;
730731
stream.emit('resume');
731732
flow(stream);
732733
if (state.flowing && !state.reading)
Collapse file
+54Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
'use strict';
2+
const common = require('../common');
3+
const stream = require('stream');
4+
5+
// A consumer stream with a very low highWaterMark, which starts in a state
6+
// where it buffers the chunk it receives rather than indicating that they
7+
// have been consumed.
8+
const writable = new stream.Writable({
9+
highWaterMark: 5
10+
});
11+
12+
let isCurrentlyBufferingWrites = true;
13+
const queue = [];
14+
15+
writable._write = (chunk, encoding, cb) => {
16+
if (isCurrentlyBufferingWrites)
17+
queue.push({chunk, cb});
18+
else
19+
cb();
20+
};
21+
22+
const readable = new stream.Readable({
23+
read() {}
24+
});
25+
26+
readable.pipe(writable);
27+
28+
readable.once('pause', common.mustCall(() => {
29+
// First pause, resume manually. The next write() to writable will still
30+
// return false, because chunks are still being buffered, so it will increase
31+
// the awaitDrain counter again.
32+
process.nextTick(common.mustCall(() => {
33+
readable.resume();
34+
}));
35+
36+
readable.once('pause', common.mustCall(() => {
37+
// Second pause, handle all chunks from now on. Once all callbacks that
38+
// are currently queued up are handled, the awaitDrain drain counter should
39+
// fall back to 0 and all chunks that are pending on the readable side
40+
// should be flushed.
41+
isCurrentlyBufferingWrites = false;
42+
for (const queued of queue)
43+
queued.cb();
44+
}));
45+
}));
46+
47+
readable.push(Buffer(100)); // Fill the writable HWM, first 'pause'.
48+
readable.push(Buffer(100)); // Second 'pause'.
49+
readable.push(Buffer(100)); // Should get through to the writable.
50+
readable.push(null);
51+
52+
writable.on('finish', common.mustCall(() => {
53+
// Everything okay, all chunks were written.
54+
}));

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.