Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit e592c32

Browse filesBrowse files
apapirovskigibfahn
authored andcommitted
http2: fix several timeout related issues
* correctly reset write timers: currently reset timers on both session & stream when write starts and when it ends. * prevent large writes from timing out: when writing a large chunk of data in http2, once the data is handed off to C++, the JS session & stream lose all track of the write and will timeout if the write doesn't complete within the timeout window Fix this issue by tracking whether a write request is ongoing and also tracking how many chunks have been sent since the most recent write started. (Since each write call resets the timer.) PR-URL: #16525 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent 1b08ae8 commit e592c32
Copy full SHA for e592c32

File tree

Expand file treeCollapse file tree

6 files changed

+274
-6
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

6 files changed

+274
-6
lines changed
Open diff view settings
Collapse file

‎lib/internal/http2/core.js‎

Copy file name to clipboardExpand all lines: lib/internal/http2/core.js
+72-6Lines changed: 72 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -746,7 +746,8 @@ class Http2Session extends EventEmitter {
746746
shutdown: false,
747747
shuttingDown: false,
748748
pendingAck: 0,
749-
maxPendingAck: Math.max(1, (options.maxPendingAck | 0) || 10)
749+
maxPendingAck: Math.max(1, (options.maxPendingAck | 0) || 10),
750+
writeQueueSize: 0
750751
};
751752

752753
this[kType] = type;
@@ -1080,6 +1081,22 @@ class Http2Session extends EventEmitter {
10801081
}
10811082

10821083
_onTimeout() {
1084+
// This checks whether a write is currently in progress and also whether
1085+
// that write is actually sending data across the write. The kHandle
1086+
// stored `chunksSentSinceLastWrite` is only updated when a timeout event
1087+
// happens, meaning that if a write is ongoing it should never equal the
1088+
// newly fetched, updated value.
1089+
if (this[kState].writeQueueSize > 0) {
1090+
const handle = this[kHandle];
1091+
const chunksSentSinceLastWrite = handle !== undefined ?
1092+
handle.chunksSentSinceLastWrite : null;
1093+
if (chunksSentSinceLastWrite !== null &&
1094+
chunksSentSinceLastWrite !== handle.updateChunksSent()) {
1095+
_unrefActive(this);
1096+
return;
1097+
}
1098+
}
1099+
10831100
process.nextTick(emit, this, 'timeout');
10841101
}
10851102
}
@@ -1199,8 +1216,27 @@ function createWriteReq(req, handle, data, encoding) {
11991216
}
12001217
}
12011218

