Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 0427cdf

Browse filesBrowse files
indutnyjasnell
authored andcommitted
http: fix stalled pipeline bug
This is a two-part fix: - Fix pending data notification in `OutgoingMessage` to notify server about flushed data too - Fix pause/resume behavior for the consumed socket. `resume` event is emitted on a next tick, and `socket._paused` can already be `true` at this time. Pause the socket again to avoid PAUSED error on parser. Fix: #3332 PR-URL: #3342 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Trevor Norris <trev.norris@gmail.com>
1 parent 4bac5d9 commit 0427cdf
Copy full SHA for 0427cdf

File tree

Expand file treeCollapse file tree

5 files changed

+120
-47
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

5 files changed

+120
-47
lines changed
Open diff view settings
Collapse file

‎lib/_http_common.js‎

Copy file name to clipboardExpand all lines: lib/_http_common.js
+4Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ var parsers = new FreeList('parsers', 1000, function() {
140140

141141
parser._headers = [];
142142
parser._url = '';
143+
parser._consumed = false;
143144

144145
// Only called in the slow case where slow means
145146
// that the request headers were either fragmented
@@ -167,6 +168,9 @@ function freeParser(parser, req, socket) {
167168
if (parser) {
168169
parser._headers = [];
169170
parser.onIncoming = null;
171+
if (parser._consumed)
172+
parser.unconsume();
173+
parser._consumed = false;
170174
if (parser.socket)
171175
parser.socket.parser = null;
172176
parser.socket = null;
Collapse file

‎lib/_http_outgoing.js‎

Copy file name to clipboardExpand all lines: lib/_http_outgoing.js
+31-35Lines changed: 31 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ OutgoingMessage.prototype._send = function(data, encoding, callback) {
131131
this.outputEncodings.unshift('binary');
132132
this.outputCallbacks.unshift(null);
133133
this.outputSize += this._header.length;
134-
if (this._onPendingData !== null)
134+
if (typeof this._onPendingData === 'function')
135135
this._onPendingData(this._header.length);
136136
}
137137
this._headerSent = true;
@@ -154,22 +154,7 @@ OutgoingMessage.prototype._writeRaw = function(data, encoding, callback) {
154154
// There might be pending data in the this.output buffer.
155155
var outputLength = this.output.length;
156156
if (outputLength > 0) {
157-
var output = this.output;
158-
var outputEncodings = this.outputEncodings;
159-
var outputCallbacks = this.outputCallbacks;
160-
connection.cork();
161-
for (var i = 0; i < outputLength; i++) {
162-
connection.write(output[i], outputEncodings[i],
163-
outputCallbacks[i]);
164-
}
165-
connection.uncork();
166-
167-
this.output = [];
168-
this.outputEncodings = [];
169-
this.outputCallbacks = [];
170-
if (this._onPendingData !== null)
171-
this._onPendingData(-this.outputSize);
172-
this.outputSize = 0;
157+
this._flushOutput(connection);
173158
} else if (data.length === 0) {
174159
if (typeof callback === 'function')
175160
process.nextTick(callback);
@@ -194,7 +179,7 @@ OutgoingMessage.prototype._buffer = function(data, encoding, callback) {
194179
this.outputEncodings.push(encoding);
195180
this.outputCallbacks.push(callback);
196181
this.outputSize += data.length;
197-
if (this._onPendingData !== null)
182+
if (typeof this._onPendingData === 'function')
198183
this._onPendingData(data.length);
199184
return false;
200185
};
@@ -630,26 +615,11 @@ OutgoingMessage.prototype._finish = function() {
630615
// to attempt to flush any pending messages out to the socket.
631616
OutgoingMessage.prototype._flush = function() {
632617
var socket = this.socket;
633-
var outputLength, ret;
618+
var ret;
634619

635620
if (socket && socket.writable) {
636621
// There might be remaining data in this.output; write it out
637-
outputLength = this.output.length;
638-
if (outputLength > 0) {
639-
var output = this.output;
640-
var outputEncodings = this.outputEncodings;
641-
var outputCallbacks = this.outputCallbacks;
642-
socket.cork();
643-
for (var i = 0; i < outputLength; i++) {
644-
ret = socket.write(output[i], outputEncodings[i],
645-
outputCallbacks[i]);
646-
}
647-
socket.uncork();
648-
649-
this.output = [];
650-
this.outputEncodings = [];
651-
this.outputCallbacks = [];
652-
}
622+
ret = this._flushOutput(socket);
653623

654624
if (this.finished) {
655625
// This is a queue to the server or client to bring in the next this.
@@ -661,6 +631,32 @@ OutgoingMessage.prototype._flush = function() {
661631
}
662632
};
663633

634+
OutgoingMessage.prototype._flushOutput = function _flushOutput(socket) {
635+
var ret;
636+
var outputLength = this.output.length;
637+
if (outputLength <= 0)
638+
return ret;
639+
640+
var output = this.output;
641+
var outputEncodings = this.outputEncodings;
642+
var outputCallbacks = this.outputCallbacks;
643+
socket.cork();
644+
for (var i = 0; i < outputLength; i++) {
645+
ret = socket.write(output[i], outputEncodings[i],
646+
outputCallbacks[i]);
647+
}
648+
socket.uncork();
649+
650+
this.output = [];
651+
this.outputEncodings = [];
652+
this.outputCallbacks = [];
653+
if (typeof this._onPendingData === 'function')
654+
this._onPendingData(-this.outputSize);
655+
this.outputSize = 0;
656+
657+
return ret;
658+
};
659+
664660

665661
OutgoingMessage.prototype.flushHeaders = function() {
666662
if (!this._header) {
Collapse file

‎lib/_http_server.js‎

Copy file name to clipboardExpand all lines: lib/_http_server.js
+33-6Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -343,8 +343,10 @@ function connectionListener(socket) {
343343
socket.on = socketOnWrap;
344344

345345
var external = socket._handle._externalStream;
346-
if (external)
346+
if (external) {
347+
parser._consumed = true;
347348
parser.consume(external);
349+
}
348350
external = null;
349351
parser[kOnExecute] = onParserExecute;
350352

@@ -382,7 +384,7 @@ function connectionListener(socket) {
382384
socket.removeListener('data', socketOnData);
383385
socket.removeListener('end', socketOnEnd);
384386
socket.removeListener('close', serverSocketCloseListener);
385-
parser.unconsume(socket._handle._externalStream);
387+
unconsume(parser, socket);
386388
parser.finish();
387389
freeParser(parser, req, null);
388390
parser = null;
@@ -530,13 +532,38 @@ function connectionListener(socket) {
530532
exports._connectionListener = connectionListener;
531533

532534
function onSocketResume() {
533-
if (this._handle)
535+
// It may seem that the socket is resumed, but this is an enemy's trick to
536+
// deceive us! `resume` is emitted asynchronously, and may be called from
537+
// `incoming.readStart()`. Stop the socket again here, just to preserve the
538+
// state.
539+
//
540+
// We don't care about stream semantics for the consumed socket anyway.
541+
if (this._paused) {
542+
this.pause();
543+
return;
544+
}
545+
546+
if (this._handle && !this._handle.reading) {
547+
this._handle.reading = true;
534548
this._handle.readStart();
549+
}
535550
}
536551

537552
function onSocketPause() {
538-
if (this._handle)
553+
if (this._handle && this._handle.reading) {
554+
this._handle.reading = false;
539555
this._handle.readStop();
556+
}
557+
}
558+
559+
function unconsume(parser, socket) {
560+
if (socket._handle) {
561+
if (parser._consumed)
562+
parser.unconsume(socket._handle._externalStream);
563+
parser._consumed = false;
564+
socket.removeListener('pause', onSocketPause);
565+
socket.removeListener('resume', onSocketResume);
566+
}
540567
}
541568

542569
function socketOnWrap(ev, fn) {
@@ -546,8 +573,8 @@ function socketOnWrap(ev, fn) {
546573
return res;
547574
}
548575

549-
if (this._handle && (ev === 'data' || ev === 'readable'))
550-
this.parser.unconsume(this._handle._externalStream);
576+
if (ev === 'data' || ev === 'readable')
577+
unconsume(this.parser, this);
551578

552579
return res;
553580
}
Collapse file

‎src/node_http_parser.cc‎

Copy file name to clipboardExpand all lines: src/node_http_parser.cc
+11-6Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -484,13 +484,18 @@ class Parser : public BaseObject {
484484
if (parser->prev_alloc_cb_.is_empty())
485485
return;
486486

487-
CHECK(args[0]->IsExternal());
488-
Local<External> stream_obj = args[0].As<External>();
489-
StreamBase* stream = static_cast<StreamBase*>(stream_obj->Value());
490-
CHECK_NE(stream, nullptr);
487+
// Restore stream's callbacks
488+
if (args.Length() == 1 && args[0]->IsExternal()) {
489+
Local<External> stream_obj = args[0].As<External>();
490+
StreamBase* stream = static_cast<StreamBase*>(stream_obj->Value());
491+
CHECK_NE(stream, nullptr);
492+
493+
stream->set_alloc_cb(parser->prev_alloc_cb_);
494+
stream->set_read_cb(parser->prev_read_cb_);
495+
}
491496

492-
stream->set_alloc_cb(parser->prev_alloc_cb_);
493-
stream->set_read_cb(parser->prev_read_cb_);
497+
parser->prev_alloc_cb_.clear();
498+
parser->prev_read_cb_.clear();
494499
}
495500

496501

Collapse file
+41Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const http = require('http');
5+
const net = require('net');
6+
7+
const big = new Buffer(16 * 1024);
8+
big.fill('A');
9+
10+
const COUNT = 1e4;
11+
12+
var received = 0;
13+
14+
var client;
15+
const server = http.createServer(function(req, res) {
16+
res.end(big, function() {
17+
if (++received === COUNT) {
18+
server.close();
19+
client.end();
20+
}
21+
});
22+
}).listen(common.PORT, function() {
23+
var req = new Array(COUNT + 1).join('GET / HTTP/1.1\r\n\r\n');
24+
client = net.connect(common.PORT, function() {
25+
client.write(req);
26+
});
27+
28+
// Just let the test terminate instead of hanging
29+
client.on('close', function() {
30+
if (received !== COUNT)
31+
server.close();
32+
});
33+
client.resume();
34+
});
35+
36+
process.on('exit', function() {
37+
// The server should pause connection on pipeline flood, but it shoul still
38+
// resume it and finish processing the requests, when its output queue will
39+
// be empty again.
40+
assert.equal(received, COUNT);
41+
});

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.