Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 9e0d18f

Browse filesBrowse files
clshortfuseMylesBorins
authored andcommitted
http2: use and support non-empty DATA frame with END_STREAM flag
Adds support for reading from a stream where the final frame is a non-empty DATA frame with the END_STREAM flag set, instead of hanging waiting for another frame. When writing to a stream, uses a END_STREAM flag on final DATA frame instead of adding an empty DATA frame. BREAKING: http2 client now expects servers to properly support END_STREAM flag Fixes: #31309 Fixes: #33891 Refs: https://nghttp2.org/documentation/types.html#c.nghttp2_on_data_chunk_recv_callback PR-URL: #33875 Backport-PR-URL: #34838 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent a16f0f4 commit 9e0d18f
Copy full SHA for 9e0d18f

File tree

Expand file treeCollapse file tree

6 files changed

+191
-47
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

6 files changed

+191
-47
lines changed
Open diff view settings
Collapse file

‎lib/internal/http2/core.js‎

Copy file name to clipboardExpand all lines: lib/internal/http2/core.js
+82-22Lines changed: 82 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1136,6 +1136,7 @@ class Http2Session extends EventEmitter {
11361136
streams: new Map(),
11371137
pendingStreams: new Set(),
11381138
pendingAck: 0,
1139+
shutdownWritableCalled: false,
11391140
writeQueueSize: 0,
11401141
originSet: undefined
11411142
};
@@ -1702,6 +1703,26 @@ function afterShutdown(status) {
17021703
stream[kMaybeDestroy]();
17031704
}
17041705