1219+
function trackWriteState(stream, bytes) {
1220+
const session = stream[kSession];
1221+
stream[kState].writeQueueSize += bytes;
1222+
session[kState].writeQueueSize += bytes;
1223+
session[kHandle].chunksSentSinceLastWrite = 0;
1224+
}
1225+
12021226
function afterDoStreamWrite(status, handle, req) {
1203-
_unrefActive(handle[kOwner]);
1227+
const session = handle[kOwner];
1228+
_unrefActive(session);
1229+
1230+
const state = session[kState];
1231+
const { bytes } = req;
1232+
state.writeQueueSize -= bytes;
1233+
1234+
const stream = state.streams.get(req.stream);
1235+
if (stream !== undefined) {
1236+
_unrefActive(stream);
1237+
stream[kState].writeQueueSize -= bytes;
1238+
}
1239+
12041240
if (typeof req.callback === 'function')
12051241
req.callback();
12061242
this.handle = undefined;
@@ -1312,7 +1348,8 @@ class Http2Stream extends Duplex {
13121348
headersSent: false,
13131349
headRequest: false,
13141350
aborted: false,
1315-
closeHandler: onSessionClose.bind(this)
1351+
closeHandler: onSessionClose.bind(this),
1352+
writeQueueSize: 0
13161353
};
13171354

13181355
this.once('ready', streamOnceReady);
@@ -1359,6 +1396,23 @@ class Http2Stream extends Duplex {
13591396
}
13601397

13611398
_onTimeout() {
1399+
// This checks whether a write is currently in progress and also whether
1400+
// that write is actually sending data across the write. The kHandle
1401+
// stored `chunksSentSinceLastWrite` is only updated when a timeout event
1402+
// happens, meaning that if a write is ongoing it should never equal the
1403+
// newly fetched, updated value.
1404+
if (this[kState].writeQueueSize > 0) {
1405+
const handle = this[kSession][kHandle];
1406+
const chunksSentSinceLastWrite = handle !== undefined ?
1407+
handle.chunksSentSinceLastWrite : null;
1408+
if (chunksSentSinceLastWrite !== null &&
1409+
chunksSentSinceLastWrite !== handle.updateChunksSent()) {
1410+
_unrefActive(this);
1411+
_unrefActive(this[kSession]);
1412+
return;
1413+
}
1414+
}
1415+
13621416
process.nextTick(emit, this, 'timeout');
13631417
}
13641418

@@ -1396,10 +1450,11 @@ class Http2Stream extends Duplex {
13961450
this.once('ready', this._write.bind(this, data, encoding, cb));
13971451
return;
13981452
}
1399-
_unrefActive(this);
14001453
if (!this[kState].headersSent)
14011454
this[kProceed]();
14021455
const session = this[kSession];
1456+
_unrefActive(this);
1457+
_unrefActive(session);
14031458
const handle = session[kHandle];
14041459
const req = new WriteWrap();
14051460
req.stream = this[kID];
@@ -1410,18 +1465,19 @@ class Http2Stream extends Duplex {
14101465
const err = createWriteReq(req, handle, data, encoding);
14111466
if (err)
14121467
throw util._errnoException(err, 'write', req.error);
1413-
this._bytesDispatched += req.bytes;
1468+
trackWriteState(this, req.bytes);
14141469
}
14151470

14161471
_writev(data, cb) {
14171472
if (this[kID] === undefined) {
14181473
this.once('ready', this._writev.bind(this, data, cb));
14191474
return;
14201475
}
1421-
_unrefActive(this);
14221476
if (!this[kState].headersSent)
14231477
this[kProceed]();
14241478
const session = this[kSession];
1479+
_unrefActive(this);
1480+
_unrefActive(session);
14251481
const handle = session[kHandle];
14261482
const req = new WriteWrap();
14271483
req.stream = this[kID];
@@ -1438,6 +1494,7 @@ class Http2Stream extends Duplex {
14381494
const err = handle.writev(req, chunks);
14391495
if (err)
14401496
throw util._errnoException(err, 'write', req.error);
1497+
trackWriteState(this, req.bytes);
14411498
}
14421499

14431500
_read(nread) {
@@ -1531,6 +1588,10 @@ class Http2Stream extends Duplex {
15311588
return;
15321589
}
15331590

1591+
const state = this[kState];
1592+
session[kState].writeQueueSize -= state.writeQueueSize;
1593+
state.writeQueueSize = 0;
1594+
15341595
const server = session[kServer];
15351596
if (server !== undefined && err) {
15361597
server.emit('streamError', err, this);
@@ -1625,7 +1686,12 @@ function processRespondWithFD(fd, headers, offset = 0, length = -1,
16251686
if (ret < 0) {
16261687
err = new NghttpError(ret);
16271688
process.nextTick(emit, this, 'error', err);
1689+
break;
16281690
}
1691+
// exact length of the file doesn't matter here, since the
1692+
// stream is closing anyway — just use 1 to signify that
1693+
// a write does exist
1694+
trackWriteState(this, 1);
16291695
}
16301696
}
16311697

Collapse file

‎src/env.h‎

Copy file name to clipboardExpand all lines: src/env.h
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ class ModuleWrap;
111111
V(callback_string, "callback") \
112112
V(change_string, "change") \
113113
V(channel_string, "channel") \
114+
V(chunks_sent_since_last_write_string, "chunksSentSinceLastWrite") \
114115
V(constants_string, "constants") \
115116
V(oncertcb_string, "oncertcb") \
116117
V(onclose_string, "_onclose") \
Collapse file

‎src/node_http2.cc‎

Copy file name to clipboardExpand all lines: src/node_http2.cc
+24Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,8 @@ void Http2Session::SubmitFile(const FunctionCallbackInfo<Value>& args) {
603603
return args.GetReturnValue().Set(NGHTTP2_ERR_INVALID_STREAM_ID);
604604
}
605605

606+
session->chunks_sent_since_last_write_ = 0;
607+
606608
Headers list(isolate, context, headers);
607609

608610
args.GetReturnValue().Set(stream->SubmitFile(fd, *list, list.length(),
@@ -757,6 +759,23 @@ void Http2Session::FlushData(const FunctionCallbackInfo<Value>& args) {
757759
stream->FlushDataChunks();
758760
}
759761

762+
void Http2Session::UpdateChunksSent(const FunctionCallbackInfo<Value>& args) {
763+
Http2Session* session;
764+
Environment* env = Environment::GetCurrent(args);
765+
Isolate* isolate = env->isolate();
766+
ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder());
767+
768+
HandleScope scope(isolate);
769+
770+
uint32_t length = session->chunks_sent_since_last_write_;
771+
772+
session->object()->Set(env->context(),
773+
env->chunks_sent_since_last_write_string(),
774+
Integer::NewFromUnsigned(isolate, length)).FromJust();
775+
776+
args.GetReturnValue().Set(length);
777+
}
778+
760779
void Http2Session::SubmitPushPromise(const FunctionCallbackInfo<Value>& args) {
761780
Http2Session* session;
762781
Environment* env = Environment::GetCurrent(args);
@@ -811,6 +830,8 @@ int Http2Session::DoWrite(WriteWrap* req_wrap,
811830
}
812831
}
813832

833+
chunks_sent_since_last_write_ = 0;
834+
814835
nghttp2_stream_write_t* req = new nghttp2_stream_write_t;
815836
req->data = req_wrap;
816837

@@ -846,6 +867,7 @@ void Http2Session::Send(uv_buf_t* buf, size_t length) {
846867
this,
847868
AfterWrite);
848869

870+
chunks_sent_since_last_write_++;
849871
uv_buf_t actual = uv_buf_init(buf->base, length);
850872
if (stream_->DoWrite(write_req, &actual, 1, nullptr)) {
851873
write_req->Dispose();
@@ -1255,6 +1277,8 @@ void Initialize(Local<Object> target,
12551277
Http2Session::DestroyStream);
12561278
env->SetProtoMethod(session, "flushData",
12571279
Http2Session::FlushData);
1280+
env->SetProtoMethod(session, "updateChunksSent",
1281+
Http2Session::UpdateChunksSent);
12581282
StreamBase::AddMethods<Http2Session>(env, session,
12591283
StreamBase::kFlagHasWritev |
12601284
StreamBase::kFlagNoShutdown);
Collapse file

‎src/node_http2.h‎

Copy file name to clipboardExpand all lines: src/node_http2.h
+4Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,7 @@ class Http2Session : public AsyncWrap,
475475
static void SubmitGoaway(const FunctionCallbackInfo<Value>& args);
476476
static void DestroyStream(const FunctionCallbackInfo<Value>& args);
477477
static void FlushData(const FunctionCallbackInfo<Value>& args);
478+
static void UpdateChunksSent(const FunctionCallbackInfo<Value>& args);
478479

479480
template <get_setting fn>
480481
static void GetSettings(const FunctionCallbackInfo<Value>& args);
@@ -493,6 +494,9 @@ class Http2Session : public AsyncWrap,
493494
StreamResource::Callback<StreamResource::ReadCb> prev_read_cb_;
494495
padding_strategy_type padding_strategy_ = PADDING_STRATEGY_NONE;
495496

497+
// use this to allow timeout tracking during long-lasting writes
498+
uint32_t chunks_sent_since_last_write_ = 0;
499+
496500
char stream_buf_[kAllocBufferSize];
497501
};
498502

Collapse file
+89Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
'use strict';
2+
const common = require('../common');
3+
if (!common.hasCrypto)
4+
common.skip('missing crypto');
5+
const assert = require('assert');
6+
const fixtures = require('../common/fixtures');
7+
const fs = require('fs');
8+
const http2 = require('http2');
9+
const path = require('path');
10+
11+
common.refreshTmpDir();
12+
13+
// This test assesses whether long-running writes can complete
14+
// or timeout because the session or stream are not aware that the
15+
// backing stream is still writing.
16+
// To simulate a slow client, we write a really large chunk and
17+
// then proceed through the following cycle:
18+
// 1) Receive first 'data' event and record currently written size
19+
// 2) Once we've read up to currently written size recorded above,
20+
// we pause the stream and wait longer than the server timeout
21+
// 3) Socket.prototype._onTimeout triggers and should confirm
22+
// that the backing stream is still active and writing
23+
// 4) Our timer fires, we resume the socket and start at 1)
24+
25+
const writeSize = 3000000;
26+
const minReadSize = 500000;
27+
const serverTimeout = common.platformTimeout(500);
28+
let offsetTimeout = common.platformTimeout(100);
29+
let didReceiveData = false;
30+
31+
const content = Buffer.alloc(writeSize, 0x44);
32+
const filepath = path.join(common.tmpDir, 'http2-large-write.tmp');
33+
fs.writeFileSync(filepath, content, 'binary');
34+
const fd = fs.openSync(filepath, 'r');
35+
36+
const server = http2.createSecureServer({
37+
key: fixtures.readKey('agent1-key.pem'),
38+
cert: fixtures.readKey('agent1-cert.pem')
39+
});
40+
server.on('stream', common.mustCall((stream) => {
41+
stream.respondWithFD(fd, {
42+
'Content-Type': 'application/octet-stream',
43+
'Content-Length': content.length.toString(),
44+
'Vary': 'Accept-Encoding'
45+
});
46+
stream.setTimeout(serverTimeout);
47+
stream.on('timeout', () => {
48+
assert.strictEqual(didReceiveData, false, 'Should not timeout');
49+
});
50+
stream.end();
51+
}));
52+
server.setTimeout(serverTimeout);
53+
server.on('timeout', () => {
54+
assert.strictEqual(didReceiveData, false, 'Should not timeout');
55+
});
56+
57+
server.listen(0, common.mustCall(() => {
58+
const client = http2.connect(`https://localhost:${server.address().port}`,
59+
{ rejectUnauthorized: false });
60+
61+
const req = client.request({ ':path': '/' });
62+
req.end();
63+
64+
const resume = () => req.resume();
65+
let receivedBufferLength = 0;
66+
let firstReceivedAt;
67+
req.on('data', common.mustCallAtLeast((buf) => {
68+
if (receivedBufferLength === 0) {
69+
didReceiveData = false;
70+
firstReceivedAt = Date.now();
71+
}
72+
receivedBufferLength += buf.length;
73+
if (receivedBufferLength >= minReadSize &&
74+
receivedBufferLength < writeSize) {
75+
didReceiveData = true;
76+
receivedBufferLength = 0;
77+
req.pause();
78+
setTimeout(
79+
resume,
80+
serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt)
81+
);
82+
offsetTimeout = 0;
83+
}
84+
}, 1));
85+
req.on('end', common.mustCall(() => {
86+
client.destroy();
87+
server.close();
88+
}));
89+
}));

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.