Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit ab03635

Browse filesBrowse files
committed
http: fix stalled pipeline bug
This is a two-part fix: - Fix pending data notification in `OutgoingMessage` to notify server about flushed data too - Fix pause/resume behavior for the consumed socket. `resume` event is emitted on a next tick, and `socket._paused` can already be `true` at this time. Pause the socket again to avoid PAUSED error on parser. Fix: #3332 PR-URL: #3342 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Trevor Norris <trev.norris@gmail.com>
1 parent f45c315 commit ab03635
Copy full SHA for ab03635

File tree

Expand file treeCollapse file tree

5 files changed

+120
-47
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

5 files changed

+120
-47
lines changed
Open diff view settings
Collapse file

‎lib/_http_common.js‎

Copy file name to clipboardExpand all lines: lib/_http_common.js
+4Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ var parsers = new FreeList('parsers', 1000, function() {
140140

141141
parser._headers = [];
142142
parser._url = '';
143+
parser._consumed = false;
143144

144145
// Only called in the slow case where slow means
145146
// that the request headers were either fragmented
@@ -167,6 +168,9 @@ function freeParser(parser, req, socket) {
167168
if (parser) {
168169
parser._headers = [];
169170
parser.onIncoming = null;
171+
if (parser._consumed)
172+
parser.unconsume();
173+
parser._consumed = false;
170174
if (parser.socket)
171175
parser.socket.parser = null;
172176
parser.socket = null;
Collapse file

‎lib/_http_outgoing.js‎

Copy file name to clipboardExpand all lines: lib/_http_outgoing.js
+31-35Lines changed: 31 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ OutgoingMessage.prototype._send = function(data, encoding, callback) {
135135
this.outputEncodings.unshift('binary');
136136
this.outputCallbacks.unshift(null);
137137
this.outputSize += this._header.length;
138-
if (this._onPendingData !== null)
138+
if (typeof this._onPendingData === 'function')
139139
this._onPendingData(this._header.length);
140140
}
141141
this._headerSent = true;
@@ -158,22 +158,7 @@ OutgoingMessage.prototype._writeRaw = function(data, encoding, callback) {
158158
// There might be pending data in the this.output buffer.
159159
var outputLength = this.output.length;
160160
if (outputLength > 0) {
161-
var output = this.output;
162-
var outputEncodings = this.outputEncodings;
163-
var outputCallbacks = this.outputCallbacks;
164-
connection.cork();
165-
for (var i = 0; i < outputLength; i++) {
166-
connection.write(output[i], outputEncodings[i],
167-
outputCallbacks[i]);
168-
}
169-
connection.uncork();
170-
171-
this.output = [];
172-
this.outputEncodings = [];
173-
this.outputCallbacks = [];
174-
if (this._onPendingData !== null)
175-
this._onPendingData(-this.outputSize);
176-
this.outputSize = 0;
161+
this._flushOutput(connection);
177162
} else if (data.length === 0) {
178163
if (typeof callback === 'function')
179164
process.nextTick(callback);
@@ -198,7 +183,7 @@ OutgoingMessage.prototype._buffer = function(data, encoding, callback) {
198183
this.outputEncodings.push(encoding);
199184
this.outputCallbacks.push(callback);
200185
this.outputSize += data.length;
201-
if (this._onPendingData !== null)
186+
if (typeof this._onPendingData === 'function')
202187
this._onPendingData(data.length);
203188
return false;
204189
};
@@ -644,26 +629,11 @@ OutgoingMessage.prototype._finish = function() {
644629
// to attempt to flush any pending messages out to the socket.
645630
OutgoingMessage.prototype._flush = function() {
646631
var socket = this.socket;
647-
var outputLength, ret;
632+
var ret;
648633

649634
if (socket && socket.writable) {
650635
// There might be remaining data in this.output; write it out
651-
outputLength = this.output.length;
652-
if (outputLength > 0) {
653-
var output = this.output;
654-
var outputEncodings = this.outputEncodings;
655-
var outputCallbacks = this.outputCallbacks;
656-
socket.cork();
657-
for (var i = 0; i < outputLength; i++) {
658-
ret = socket.write(output[i], outputEncodings[i],
659-
outputCallbacks[i]);
660-
}
661-
socket.uncork();
662-
663-
this.output = [];
664-
this.outputEncodings = [];
665-
this.outputCallbacks = [];
666-
}
636+
ret = this._flushOutput(socket);
667637

668638
if (this.finished) {
669639
// This is a queue to the server or client to bring in the next this.
@@ -675,6 +645,32 @@ OutgoingMessage.prototype._flush = function() {
675645
}
676646
};
677647

648+
OutgoingMessage.prototype._flushOutput = function _flushOutput(socket) {
649+
var ret;
650+
var outputLength = this.output.length;
651+
if (outputLength <= 0)
652+
return ret;
653+
654+
var output = this.output;
655+
var outputEncodings = this.outputEncodings;
656+
var outputCallbacks = this.outputCallbacks;
657+
socket.cork();
658+
for (var i = 0; i < outputLength; i++) {
659+
ret = socket.write(output[i], outputEncodings[i],
660+
outputCallbacks[i]);
661+
}
662+
socket.uncork();
663+
664+
this.output = [];
665+
this.outputEncodings = [];
666+
this.outputCallbacks = [];
667+
if (typeof this._onPendingData === 'function')
668+
this._onPendingData(-this.outputSize);
669+
this.outputSize = 0;
670+
671+
return ret;
672+
};
673+
678674

679675
OutgoingMessage.prototype.flushHeaders = function() {
680676
if (!this._header) {
Collapse file

‎lib/_http_server.js‎

Copy file name to clipboardExpand all lines: lib/_http_server.js
+33-6Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -343,8 +343,10 @@ function connectionListener(socket) {
343343
socket.on = socketOnWrap;
344344

345345
var external = socket._handle._externalStream;
346-
if (external)
346+
if (external) {
347+
parser._consumed = true;
347348
parser.consume(external);
349+
}
348350
external = null;
349351
parser[kOnExecute] = onParserExecute;
350352

@@ -382,7 +384,7 @@ function connectionListener(socket) {
382384
socket.removeListener('data', socketOnData);
383385
socket.removeListener('end', socketOnEnd);
384386
socket.removeListener('close', serverSocketCloseListener);
385-
parser.unconsume(socket._handle._externalStream);
387+
unconsume(parser, socket);
386388
parser.finish();
387389
freeParser(parser, req, null);
388390
parser = null;
@@ -530,13 +532,38 @@ function connectionListener(socket) {
530532
exports._connectionListener = connectionListener;
531533

532534
function onSocketResume() {
533-
if (this._handle)
535+
// It may seem that the socket is resumed, but this is an enemy's trick to
536+
// deceive us! `resume` is emitted asynchronously, and may be called from
537+
// `incoming.readStart()`. Stop the socket again here, just to preserve the
538+
// state.
539+
//
540+
// We don't care about stream semantics for the consumed socket anyway.
541+
if (this._paused) {
542+
this.pause();
543+
return;
544+
}
545+
546+
if (this._handle && !this._handle.reading) {
547+
this._handle.reading = true;
534548
this._handle.readStart();
549+
}
535550
}
536551

537552
function onSocketPause() {
538-
if (this._handle)
553+
if (this._handle && this._handle.reading) {
554+
this._handle.reading = false;
539555
this._handle.readStop();
556+
}
557+
}
558+
559+
function unconsume(parser, socket) {
560+
if (socket._handle) {
561+
if (parser._consumed)
562+
parser.unconsume(socket._handle._externalStream);
563+
parser._consumed = false;
564+
socket.removeListener('pause', onSocketPause);
565+
socket.removeListener('resume', onSocketResume);
566+
}
540567
}
541568

542569
function socketOnWrap(ev, fn) {
@@ -546,8 +573,8 @@ function socketOnWrap(ev, fn) {
546573
return res;
547574
}
548575

549-
if (this._handle && (ev === 'data' || ev === 'readable'))
550-
this.parser.unconsume(this._handle._externalStream);
576+
if (ev === 'data' || ev === 'readable')
577+
unconsume(this.parser, this);
551578

552579
return res;
553580
}
Collapse file

‎src/node_http_parser.cc‎

Copy file name to clipboardExpand all lines: src/node_http_parser.cc
+11-6Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -484,13 +484,18 @@ class Parser : public BaseObject {
484484
if (parser->prev_alloc_cb_.is_empty())
485485
return;
486486

487-
CHECK(args[0]->IsExternal());
488-
Local<External> stream_obj = args[0].As<External>();
489-
StreamBase* stream = static_cast<StreamBase*>(stream_obj->Value());
490-
CHECK_NE(stream, nullptr);
487+
// Restore stream's callbacks
488+
if (args.Length() == 1 && args[0]->IsExternal()) {
489+
Local<External> stream_obj = args[0].As<External>();
490+
StreamBase* stream = static_cast<StreamBase*>(stream_obj->Value());
491+
CHECK_NE(stream, nullptr);
492+
493+
stream->set_alloc_cb(parser->prev_alloc_cb_);
494+
stream->set_read_cb(parser->prev_read_cb_);
495+
}
491496

492-
stream->set_alloc_cb(parser->prev_alloc_cb_);
493-
stream->set_read_cb(parser->prev_read_cb_);
497+
parser->prev_alloc_cb_.clear();
498+
parser->prev_read_cb_.clear();
494499
}
495500

496501

Collapse file
+41Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const http = require('http');
5+
const net = require('net');
6+
7+
const big = new Buffer(16 * 1024);
8+
big.fill('A');
9+
10+
const COUNT = 1e4;
11+
12+
var received = 0;
13+
14+
var client;
15+
const server = http.createServer(function(req, res) {
16+
res.end(big, function() {
17+
if (++received === COUNT) {
18+
server.close();
19+
client.end();
20+
}
21+
});
22+
}).listen(common.PORT, function() {
23+
var req = new Array(COUNT + 1).join('GET / HTTP/1.1\r\n\r\n');
24+
client = net.connect(common.PORT, function() {
25+
client.write(req);
26+
});
27+
28+
// Just let the test terminate instead of hanging
29+
client.on('close', function() {
30+
if (received !== COUNT)
31+
server.close();
32+
});
33+
client.resume();
34+
});
35+
36+
process.on('exit', function() {
37+
// The server should pause connection on pipeline flood, but it shoul still
38+
// resume it and finish processing the requests, when its output queue will
39+
// be empty again.
40+
assert.equal(received, COUNT);
41+
});

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.