1706+
function shutdownWritable(callback) {
1707+
const handle = this[kHandle];
1708+
if (!handle) return callback();
1709+
const state = this[kState];
1710+
if (state.shutdownWritableCalled) {
1711+
// Backport v14.x: Session required for debugging stream object
1712+
// debugStreamObj(this, 'shutdownWritable() already called');
1713+
return callback();
1714+
}
1715+
state.shutdownWritableCalled = true;
1716+
1717+
const req = new ShutdownWrap();
1718+
req.oncomplete = afterShutdown;
1719+
req.callback = callback;
1720+
req.handle = handle;
1721+
const err = handle.shutdown(req);
1722+
if (err === 1) // synchronous finish
1723+
return afterShutdown.call(req, 0);
1724+
}
1725+
17051726
function finishSendTrailers(stream, headersList) {
17061727
// The stream might be destroyed and in that case
17071728
// there is nothing to do.
@@ -1962,10 +1983,48 @@ class Http2Stream extends Duplex {
19621983

19631984
let req;
19641985

1986+
let waitingForWriteCallback = true;
1987+
let waitingForEndCheck = true;
1988+
let writeCallbackErr;
1989+
let endCheckCallbackErr;
1990+
const done = () => {
1991+
if (waitingForEndCheck || waitingForWriteCallback) return;
1992+
const err = writeCallbackErr || endCheckCallbackErr;
1993+
// writeGeneric does not destroy on error and
1994+
// we cannot enable autoDestroy,
1995+
// so make sure to destroy on error.
1996+
if (err) {
1997+
this.destroy(err);
1998+
}
1999+
cb(err);
2000+
};
2001+
const writeCallback = (err) => {
2002+
waitingForWriteCallback = false;
2003+
writeCallbackErr = err;
2004+
done();
2005+
};
2006+
const endCheckCallback = (err) => {
2007+
waitingForEndCheck = false;
2008+
endCheckCallbackErr = err;
2009+
done();
2010+
};
2011+
// Shutdown write stream right after last chunk is sent
2012+
// so final DATA frame can include END_STREAM flag
2013+
process.nextTick(() => {
2014+
if (writeCallbackErr ||
2015+
!this._writableState.ending ||
2016+
this._writableState.buffered.length ||
2017+
(this[kState].flags & STREAM_FLAGS_HAS_TRAILERS))
2018+
return endCheckCallback();
2019+
// Backport v14.x: Session required for debugging stream object
2020+
// debugStreamObj(this, 'shutting down writable on last write');
2021+
shutdownWritable.call(this, endCheckCallback);
2022+
});
2023+
19652024
if (writev)
1966-
req = writevGeneric(this, data, cb);
2025+
req = writevGeneric(this, data, writeCallback);
19672026
else
1968-
req = writeGeneric(this, data, encoding, cb);
2027+
req = writeGeneric(this, data, encoding, writeCallback);
19692028

19702029
trackWriteState(this, req.bytes);
19712030
}
@@ -1979,21 +2038,13 @@ class Http2Stream extends Duplex {
19792038
}
19802039

19812040
_final(cb) {
1982-
const handle = this[kHandle];
19832041
if (this.pending) {
19842042
this.once('ready', () => this._final(cb));
1985-
} else if (handle !== undefined) {
1986-
debugStreamObj(this, '_final shutting down');
1987-
const req = new ShutdownWrap();
1988-
req.oncomplete = afterShutdown;
1989-
req.callback = cb;
1990-
req.handle = handle;
1991-
const err = handle.shutdown(req);
1992-
if (err === 1) // synchronous finish
1993-
return afterShutdown.call(req, 0);
1994-
} else {
1995-
cb();
2043+
return;
19962044
}
2045+
// Backport v14.x: Session required for debugging stream object
2046+
// debugStreamObj(this, 'shutting down writable on _final');
2047+
shutdownWritable.call(this, cb);
19972048
}
19982049

19992050
_read(nread) {
@@ -2098,11 +2149,20 @@ class Http2Stream extends Duplex {
20982149
debugStream(this[kID] || 'pending', session[kType], 'destroying stream');
20992150

21002151
const state = this[kState];
2101-
const sessionCode = session[kState].goawayCode ||
2102-
session[kState].destroyCode;
2103-
const code = err != null ?
2104-
sessionCode || NGHTTP2_INTERNAL_ERROR :
2105-
state.rstCode || sessionCode;
2152+
const sessionState = session[kState];
2153+
const sessionCode = sessionState.goawayCode || sessionState.destroyCode;
2154+
2155+
// If a stream has already closed successfully, there is no error
2156+
// to report from this stream, even if the session has errored.
2157+
// This can happen if the stream was already in process of destroying
2158+
// after a successful close, but the session had a error between
2159+
// this stream's close and destroy operations.
2160+
// Previously, this always overrode a successful close operation code
2161+
// NGHTTP2_NO_ERROR (0) with sessionCode because the use of the || operator.
2162+
const code = (err != null ?
2163+
(sessionCode || NGHTTP2_INTERNAL_ERROR) :
2164+
(this.closed ? this.rstCode : sessionCode)
2165+
);
21062166
const hasHandle = handle !== undefined;
21072167

21082168
if (!this.closed)
@@ -2111,13 +2171,13 @@ class Http2Stream extends Duplex {
21112171

21122172
if (hasHandle) {
21132173
handle.destroy();
2114-
session[kState].streams.delete(id);
2174+
sessionState.streams.delete(id);
21152175
} else {
2116-
session[kState].pendingStreams.delete(this);
2176+
sessionState.pendingStreams.delete(this);
21172177
}
21182178

21192179
// Adjust the write queue size for accounting
2120-
session[kState].writeQueueSize -= state.writeQueueSize;
2180+
sessionState.writeQueueSize -= state.writeQueueSize;
21212181
state.writeQueueSize = 0;
21222182

21232183
// RST code 8 not emitted as an error as its used by clients to signify
Collapse file

‎src/node_http2.cc‎

Copy file name to clipboardExpand all lines: src/node_http2.cc
+7-6Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -732,7 +732,7 @@ ssize_t Http2Session::OnMaxFrameSizePadding(size_t frameLen,
732732
// quite expensive. This is a potential performance optimization target later.
733733
ssize_t Http2Session::ConsumeHTTP2Data() {
734734
CHECK_NOT_NULL(stream_buf_.base);
735-
CHECK_LT(stream_buf_offset_, stream_buf_.len);
735+
CHECK_LE(stream_buf_offset_, stream_buf_.len);
736736
size_t read_len = stream_buf_.len - stream_buf_offset_;
737737

738738
// multiple side effects.
@@ -753,11 +753,11 @@ ssize_t Http2Session::ConsumeHTTP2Data() {
753753
CHECK_GT(ret, 0);
754754
CHECK_LE(static_cast<size_t>(ret), read_len);
755755

756-
if (static_cast<size_t>(ret) < read_len) {
757-
// Mark the remainder of the data as available for later consumption.
758-
stream_buf_offset_ += ret;
759-
return ret;
760-
}
756+
// Mark the remainder of the data as available for later consumption.
757+
// Even if all bytes were received, a paused stream may delay the
758+
// nghttp2_on_frame_recv_callback which may have an END_STREAM flag.
759+
stream_buf_offset_ += ret;
760+
return ret;
761761
}
762762

763763
// We are done processing the current input chunk.
@@ -1093,6 +1093,7 @@ int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
10931093
if (session->is_write_in_progress()) {
10941094
CHECK(session->is_reading_stopped());
10951095
session->set_receive_paused();
1096+
Debug(session, "receive paused");
10961097
return NGHTTP2_ERR_PAUSE;
10971098
}
10981099

Collapse file

‎test/parallel/test-http2-misbehaving-multiplex.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-http2-misbehaving-multiplex.js
+39-17Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Flags: --expose-internals
33

44
const common = require('../common');
5+
const assert = require('assert');
56

67
if (!common.hasCrypto)
78
common.skip('missing crypto');
@@ -13,16 +14,36 @@ const h2test = require('../common/http2');
1314
let client;
1415

1516
const server = h2.createServer();
17+
let gotFirstStreamId1;
1618
server.on('stream', common.mustCall((stream) => {
1719
stream.respond();
1820
stream.end('ok');
1921

20-
// The error will be emitted asynchronously
21-
stream.on('error', common.expectsError({
22-
constructor: NghttpError,
23-
code: 'ERR_HTTP2_ERROR',
24-
message: 'Stream was already closed or invalid'
25-
}));
22+
// Http2Server should be fast enough to respond to and close
23+
// the first streams with ID 1 and ID 3 without errors.
24+
25+
// Test for errors in 'close' event to ensure no errors on some streams.
26+
stream.on('error', () => {});
27+
stream.on('close', (err) => {
28+
if (stream.id === 1) {
29+
if (gotFirstStreamId1) {
30+
// We expect our outgoing frames to fail on Stream ID 1 the second time
31+
// because a stream with ID 1 was already closed before.
32+
common.expectsError({
33+
constructor: NghttpError,
34+
code: 'ERR_HTTP2_ERROR',
35+
message: 'Stream was already closed or invalid'
36+
});
37+
return;
38+
}
39+
gotFirstStreamId1 = true;
40+
}
41+
assert.strictEqual(err, undefined);
42+
});
43+
44+
// Stream ID 5 should never reach the server
45+
assert.notStrictEqual(stream.id, 5);
46+
2647
}, 2));
2748

2849
server.on('session', common.mustCall((session) => {
@@ -35,26 +56,27 @@ server.on('session', common.mustCall((session) => {
3556

3657
const settings = new h2test.SettingsFrame();
3758
const settingsAck = new h2test.SettingsFrame(true);
38-
const head1 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
39-
const head2 = new h2test.HeadersFrame(3, h2test.kFakeRequestHeaders, 0, true);
40-
const head3 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
41-
const head4 = new h2test.HeadersFrame(5, h2test.kFakeRequestHeaders, 0, true);
59+
// HeadersFrame(id, payload, padding, END_STREAM)
60+
const id1 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
61+
const id3 = new h2test.HeadersFrame(3, h2test.kFakeRequestHeaders, 0, true);
62+
const id5 = new h2test.HeadersFrame(5, h2test.kFakeRequestHeaders, 0, true);
4263

4364
server.listen(0, () => {
4465
client = net.connect(server.address().port, () => {
4566
client.write(h2test.kClientMagic, () => {
4667
client.write(settings.data, () => {
4768
client.write(settingsAck.data);
48-
// This will make it ok.
49-
client.write(head1.data, () => {
50-
// This will make it ok.
51-
client.write(head2.data, () => {
69+
// Stream ID 1 frame will make it OK.
70+
client.write(id1.data, () => {
71+
// Stream ID 3 frame will make it OK.
72+
client.write(id3.data, () => {
73+
// A second Stream ID 1 frame should fail.
5274
// This will cause an error to occur because the client is
5375
// attempting to reuse an already closed stream. This must
5476
// cause the server session to be torn down.
55-
client.write(head3.data, () => {
56-
// This won't ever make it to the server
57-
client.write(head4.data);
77+
client.write(id1.data, () => {
78+
// This Stream ID 5 frame will never make it to the server
79+
client.write(id5.data);
5880
});
5981
});
6082
});
Collapse file
+61Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
if (!common.hasCrypto)
5+
common.skip('missing crypto');
6+
const assert = require('assert');
7+
const http2 = require('http2');
8+
9+
const { PerformanceObserver } = require('perf_hooks');
10+
11+
const server = http2.createServer();
12+
13+
server.on('stream', (stream, headers) => {
14+
stream.respond({
15+
'content-type': 'text/html',
16+
':status': 200
17+
});
18+
switch (headers[':path']) {
19+
case '/singleEnd':
20+
stream.end('OK');
21+
break;
22+
case '/sequentialEnd':
23+
stream.write('OK');
24+
stream.end();
25+
break;
26+
case '/delayedEnd':
27+
stream.write('OK', () => stream.end());
28+
break;
29+
}
30+
});
31+
32+
function testRequest(path, targetFrameCount, callback) {
33+
const obs = new PerformanceObserver((list, observer) => {
34+
const entry = list.getEntries()[0];
35+
if (entry.name !== 'Http2Session') return;
36+
if (entry.type !== 'client') return;
37+
assert.strictEqual(entry.framesReceived, targetFrameCount);
38+
observer.disconnect();
39+
callback();
40+
});
41+
obs.observe({ entryTypes: ['http2'] });
42+
const client = http2.connect(`http://localhost:${server.address().port}`, () => {
43+
const req = client.request({ ':path': path });
44+
req.resume();
45+
req.end();
46+
req.on('end', () => client.close());
47+
});
48+
}
49+
50+
// SETTINGS => SETTINGS => HEADERS => DATA
51+
const MIN_FRAME_COUNT = 4;
52+
53+
server.listen(0, () => {
54+
testRequest('/singleEnd', MIN_FRAME_COUNT, () => {
55+
testRequest('/sequentialEnd', MIN_FRAME_COUNT, () => {
56+
testRequest('/delayedEnd', MIN_FRAME_COUNT + 1, () => {
57+
server.close();
58+
});
59+
});
60+
});
61+
});
Collapse file

‎test/parallel/test-http2-padding-aligned.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-http2-padding-aligned.js
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ const makeDuplexPair = require('../common/duplexpair');
2626
// The lengths of the expected writes... note that this is highly
2727
// sensitive to how the internals are implemented.
2828
const serverLengths = [24, 9, 9, 32];
29-
const clientLengths = [9, 9, 48, 9, 1, 21, 1, 16];
29+
const clientLengths = [9, 9, 48, 9, 1, 21, 1];
3030

3131
// Adjust for the 24-byte preamble and two 9-byte settings frames, and
3232
// the result must be equally divisible by 8
Collapse file

‎test/parallel/test-http2-perf_hooks.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-http2-perf_hooks.js
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ const obs = new PerformanceObserver(common.mustCall((items) => {
3030
break;
3131
case 'client':
3232
assert.strictEqual(entry.streamCount, 1);
33-
assert.strictEqual(entry.framesReceived, 8);
33+
assert.strictEqual(entry.framesReceived, 7);
3434
break;
3535
default:
3636
assert.fail('invalid Http2Session type');

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.