Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 95a61cb

Browse filesBrowse files
apapirovskigibfahn
authored andcommitted
http2: fix stream reading resumption
_read should always resume the underlying code that is attempting to push data to a readable stream. Adjust http2 core code to resume its reading appropriately. Some other general cleanup around reading, resuming & draining. PR-URL: #16580 Fixes: #16578 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Colin Ihrig <cjihrig@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent 528edb2 commit 95a61cb
Copy full SHA for 95a61cb

File tree

Expand file treeCollapse file tree

7 files changed

+79
-19
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

7 files changed

+79
-19
lines changed
Open diff view settings
Collapse file

‎lib/internal/http2/core.js‎

Copy file name to clipboardExpand all lines: lib/internal/http2/core.js
+12-14Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -282,8 +282,13 @@ function onSessionRead(nread, buf, handle) {
282282
'report this as a bug in Node.js');
283283
_unrefActive(owner); // Reset the session timeout timer
284284
_unrefActive(stream); // Reset the stream timeout timer
285-
if (nread >= 0 && !stream.destroyed)
286-
return stream.push(buf);
285+
if (nread >= 0 && !stream.destroyed) {
286+
// prevent overflowing the buffer while pause figures out the
287+
// stream needs to actually pause and streamOnPause runs
288+
if (!stream.push(buf))
289+
owner[kHandle].streamReadStop(id);
290+
return;
291+
}
287292

288293
// Last chunk was received. End the readable side.
289294
stream.push(null);
@@ -1276,8 +1281,6 @@ function onStreamClosed(code) {
12761281
}
12771282

