Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 8f86986

Browse filesBrowse files
committed
stream: use callback to properly propagate error
The stream will be destroyed upstream through the proper error flow. PR-URL: #29179 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com>
1 parent b9da063 commit 8f86986
Copy full SHA for 8f86986

File tree

Expand file treeCollapse file tree

7 files changed

+95
-15
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

7 files changed

+95
-15
lines changed
Open diff view settings
Collapse file

‎lib/_stream_readable.js‎

Copy file name to clipboardExpand all lines: lib/_stream_readable.js
+9-6Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,10 @@ function ReadableState(options, stream, isDuplex) {
144144
// Has it been destroyed
145145
this.destroyed = false;
146146

147-
// Indicates whether the stream has errored.
147+
// Indicates whether the stream has errored. When true no further
148+
// _read calls, 'data' or 'readable' events should occur. This is needed
149+
// since when autoDestroy is disabled we need a way to tell whether the
150+
// stream has failed.
148151
this.errored = false;
149152

150153
// Indicates whether the stream has finished destroying.
@@ -258,7 +261,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
258261
addChunk(stream, state, chunk, true);
259262
} else if (state.ended) {
260263
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
261-
} else if (state.destroyed) {
264+
} else if (state.destroyed || state.errored) {
262265
return false;
263266
} else {
264267
state.reading = false;
@@ -453,9 +456,9 @@ Readable.prototype.read = function(n) {
453456
}
454457

455458
// However, if we've ended, then there's no point, if we're already
456-
// reading, then it's unnecessary, and if we're destroyed, then it's
457-
// not allowed.
458-
if (state.ended || state.reading || state.destroyed) {
459+
// reading, then it's unnecessary, and if we're destroyed or errored,
460+
// then it's not allowed.
461+
if (state.ended || state.reading || state.destroyed || state.errored) {
459462
doRead = false;
460463
debug('reading or ended', doRead);
461464
} else if (doRead) {
@@ -553,7 +556,7 @@ function emitReadable(stream) {
553556
function emitReadable_(stream) {
554557
const state = stream._readableState;
555558
debug('emitReadable_', state.destroyed, state.length, state.ended);
556-
if (!state.destroyed && (state.length || state.ended)) {
559+
if (!state.destroyed && !state.errored && (state.length || state.ended)) {
557560
stream.emit('readable');
558561
state.emittedReadable = false;
559562
}
Collapse file

‎lib/_stream_writable.js‎

Copy file name to clipboardExpand all lines: lib/_stream_writable.js
+7Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,13 @@ function onwrite(stream, er) {
416416

417417
if (er) {
418418
state.errored = true;
419+
420+
// In case of duplex streams we need to notify the readable side of the
421+
// error.
422+
if (stream._readableState) {
423+
stream._readableState.errored = true;
424+
}
425+
419426
if (sync) {
420427
process.nextTick(onwriteError, stream, state, er, cb);
421428
} else {
Collapse file

‎lib/internal/http2/core.js‎

Copy file name to clipboardExpand all lines: lib/internal/http2/core.js
+11-2Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1995,10 +1995,19 @@ class Http2Stream extends Duplex {
19951995

19961996
let req;
19971997

1998+
// writeGeneric does not destroy on error and we cannot enable autoDestroy,
1999+
// so make sure to destroy on error.
2000+
const callback = (err) => {
2001+
if (err) {
2002+
this.destroy(err);
2003+
}
2004+
cb(err);
2005+
};
2006+
19982007
if (writev)
1999-
req = writevGeneric(this, data, cb);
2008+
req = writevGeneric(this, data, callback);
20002009
else
2001-
req = writeGeneric(this, data, encoding, cb);
2010+
req = writeGeneric(this, data, encoding, callback);
20022011

20032012
trackWriteState(this, req.bytes);
20042013
}
Collapse file

‎lib/internal/stream_base_commons.js‎

Copy file name to clipboardExpand all lines: lib/internal/stream_base_commons.js
+10-6Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,14 @@ function onWriteComplete(status) {
8888
return;
8989
}
9090

91+
// TODO (ronag): This should be moved before if(stream.destroyed)
92+
// in order to avoid swallowing error.
9193
if (status < 0) {
9294
const ex = errnoException(status, 'write', this.error);
93-
stream.destroy(ex, this.callback);
95+
if (typeof this.callback === 'function')
96+
this.callback(ex);
97+
else
98+
stream.destroy(ex);
9499
return;
95100
}
96101

@@ -134,24 +139,24 @@ function writevGeneric(self, data, cb) {
134139
// Retain chunks
135140
if (err === 0) req._chunks = chunks;
136141

137-
afterWriteDispatched(self, req, err, cb);
142+
afterWriteDispatched(req, err, cb);
138143
return req;
139144
}
140145

141146
function writeGeneric(self, data, encoding, cb) {
142147
const req = createWriteWrap(self[kHandle]);
143148
const err = handleWriteReq(req, data, encoding);
144149

145-
afterWriteDispatched(self, req, err, cb);
150+
afterWriteDispatched(req, err, cb);
146151
return req;
147152
}
148153

149-
function afterWriteDispatched(self, req, err, cb) {
154+
function afterWriteDispatched(req, err, cb) {
150155
req.bytes = streamBaseState[kBytesWritten];
151156
req.async = !!streamBaseState[kLastWriteWasAsync];
152157

153158
if (err !== 0)
154-
return self.destroy(errnoException(err, 'write', req.error), cb);
159+
return cb(errnoException(err, 'write', req.error));
155160

156161
if (!req.async) {
157162
cb();
@@ -264,7 +269,6 @@ function setStreamTimeout(msecs, callback) {
264269
}
265270

266271
module.exports = {
267-
createWriteWrap,
268272
writevGeneric,
269273
writeGeneric,
270274
onStreamRead,
Collapse file
+56Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const net = require('net');
5+
6+
const tcp = net.Server(common.mustCall((s) => {
7+
tcp.close();
8+
9+
let buf = '';
10+
s.setEncoding('utf8');
11+
s.on('data', function(d) {
12+
buf += d;
13+
});
14+
15+
s.on('end', common.mustCall(function() {
16+
console.error('SERVER: end', buf);
17+
assert.strictEqual(buf, "L'État, c'est moi");
18+
s.end();
19+
}));
20+
}));
21+
22+
tcp.listen(0, common.mustCall(function() {
23+
const socket = net.Stream({ highWaterMark: 0 });
24+
25+
let connected = false;
26+
assert.strictEqual(socket.pending, true);
27+
socket.connect(this.address().port, common.mustCall(() => connected = true));
28+
29+
assert.strictEqual(socket.pending, true);
30+
assert.strictEqual(socket.connecting, true);
31+
assert.strictEqual(socket.readyState, 'opening');
32+
33+
// Write a string that contains a multi-byte character sequence to test that
34+
// `bytesWritten` is incremented with the # of bytes, not # of characters.
35+
const a = "L'État, c'est ";
36+
const b = 'moi';
37+
38+
// We're still connecting at this point so the datagram is first pushed onto
39+
// the connect queue. Make sure that it's not added to `bytesWritten` again
40+
// when the actual write happens.
41+
const r = socket.write(a, common.mustCall((er) => {
42+
console.error('write cb');
43+
assert.ok(connected);
44+
assert.strictEqual(socket.bytesWritten, Buffer.from(a + b).length);
45+
assert.strictEqual(socket.pending, false);
46+
}));
47+
socket.on('close', common.mustCall(() => {
48+
assert.strictEqual(socket.pending, true);
49+
}));
50+
51+
assert.strictEqual(socket.bytesWritten, Buffer.from(a).length);
52+
assert.strictEqual(r, false);
53+
socket.end(b);
54+
55+
assert.strictEqual(socket.readyState, 'opening');
56+
}));
Collapse file

‎test/parallel/test-net-write-arguments.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-net-write-arguments.js
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ assert.throws(() => {
2525
[],
2626
{}
2727
].forEach((value) => {
28+
const socket = net.Stream({ highWaterMark: 0 });
2829
// We need to check the callback since 'error' will only
2930
// be emitted once per instance.
3031
assert.throws(() => {
Collapse file

‎test/parallel/test-wrap-js-stream-exceptions.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-wrap-js-stream-exceptions.js
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ process.once('uncaughtException', common.mustCall((err) => {
1010
}));
1111

1212
const socket = new JSStreamWrap(new Duplex({
13-
read: common.mustNotCall(),
13+
read: common.mustCall(),
1414
write: common.mustCall((buffer, data, cb) => {
1515
throw new Error('exception!');
1616
})

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.