Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 4e3f6f3

Browse filesBrowse files
committed
stream: cleanup and fix Readable.wrap
Cleans up Readable.wrap and also ensures destroy is called for certain events. PR-URL: #34204 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent 3751662 commit 4e3f6f3
Copy full SHA for 4e3f6f3

File tree

Expand file treeCollapse file tree

3 files changed

+56
-52
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

3 files changed

+56
-52
lines changed
Open diff view settings
Collapse file

‎lib/_stream_readable.js‎

Copy file name to clipboardExpand all lines: lib/_stream_readable.js
+19-52Lines changed: 19 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ const {
2626
NumberIsInteger,
2727
NumberIsNaN,
2828
ObjectDefineProperties,
29+
ObjectKeys,
2930
ObjectSetPrototypeOf,
3031
Set,
3132
SymbolAsyncIterator,
@@ -1007,83 +1008,49 @@ function flow(stream) {
10071008
// This is *not* part of the readable stream interface.
10081009
// It is an ugly unfortunate mess of history.
10091010
Readable.prototype.wrap = function(stream) {
1010-
const state = this._readableState;
10111011
let paused = false;
10121012

1013-
stream.on('end', () => {
1014-
debug('wrapped end');
1015-
if (state.decoder && !state.ended) {
1016-
const chunk = state.decoder.end();
1017-
if (chunk && chunk.length)
1018-
this.push(chunk);
1019-
}
1020-
1021-
this.push(null);
1022-
});
1013+
// TODO (ronag): Should this.destroy(err) emit
1014+
// 'error' on the wrapped stream? Would require
1015+
// a static factory method, e.g. Readable.wrap(stream).
10231016

10241017
stream.on('data', (chunk) => {
1025-
debug('wrapped data');
1026-
if (state.decoder)
1027-
chunk = state.decoder.write(chunk);
1028-
1029-
// Don't skip over falsy values in objectMode.
1030-
if (state.objectMode && (chunk === null || chunk === undefined))
1031-
return;
1032-
else if (!state.objectMode && (!chunk || !chunk.length))
1033-
return;
1034-
1035-
const ret = this.push(chunk);
1036-
if (!ret) {
1018+
if (!this.push(chunk) && stream.pause) {
10371019
paused = true;
10381020
stream.pause();
10391021
}
10401022
});
10411023

1042-
// Proxy all the other methods. Important when wrapping filters and duplexes.
1043-
for (const i in stream) {
1044-
if (this[i] === undefined && typeof stream[i] === 'function') {
1045-
this[i] = function methodWrap(method) {
1046-
return function methodWrapReturnFunction() {
1047-
return stream[method].apply(stream, arguments);
1048-
};
1049-
}(i);
1050-
}
1051-
}
1024+
stream.on('end', () => {
1025+
this.push(null);
1026+
});
10521027

10531028
stream.on('error', (err) => {
10541029
errorOrDestroy(this, err);
10551030
});
10561031

10571032
stream.on('close', () => {
1058-
// TODO(ronag): Update readable state?
1059-
this.emit('close');
1033+
this.destroy();
10601034
});
10611035

10621036
stream.on('destroy', () => {
1063-
// TODO(ronag): this.destroy()?
1064-
this.emit('destroy');
1037+
this.destroy();
10651038
});
10661039

1067-
stream.on('pause', () => {
1068-
// TODO(ronag): this.pause()?
1069-
this.emit('pause');
1070-
});
1071-
1072-
stream.on('resume', () => {
1073-
// TODO(ronag): this.resume()?
1074-
this.emit('resume');
1075-
});
1076-
1077-
// When we try to consume some more bytes, simply unpause the
1078-
// underlying stream.
1079-
this._read = (n) => {
1080-
debug('wrapped _read', n);
1081-
if (paused) {
1040+
this._read = () => {
1041+
if (paused && stream.resume) {
10821042
paused = false;
10831043
stream.resume();
10841044
}
10851045
};
10861046

1047+
// Proxy all the other methods. Important when wrapping filters and duplexes.
1048+
for (const i of ObjectKeys(stream)) {
1049+
if (this[i] === undefined && typeof stream[i] === 'function') {
1050+
this[i] = stream[i].bind(stream);
1051+
}
1052+
}
1053+
10871054
return this;
10881055
};
10891056

Collapse file
+27Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
'use strict';
2+
const common = require('../common');
3+
4+
const Readable = require('_stream_readable');
5+
const EE = require('events').EventEmitter;
6+
7+
const oldStream = new EE();
8+
oldStream.pause = () => {};
9+
oldStream.resume = () => {};
10+
11+
{
12+
new Readable({
13+
autoDestroy: false,
14+
destroy: common.mustCall()
15+
})
16+
.wrap(oldStream);
17+
oldStream.emit('destroy');
18+
}
19+
20+
{
21+
new Readable({
22+
autoDestroy: false,
23+
destroy: common.mustCall()
24+
})
25+
.wrap(oldStream);
26+
oldStream.emit('close');
27+
}
Collapse file

‎test/parallel/test-stream2-readable-wrap.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-stream2-readable-wrap.js
+10Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,16 @@ function runTest(highWaterMark, objectMode, produce) {
4444
flow();
4545
};
4646

47+
// Make sure pause is only emitted once.
48+
let pausing = false;
49+
r.on('pause', () => {
50+
assert.strictEqual(pausing, false);
51+
pausing = true;
52+
process.nextTick(() => {
53+
pausing = false;
54+
});
55+
});
56+
4757
let flowing;
4858
let chunks = 10;
4959
let oldEnded = false;

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.