12781283
function streamOnResume() {
1279-
if (this._paused)
1280-
return this.pause();
12811284
if (this[kID] === undefined) {
12821285
this.once('ready', streamOnResume);
12831286
return;
@@ -1299,12 +1302,10 @@ function streamOnPause() {
12991302
}
13001303
}
13011304

1302-
function streamOnDrain() {
1303-
const needPause = 0 > this._writableState.highWaterMark;
1304-
if (this._paused && !needPause) {
1305-
this._paused = false;
1306-
this.resume();
1307-
}
1305+
function handleFlushData(handle, streamID) {
1306+
assert(handle.flushData(streamID) === undefined,
1307+
`HTTP/2 Stream ${streamID} does not exist. Please report this as ` +
1308+
'a bug in Node.js');
13081309
}
13091310

13101311
function streamOnSessionConnect() {
@@ -1357,7 +1358,6 @@ class Http2Stream extends Duplex {
13571358
this.once('finish', onHandleFinish);
13581359
this.on('resume', streamOnResume);
13591360
this.on('pause', streamOnPause);
1360-
this.on('drain', streamOnDrain);
13611361
session.once('close', state.closeHandler);
13621362

13631363
if (session[kState].connecting) {
@@ -1507,9 +1507,7 @@ class Http2Stream extends Duplex {
15071507
return;
15081508
}
15091509
_unrefActive(this);
1510-
assert(this[kSession][kHandle].flushData(this[kID]) === undefined,
1511-
'HTTP/2 Stream #{this[kID]} does not exist. Please report this as ' +
1512-
'a bug in Node.js');
1510+
process.nextTick(handleFlushData, this[kSession][kHandle], this[kID]);
15131511
}
15141512

15151513
// Submits an RST-STREAM frame to shutdown this stream.
Collapse file

‎src/node_http2.cc‎

Copy file name to clipboardExpand all lines: src/node_http2.cc
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -756,7 +756,7 @@ void Http2Session::FlushData(const FunctionCallbackInfo<Value>& args) {
756756
if (!(stream = session->FindStream(id))) {
757757
return args.GetReturnValue().Set(NGHTTP2_ERR_INVALID_STREAM_ID);
758758
}
759-
stream->FlushDataChunks();
759+
stream->ReadResume();
760760
}
761761

762762
void Http2Session::UpdateChunksSent(const FunctionCallbackInfo<Value>& args) {
Collapse file

‎src/node_http2_core-inl.h‎

Copy file name to clipboardExpand all lines: src/node_http2_core-inl.h
+9-1Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,7 @@ inline void Nghttp2Session::SendPendingData() {
510510
// the proceed with the rest.
511511
while (srcRemaining > destRemaining) {
512512
DEBUG_HTTP2("Nghttp2Session %s: pushing %d bytes to the socket\n",
513-
TypeName(), destRemaining);
513+
TypeName(), destLength + destRemaining);
514514
memcpy(dest.base + destOffset, src + srcOffset, destRemaining);
515515
destLength += destRemaining;
516516
Send(&dest, destLength);
@@ -896,6 +896,14 @@ inline void Nghttp2Stream::ReadStart() {
896896
FlushDataChunks();
897897
}
898898

899+
inline void Nghttp2Stream::ReadResume() {
900+
DEBUG_HTTP2("Nghttp2Stream %d: resume reading\n", id_);
901+
flags_ &= ~NGHTTP2_STREAM_FLAG_READ_PAUSED;
902+
903+
// Flush any queued data chunks immediately out to the JS layer
904+
FlushDataChunks();
905+
}
906+
899907
inline void Nghttp2Stream::ReadStop() {
900908
DEBUG_HTTP2("Nghttp2Stream %d: stop reading\n", id_);
901909
if (!IsReading())
Collapse file

‎src/node_http2_core.h‎

Copy file name to clipboardExpand all lines: src/node_http2_core.h
+3Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,9 @@ class Nghttp2Stream {
384384
// the session to be emitted at the JS side
385385
inline void ReadStart();
386386

387+
// Resume Reading
388+
inline void ReadResume();
389+
387390
// Stop/Pause Reading.
388391
inline void ReadStop();
389392

Collapse file

‎test/parallel/parallel.status‎

Copy file name to clipboardExpand all lines: test/parallel/parallel.status
+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,7 @@ test-npm-install: PASS,FLAKY
1818
[$system==solaris] # Also applies to SmartOS
1919

2020
[$system==freebsd]
21+
test-http2-compat-serverrequest-pipe: PASS,FLAKY
22+
test-http2-pipe: PASS,FLAKY
2123

2224
[$system==aix]
Collapse file

‎test/parallel/test-http2-compat-serverrequest-pipe.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-http2-compat-serverrequest-pipe.js
+3-3Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,17 @@ const path = require('path');
1111

1212
// piping should work as expected with createWriteStream
1313

14-
const loc = fixtures.path('person.jpg');
15-
const fn = path.join(common.tmpDir, 'http2pipe.jpg');
1614
common.refreshTmpDir();
15+
const loc = fixtures.path('url-tests.js');
16+
const fn = path.join(common.tmpDir, 'http2-url-tests.js');
1717

1818
const server = http2.createServer();
1919

2020
server.on('request', common.mustCall((req, res) => {
2121
const dest = req.pipe(fs.createWriteStream(fn));
2222
dest.on('finish', common.mustCall(() => {
2323
assert.strictEqual(req.complete, true);
24-
assert.deepStrictEqual(fs.readFileSync(loc), fs.readFileSync(fn));
24+
assert.strictEqual(fs.readFileSync(loc).length, fs.readFileSync(fn).length);
2525
fs.unlinkSync(fn);
2626
res.end();
2727
}));
Collapse file

‎test/parallel/test-http2-pipe.js‎

Copy file name to clipboard
+49Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
if (!common.hasCrypto)
5+
common.skip('missing crypto');
6+
const fixtures = require('../common/fixtures');
7+
const assert = require('assert');
8+
const http2 = require('http2');
9+
const fs = require('fs');
10+
const path = require('path');
11+
12+
// piping should work as expected with createWriteStream
13+
14+
common.refreshTmpDir();
15+
const loc = fixtures.path('url-tests.js');
16+
const fn = path.join(common.tmpDir, 'http2-url-tests.js');
17+
18+
const server = http2.createServer();
19+
20+
server.on('stream', common.mustCall((stream) => {
21+
const dest = stream.pipe(fs.createWriteStream(fn));
22+
dest.on('finish', common.mustCall(() => {
23+
assert.strictEqual(fs.readFileSync(loc).length, fs.readFileSync(fn).length);
24+
fs.unlinkSync(fn);
25+
stream.respond();
26+
stream.end();
27+
}));
28+
}));
29+
30+
server.listen(0, common.mustCall(() => {
31+
const port = server.address().port;
32+
const client = http2.connect(`http://localhost:${port}`);
33+
34+
let remaining = 2;
35+
function maybeClose() {
36+
if (--remaining === 0) {
37+
server.close();
38+
client.destroy();
39+
}
40+
}
41+
42+
const req = client.request({ ':method': 'POST' });
43+
req.on('response', common.mustCall());
44+
req.resume();
45+
req.on('end', common.mustCall(maybeClose));
46+
const str = fs.createReadStream(loc);
47+
str.on('end', common.mustCall(maybeClose));
48+
str.pipe(req);
49+
}));

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.