Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit f4609bd

Browse filesBrowse files
ronagdnlup
authored andcommitted
stream: bypass legacy destroy for pipeline and async iteration
PR-URL: #38505 Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent c0becbc commit f4609bd
Copy full SHA for f4609bd

File tree

Expand file treeCollapse file tree

6 files changed

+188
-12
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

6 files changed

+188
-12
lines changed
Open diff view settings
Collapse file

‎lib/_http_client.js‎

Copy file name to clipboardExpand all lines: lib/_http_client.js
+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ const {
5353
prepareError,
5454
} = require('_http_common');
5555
const { OutgoingMessage } = require('_http_outgoing');
56+
const { kDestroy } = require('internal/streams/destroy');
5657
const Agent = require('_http_agent');
5758
const { Buffer } = require('buffer');
5859
const { defaultTriggerAsyncIdScope } = require('internal/async_hooks');
@@ -609,6 +610,7 @@ function parserOnIncomingClient(res, shouldKeepAlive) {
609610
DTRACE_HTTP_CLIENT_RESPONSE(socket, req);
610611
req.res = res;
611612
res.req = req;
613+
res[kDestroy] = null;
612614

613615
// Add our listener first, so that we guarantee socket cleanup
614616
res.on('end', responseOnEnd);
Collapse file

‎lib/_http_incoming.js‎

Copy file name to clipboardExpand all lines: lib/_http_incoming.js
+8-2Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ const {
3131
} = primordials;
3232

3333
const { Readable, finished } = require('stream');
34+
const { kDestroy } = require('internal/streams/destroy');
3435

3536
const kHeaders = Symbol('kHeaders');
3637
const kHeadersCount = Symbol('kHeadersCount');
@@ -188,13 +189,18 @@ IncomingMessage.prototype._destroy = function _destroy(err, cb) {
188189
this.socket.destroy(err);
189190
const cleanup = finished(this.socket, (e) => {
190191
cleanup();
191-
onError(this, e || err, cb);
192+
process.nextTick(onError, this, e || err, cb);
192193
});
193194
} else {
194-
onError(this, err, cb);
195+
process.nextTick(onError, this, err, cb);
195196
}
196197
};
197198

