Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit b732c92

Browse filesBrowse files
clshortfuseMylesBorins
authored andcommitted
http2: use and support non-empty DATA frame with END_STREAM flag
Adds support for reading from a stream where the final frame is a non-empty DATA frame with the END_STREAM flag set, instead of hanging waiting for another frame. When writing to a stream, uses a END_STREAM flag on final DATA frame instead of adding an empty DATA frame. BREAKING: http2 client now expects servers to properly support END_STREAM flag Fixes: #31309 Fixes: #33891 Refs: https://nghttp2.org/documentation/types.html#c.nghttp2_on_data_chunk_recv_callback Backport-PR-URL: #34845 PR-URL: #33875 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent bfce0eb commit b732c92
Copy full SHA for b732c92

File tree

Expand file treeCollapse file tree

6 files changed

+193
-47
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

6 files changed

+193
-47
lines changed
Open diff view settings
Collapse file

‎lib/internal/http2/core.js‎

Copy file name to clipboardExpand all lines: lib/internal/http2/core.js
+84-22Lines changed: 84 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1158,6 +1158,7 @@ class Http2Session extends EventEmitter {
11581158
streams: new Map(),
11591159
pendingStreams: new Set(),
11601160
pendingAck: 0,
1161+
shutdownWritableCalled: false,
11611162
writeQueueSize: 0,
11621163
originSet: undefined
11631164
};
@@ -1724,6 +1725,26 @@ function afterShutdown(status) {
17241725
stream[kMaybeDestroy]();
17251726
}
17261727