199+
IncomingMessage.prototype[kDestroy] = function(err) {
200+
this.socket = null;
201+
this.destroy(err);
202+
};
203+
198204
IncomingMessage.prototype._addHeaderLines = _addHeaderLines;
199205
function _addHeaderLines(headers, n) {
200206
if (headers && headers.length) {
Collapse file

‎lib/_http_server.js‎

Copy file name to clipboardExpand all lines: lib/_http_server.js
+6-6Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -231,9 +231,7 @@ function onServerResponseClose() {
231231
// where the ServerResponse object has already been deconstructed.
232232
// Fortunately, that requires only a single if check. :-)
233233
if (this._httpMessage) {
234-
this._httpMessage.destroyed = true;
235-
this._httpMessage._closed = true;
236-
this._httpMessage.emit('close');
234+
emitCloseNT(this._httpMessage);
237235
}
238236
}
239237

@@ -837,9 +835,11 @@ function resOnFinish(req, res, socket, state, server) {
837835
}
838836

839837
function emitCloseNT(self) {
840-
self.destroyed = true;
841-
self._closed = true;
842-
self.emit('close');
838+
if (!self.destroyed) {
839+
self.destroyed = true;
840+
self._closed = true;
841+
self.emit('close');
842+
}
843843
}
844844

845845
// The following callback is issued after the headers have been read on a
Collapse file

‎lib/internal/streams/destroy.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/destroy.js
+55-4Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ const {
55
codes: {
66
ERR_MULTIPLE_CALLBACK,
77
},
8+
AbortError,
89
} = require('internal/errors');
910
const {
1011
Symbol,
@@ -363,15 +364,65 @@ function isRequest(stream) {
363364
return stream && stream.setHeader && typeof stream.abort === 'function';
364365
}
365366

367+
const kDestroyed = Symbol('kDestroyed');
368+
369+
function emitCloseLegacy(stream) {
370+
stream.emit('close');
371+
}
372+
373+
function emitErrorCloseLegacy(stream, err) {
374+
stream.emit('error', err);
375+
process.nextTick(emitCloseLegacy, stream);
376+
}
377+
378+
function isDestroyed(stream) {
379+
return stream.destroyed || stream[kDestroyed];
380+
}
381+
382+
function isReadable(stream) {
383+
return stream.readable && !stream.readableEnded && !isDestroyed(stream);
384+
}
385+
386+
function isWritable(stream) {
387+
return stream.writable && !stream.writableEnded && !isDestroyed(stream);
388+
}
389+
366390
// Normalize destroy for legacy.
367391
function destroyer(stream, err) {
368-
if (isRequest(stream)) return stream.abort();
369-
if (isRequest(stream.req)) return stream.req.abort();
370-
if (typeof stream.destroy === 'function') return stream.destroy(err);
371-
if (typeof stream.close === 'function') return stream.close();
392+
if (isDestroyed(stream)) {
393+
return;
394+
}
395+
396+
if (!err && (isReadable(stream) || isWritable(stream))) {
397+
err = new AbortError();
398+
}
399+
400+
// TODO: Remove isRequest branches.
401+
if (typeof stream[kDestroy] === 'function') {
402+
stream[kDestroy](err);
403+
} else if (isRequest(stream)) {
404+
stream.abort();
405+
} else if (isRequest(stream.req)) {
406+
stream.req.abort();
407+
} else if (typeof stream.destroy === 'function') {
408+
stream.destroy(err);
409+
} else if (typeof stream.close === 'function') {
410+
// TODO: Don't lose err?
411+
stream.close();
412+
} else if (err) {
413+
process.nextTick(emitErrorCloseLegacy, stream);
414+
} else {
415+
process.nextTick(emitCloseLegacy, stream);
416+
}
417+
418+
if (!stream.destroyed) {
419+
stream[kDestroyed] = true;
420+
}
372421
}
373422

374423
module.exports = {
424+
kDestroy,
425+
isDestroyed,
375426
construct,
376427
destroyer,
377428
destroy,
Collapse file

‎lib/stream.js‎

Copy file name to clipboardExpand all lines: lib/stream.js
+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const {
3030
} = require('internal/util');
3131

3232
const pipeline = require('internal/streams/pipeline');
33+
const { destroyer } = require('internal/streams/destroy');
3334
const eos = require('internal/streams/end-of-stream');
3435
const internalBuffer = require('internal/buffer');
3536

@@ -45,6 +46,7 @@ Stream.pipeline = pipeline;
4546
const { addAbortSignal } = require('internal/streams/add-abort-signal');
4647
Stream.addAbortSignal = addAbortSignal;
4748
Stream.finished = eos;
49+
Stream.destroy = destroyer;
4850

4951
ObjectDefineProperty(Stream, 'promises', {
5052
configurable: true,
Collapse file
+115Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const {
5+
Writable,
6+
Readable,
7+
destroy
8+
} = require('stream');
9+
const assert = require('assert');
10+
const http = require('http');
11+
12+
{
13+
const r = new Readable({ read() {} });
14+
destroy(r);
15+
assert.strictEqual(r.destroyed, true);
16+
r.on('error', common.mustCall((err) => {
17+
assert.strictEqual(err.name, 'AbortError');
18+
}));
19+
r.on('close', common.mustCall());
20+
}
21+
22+
{
23+
const r = new Readable({ read() {} });
24+
destroy(r, new Error('asd'));
25+
assert.strictEqual(r.destroyed, true);
26+
r.on('error', common.mustCall((err) => {
27+
assert.strictEqual(err.message, 'asd');
28+
}));
29+
r.on('close', common.mustCall());
30+
}
31+
32+
{
33+
const w = new Writable({ write() {} });
34+
destroy(w);
35+
assert.strictEqual(w.destroyed, true);
36+
w.on('error', common.mustCall((err) => {
37+
assert.strictEqual(err.name, 'AbortError');
38+
}));
39+
w.on('close', common.mustCall());
40+
}
41+
42+
{
43+
const w = new Writable({ write() {} });
44+
destroy(w, new Error('asd'));
45+
assert.strictEqual(w.destroyed, true);
46+
w.on('error', common.mustCall((err) => {
47+
assert.strictEqual(err.message, 'asd');
48+
}));
49+
w.on('close', common.mustCall());
50+
}
51+
52+
{
53+
const server = http.createServer((req, res) => {
54+
destroy(req);
55+
req.on('error', common.mustCall((err) => {
56+
assert.strictEqual(err.name, 'AbortError');
57+
}));
58+
req.on('close', common.mustCall(() => {
59+
res.end('hello');
60+
}));
61+
});
62+
63+
server.listen(0, () => {
64+
const req = http.request({
65+
port: server.address().port
66+
});
67+
68+
req.write('asd');
69+
req.on('response', (res) => {
70+
const buf = [];
71+
res.on('data', (data) => buf.push(data));
72+
res.on('end', common.mustCall(() => {
73+
assert.deepStrictEqual(
74+
Buffer.concat(buf),
75+
Buffer.from('hello')
76+
);
77+
server.close();
78+
}));
79+
});
80+
});
81+
}
82+
83+
{
84+
const server = http.createServer((req, res) => {
85+
req
86+
.resume()
87+
.on('end', () => {
88+
destroy(req);
89+
})
90+
.on('error', common.mustNotCall());
91+
92+
req.on('close', common.mustCall(() => {
93+
res.end('hello');
94+
}));
95+
});
96+
97+
server.listen(0, () => {
98+
const req = http.request({
99+
port: server.address().port
100+
});
101+
102+
req.write('asd');
103+
req.on('response', (res) => {
104+
const buf = [];
105+
res.on('data', (data) => buf.push(data));
106+
res.on('end', common.mustCall(() => {
107+
assert.deepStrictEqual(
108+
Buffer.concat(buf),
109+
Buffer.from('hello')
110+
);
111+
server.close();
112+
}));
113+
});
114+
});
115+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.