1728+
function shutdownWritable(callback) {
1729+
const handle = this[kHandle];
1730+
if (!handle) return callback();
1731+
const state = this[kState];
1732+
if (state.shutdownWritableCalled) {
1733+
// Backport v12.x: Session required for debugging stream object
1734+
// debugStreamObj(this, 'shutdownWritable() already called');
1735+
return callback();
1736+
}
1737+
state.shutdownWritableCalled = true;
1738+
1739+
const req = new ShutdownWrap();
1740+
req.oncomplete = afterShutdown;
1741+
req.callback = callback;
1742+
req.handle = handle;
1743+
const err = handle.shutdown(req);
1744+
if (err === 1) // synchronous finish
1745+
return afterShutdown.call(req, 0);
1746+
}
1747+
17271748
function finishSendTrailers(stream, headersList) {
17281749
// The stream might be destroyed and in that case
17291750
// there is nothing to do.
@@ -1983,10 +2004,50 @@ class Http2Stream extends Duplex {
19832004

19842005
let req;
19852006

2007+
let waitingForWriteCallback = true;
2008+
let waitingForEndCheck = true;
2009+
let writeCallbackErr;
2010+
let endCheckCallbackErr;
2011+
const done = () => {
2012+
if (waitingForEndCheck || waitingForWriteCallback) return;
2013+
const err = writeCallbackErr || endCheckCallbackErr;
2014+
// writeGeneric does not destroy on error and
2015+
// we cannot enable autoDestroy,
2016+
// so make sure to destroy on error.
2017+
if (err) {
2018+
this.destroy(err);
2019+
}
2020+
cb(err);
2021+
};
2022+
const writeCallback = (err) => {
2023+
waitingForWriteCallback = false;
2024+
writeCallbackErr = err;
2025+
done();
2026+
};
2027+
const endCheckCallback = (err) => {
2028+
waitingForEndCheck = false;
2029+
endCheckCallbackErr = err;
2030+
done();
2031+
};
2032+
// Shutdown write stream right after last chunk is sent
2033+
// so final DATA frame can include END_STREAM flag
2034+
process.nextTick(() => {
2035+
if (writeCallbackErr ||
2036+
!this._writableState.ending ||
2037+
// Backport v12.x: _writableState.buffered does not exist
2038+
// this._writableState.buffered.length ||
2039+
this._writableState.bufferedRequest ||
2040+
(this[kState].flags & STREAM_FLAGS_HAS_TRAILERS))
2041+
return endCheckCallback();
2042+
// Backport v12.x: Session required for debugging stream object
2043+
// debugStreamObj(this, 'shutting down writable on last write');
2044+
shutdownWritable.call(this, endCheckCallback);
2045+
});
2046+
19862047
if (writev)
1987-
req = writevGeneric(this, data, cb);
2048+
req = writevGeneric(this, data, writeCallback);
19882049
else
1989-
req = writeGeneric(this, data, encoding, cb);
2050+
req = writeGeneric(this, data, encoding, writeCallback);
19902051

19912052
trackWriteState(this, req.bytes);
19922053
}
@@ -2000,21 +2061,13 @@ class Http2Stream extends Duplex {
20002061
}
20012062

20022063
_final(cb) {
2003-
const handle = this[kHandle];
20042064
if (this.pending) {
20052065
this.once('ready', () => this._final(cb));
2006-
} else if (handle !== undefined) {
2007-
debugStreamObj(this, '_final shutting down');
2008-
const req = new ShutdownWrap();
2009-
req.oncomplete = afterShutdown;
2010-
req.callback = cb;
2011-
req.handle = handle;
2012-
const err = handle.shutdown(req);
2013-
if (err === 1) // synchronous finish
2014-
return afterShutdown.call(req, 0);
2015-
} else {
2016-
cb();
2066+
return;
20172067
}
2068+
// Backport v12.x: Session required for debugging stream object
2069+
// debugStreamObj(this, 'shutting down writable on _final');
2070+
shutdownWritable.call(this, cb);
20182071
}
20192072

20202073
_read(nread) {
@@ -2119,11 +2172,20 @@ class Http2Stream extends Duplex {
21192172
debugStream(this[kID] || 'pending', session[kType], 'destroying stream');
21202173

21212174
const state = this[kState];
2122-
const sessionCode = session[kState].goawayCode ||
2123-
session[kState].destroyCode;
2124-
const code = err != null ?
2125-
sessionCode || NGHTTP2_INTERNAL_ERROR :
2126-
state.rstCode || sessionCode;
2175+
const sessionState = session[kState];
2176+
const sessionCode = sessionState.goawayCode || sessionState.destroyCode;
2177+
2178+
// If a stream has already closed successfully, there is no error
2179+
// to report from this stream, even if the session has errored.
2180+
// This can happen if the stream was already in process of destroying
2181+
// after a successful close, but the session had a error between
2182+
// this stream's close and destroy operations.
2183+
// Previously, this always overrode a successful close operation code
2184+
// NGHTTP2_NO_ERROR (0) with sessionCode because the use of the || operator.
2185+
const code = (err != null ?
2186+
(sessionCode || NGHTTP2_INTERNAL_ERROR) :
2187+
(this.closed ? this.rstCode : sessionCode)
2188+
);
21272189
const hasHandle = handle !== undefined;
21282190

21292191
if (!this.closed)
@@ -2132,13 +2194,13 @@ class Http2Stream extends Duplex {
21322194

21332195
if (hasHandle) {
21342196
handle.destroy();
2135-
session[kState].streams.delete(id);
2197+
sessionState.streams.delete(id);
21362198
} else {
2137-
session[kState].pendingStreams.delete(this);
2199+
sessionState.pendingStreams.delete(this);
21382200
}
21392201

21402202
// Adjust the write queue size for accounting
2141-
session[kState].writeQueueSize -= state.writeQueueSize;
2203+
sessionState.writeQueueSize -= state.writeQueueSize;
21422204
state.writeQueueSize = 0;
21432205

21442206
// RST code 8 not emitted as an error as its used by clients to signify
Collapse file

‎src/node_http2.cc‎

Copy file name to clipboardExpand all lines: src/node_http2.cc
+7-6Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -811,7 +811,7 @@ ssize_t Http2Session::OnCallbackPadding(size_t frameLen,
811811
// quite expensive. This is a potential performance optimization target later.
812812
ssize_t Http2Session::ConsumeHTTP2Data() {
813813
CHECK_NOT_NULL(stream_buf_.base);
814-
CHECK_LT(stream_buf_offset_, stream_buf_.len);
814+
CHECK_LE(stream_buf_offset_, stream_buf_.len);
815815
size_t read_len = stream_buf_.len - stream_buf_offset_;
816816

817817
// multiple side effects.
@@ -832,11 +832,11 @@ ssize_t Http2Session::ConsumeHTTP2Data() {
832832
CHECK_GT(ret, 0);
833833
CHECK_LE(static_cast<size_t>(ret), read_len);
834834

835-
if (static_cast<size_t>(ret) < read_len) {
836-
// Mark the remainder of the data as available for later consumption.
837-
stream_buf_offset_ += ret;
838-
return ret;
839-
}
835+
// Mark the remainder of the data as available for later consumption.
836+
// Even if all bytes were received, a paused stream may delay the
837+
// nghttp2_on_frame_recv_callback which may have an END_STREAM flag.
838+
stream_buf_offset_ += ret;
839+
return ret;
840840
}
841841

842842
// We are done processing the current input chunk.
@@ -1174,6 +1174,7 @@ int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
11741174
if (session->flags_ & SESSION_STATE_WRITE_IN_PROGRESS) {
11751175
CHECK_NE(session->flags_ & SESSION_STATE_READING_STOPPED, 0);
11761176
session->flags_ |= SESSION_STATE_NGHTTP2_RECV_PAUSED;
1177+
Debug(session, "receive paused");
11771178
return NGHTTP2_ERR_PAUSE;
11781179
}
11791180

Collapse file

‎test/parallel/test-http2-misbehaving-multiplex.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-http2-misbehaving-multiplex.js
+39-17Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Flags: --expose-internals
33

44
const common = require('../common');
5+
const assert = require('assert');
56

67
if (!common.hasCrypto)
78
common.skip('missing crypto');
@@ -13,16 +14,36 @@ const h2test = require('../common/http2');
1314
let client;
1415

1516
const server = h2.createServer();
17+
let gotFirstStreamId1;
1618
server.on('stream', common.mustCall((stream) => {
1719
stream.respond();
1820
stream.end('ok');
1921

20-
// The error will be emitted asynchronously
21-
stream.on('error', common.expectsError({
22-
constructor: NghttpError,
23-
code: 'ERR_HTTP2_ERROR',
24-
message: 'Stream was already closed or invalid'
25-
}));
22+
// Http2Server should be fast enough to respond to and close
23+
// the first streams with ID 1 and ID 3 without errors.
24+
25+
// Test for errors in 'close' event to ensure no errors on some streams.
26+
stream.on('error', () => {});
27+
stream.on('close', (err) => {
28+
if (stream.id === 1) {
29+
if (gotFirstStreamId1) {
30+
// We expect our outgoing frames to fail on Stream ID 1 the second time
31+
// because a stream with ID 1 was already closed before.
32+
common.expectsError({
33+
constructor: NghttpError,
34+
code: 'ERR_HTTP2_ERROR',
35+
message: 'Stream was already closed or invalid'
36+
});
37+
return;
38+
}
39+
gotFirstStreamId1 = true;
40+
}
41+
assert.strictEqual(err, undefined);
42+
});
43+
44+
// Stream ID 5 should never reach the server
45+
assert.notStrictEqual(stream.id, 5);
46+
2647
}, 2));
2748

2849
server.on('session', common.mustCall((session) => {
@@ -35,26 +56,27 @@ server.on('session', common.mustCall((session) => {
3556

3657
const settings = new h2test.SettingsFrame();
3758
const settingsAck = new h2test.SettingsFrame(true);
38-
const head1 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
39-
const head2 = new h2test.HeadersFrame(3, h2test.kFakeRequestHeaders, 0, true);
40-
const head3 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
41-
const head4 = new h2test.HeadersFrame(5, h2test.kFakeRequestHeaders, 0, true);
59+
// HeadersFrame(id, payload, padding, END_STREAM)
60+
const id1 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
61+
const id3 = new h2test.HeadersFrame(3, h2test.kFakeRequestHeaders, 0, true);
62+
const id5 = new h2test.HeadersFrame(5, h2test.kFakeRequestHeaders, 0, true);
4263

4364
server.listen(0, () => {
4465
client = net.connect(server.address().port, () => {
4566
client.write(h2test.kClientMagic, () => {
4667
client.write(settings.data, () => {
4768
client.write(settingsAck.data);
48-
// This will make it ok.
49-
client.write(head1.data, () => {
50-
// This will make it ok.
51-
client.write(head2.data, () => {
69+
// Stream ID 1 frame will make it OK.
70+
client.write(id1.data, () => {
71+
// Stream ID 3 frame will make it OK.
72+
client.write(id3.data, () => {
73+
// A second Stream ID 1 frame should fail.
5274
// This will cause an error to occur because the client is
5375
// attempting to reuse an already closed stream. This must
5476
// cause the server session to be torn down.
55-
client.write(head3.data, () => {
56-
// This won't ever make it to the server
57-
client.write(head4.data);
77+
client.write(id1.data, () => {
78+
// This Stream ID 5 frame will never make it to the server
79+
client.write(id5.data);
5880
});
5981
});
6082
});
Collapse file
+61Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
if (!common.hasCrypto)
5+
common.skip('missing crypto');
6+
const assert = require('assert');
7+
const http2 = require('http2');
8+
9+
const { PerformanceObserver } = require('perf_hooks');
10+
11+
const server = http2.createServer();
12+
13+
server.on('stream', (stream, headers) => {
14+
stream.respond({
15+
'content-type': 'text/html',
16+
':status': 200
17+
});
18+
switch (headers[':path']) {
19+
case '/singleEnd':
20+
stream.end('OK');
21+
break;
22+
case '/sequentialEnd':
23+
stream.write('OK');
24+
stream.end();
25+
break;
26+
case '/delayedEnd':
27+
stream.write('OK', () => stream.end());
28+
break;
29+
}
30+
});
31+
32+
function testRequest(path, targetFrameCount, callback) {
33+
const obs = new PerformanceObserver((list, observer) => {
34+
const entry = list.getEntries()[0];
35+
if (entry.name !== 'Http2Session') return;
36+
if (entry.type !== 'client') return;
37+
assert.strictEqual(entry.framesReceived, targetFrameCount);
38+
observer.disconnect();
39+
callback();
40+
});
41+
obs.observe({ entryTypes: ['http2'] });
42+
const client = http2.connect(`http://localhost:${server.address().port}`, () => {
43+
const req = client.request({ ':path': path });
44+
req.resume();
45+
req.end();
46+
req.on('end', () => client.close());
47+
});
48+
}
49+
50+
// SETTINGS => SETTINGS => HEADERS => DATA
51+
const MIN_FRAME_COUNT = 4;
52+
53+
server.listen(0, () => {
54+
testRequest('/singleEnd', MIN_FRAME_COUNT, () => {
55+
testRequest('/sequentialEnd', MIN_FRAME_COUNT, () => {
56+
testRequest('/delayedEnd', MIN_FRAME_COUNT + 1, () => {
57+
server.close();
58+
});
59+
});
60+
});
61+
});
Collapse file

‎test/parallel/test-http2-padding-aligned.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-http2-padding-aligned.js
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ const makeDuplexPair = require('../common/duplexpair');
2626
// The lengths of the expected writes... note that this is highly
2727
// sensitive to how the internals are implemented.
2828
const serverLengths = [24, 9, 9, 32];
29-
const clientLengths = [9, 9, 48, 9, 1, 21, 1, 16];
29+
const clientLengths = [9, 9, 48, 9, 1, 21, 1];
3030

3131
// Adjust for the 24-byte preamble and two 9-byte settings frames, and
3232
// the result must be equally divisible by 8
Collapse file

‎test/parallel/test-http2-perf_hooks.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-http2-perf_hooks.js
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ const obs = new PerformanceObserver(common.mustCall((items) => {
3030
break;
3131
case 'client':
3232
assert.strictEqual(entry.streamCount, 1);
33-
assert.strictEqual(entry.framesReceived, 8);
33+
assert.strictEqual(entry.framesReceived, 7);
3434
break;
3535
default:
3636
assert.fail('invalid Http2Session type');

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.