Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 0bd5595

Browse filesBrowse files
committed
stream: simplify Transform stream implementation
Significantly simplified Transform stream implementation by using mostly standard stream code. PR-URL: #32763 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net>
1 parent f22a9ca commit 0bd5595
Copy full SHA for 0bd5595

File tree

Expand file treeCollapse file tree

4 files changed

+56
-114
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

4 files changed

+56
-114
lines changed
Open diff view settings
Collapse file

‎lib/_stream_transform.js‎

Copy file name to clipboardExpand all lines: lib/_stream_transform.js
+52-108Lines changed: 52 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -65,66 +65,32 @@
6565

6666
const {
6767
ObjectSetPrototypeOf,
68+
Symbol
6869
} = primordials;
6970

7071
module.exports = Transform;
7172
const {
72-
ERR_METHOD_NOT_IMPLEMENTED,
73-
ERR_MULTIPLE_CALLBACK,
74-
ERR_TRANSFORM_ALREADY_TRANSFORMING,
75-
ERR_TRANSFORM_WITH_LENGTH_0
73+
ERR_METHOD_NOT_IMPLEMENTED
7674
} = require('internal/errors').codes;
7775
const Duplex = require('_stream_duplex');
7876
ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype);
7977
ObjectSetPrototypeOf(Transform, Duplex);
8078

81-
82-
function afterTransform(er, data) {
83-
const ts = this._transformState;
84-
ts.transforming = false;
85-
86-
const cb = ts.writecb;
87-
88-
if (cb === null) {
89-
return this.emit('error', new ERR_MULTIPLE_CALLBACK());
90-
}
91-
92-
ts.writechunk = null;
93-
ts.writecb = null;
94-
95-
if (data != null) // Single equals check for both `null` and `undefined`
96-
this.push(data);
97-
98-
cb(er);
99-
100-
const rs = this._readableState;
101-
rs.reading = false;
102-
if (rs.needReadable || rs.length < rs.highWaterMark) {
103-
this._read(rs.highWaterMark);
104-
}
105-
}
106-
79+
const kCallback = Symbol('kCallback');
10780

10881
function Transform(options) {
10982
if (!(this instanceof Transform))
11083
return new Transform(options);
11184

11285
Duplex.call(this, options);
11386

114-
this._transformState = {
115-
afterTransform: afterTransform.bind(this),
116-
needTransform: false,
117-
transforming: false,
118-
writecb: null,
119-
writechunk: null,
120-
writeencoding: null
121-
};
122-
12387
// We have implemented the _read method, and done the other things
12488
// that Readable wants before the first _read call, so unset the
12589
// sync guard flag.
12690
this._readableState.sync = false;
12791

92+
this[kCallback] = null;
93+
12894
if (options) {
12995
if (typeof options.transform === 'function')
13096
this._transform = options.transform;
@@ -133,89 +99,67 @@ function Transform(options) {
13399
this._flush = options.flush;
134100
}
135101

136-
// When the writable side finishes, then flush out anything remaining.
102+
// TODO(ronag): Unfortunately _final is invoked asynchronously.
103+
// Use `prefinish` hack. `prefinish` is emitted synchronously when
104+
// and only when `_final` is not defined. Implementing `_final`
105+
// to a Transform should be an error.
137106
this.on('prefinish', prefinish);
138107
}
139108

140109
function prefinish() {
141-
if (typeof this._flush === 'function' && !this._readableState.destroyed) {
110+
if (typeof this._flush === 'function' && !this.destroyed) {
142111
this._flush((er, data) => {
143-
done(this, er, data);
112+
if (er) {
113+
this.destroy(er);
114+
return;
115+
}
116+
117+
if (data != null) {
118+
this.push(data);
119+
}
120+
this.push(null);
144121
});
145122
} else {
146-
done(this, null, null);
123+
this.push(null);
147124
}
148125
}
149126

150-
Transform.prototype.push = function(chunk, encoding) {
151-
this._transformState.needTransform = false;
152-
return Duplex.prototype.push.call(this, chunk, encoding);
153-
};
154-
155-
// This is the part where you do stuff!
156-
// override this function in implementation classes.
157-
// 'chunk' is an input chunk.
158-
//
159-
// Call `push(newChunk)` to pass along transformed output
160-
// to the readable side. You may call 'push' zero or more times.
161-
//
162-
// Call `cb(err)` when you are done with this chunk. If you pass
163-
// an error, then that'll put the hurt on the whole operation. If you
164-
// never call cb(), then you'll never get another chunk.
165-
Transform.prototype._transform = function(chunk, encoding, cb) {
127+
Transform.prototype._transform = function(chunk, encoding, callback) {
166128
throw new ERR_METHOD_NOT_IMPLEMENTED('_transform()');
167129
};
168130

169-
Transform.prototype._write = function(chunk, encoding, cb) {
170-
const ts = this._transformState;
171-
ts.writecb = cb;
172-
ts.writechunk = chunk;
173-
ts.writeencoding = encoding;
174-
if (!ts.transforming) {
175-
const rs = this._readableState;
176-
if (ts.needTransform ||
177-
rs.needReadable ||
178-
rs.length < rs.highWaterMark)
179-
this._read(rs.highWaterMark);
180-
}
131+
Transform.prototype._write = function(chunk, encoding, callback) {
132+
const rState = this._readableState;
133+
const wState = this._writableState;
134+
const length = rState.length;
135+
136+
this._transform(chunk, encoding, (err, val) => {
137+
if (err) {
138+
callback(err);
139+
return;
140+
}
141+
142+
if (val != null) {
143+
this.push(val);
144+
}
145+
146+
if (
147+
wState.ended || // Backwards compat.
148+
length === rState.length || // Backwards compat.
149+
rState.length < rState.highWaterMark ||
150+
rState.length === 0
151+
) {
152+
callback();
153+
} else {
154+
this[kCallback] = callback;
155+
}
156+
});
181157
};
182158

183-
// Doesn't matter what the args are here.
184-
// _transform does all the work.
185-
// That we got here means that the readable side wants more data.
186-
Transform.prototype._read = function(n) {
187-
const ts = this._transformState;
188-
189-
if (ts.writechunk !== null && !ts.transforming) {
190-
ts.transforming = true;
191-
this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform);
192-
} else {
193-
// Mark that we need a transform, so that any data that comes in
194-
// will get processed, now that we've asked for it.
195-
ts.needTransform = true;
159+
Transform.prototype._read = function() {
160+
if (this[kCallback]) {
161+
const callback = this[kCallback];
162+
this[kCallback] = null;
163+
callback();
196164
}
197165
};
198-
199-
200-
Transform.prototype._destroy = function(err, cb) {
201-
Duplex.prototype._destroy.call(this, err, (err2) => {
202-
cb(err2);
203-
});
204-
};
205-
206-
207-
function done(stream, er, data) {
208-
if (er)
209-
return stream.emit('error', er);
210-
211-
if (data != null) // Single equals check for both `null` and `undefined`
212-
stream.push(data);
213-
214-
// These two error cases are coherence checks that can likely not be tested.
215-
if (stream._writableState.length)
216-
throw new ERR_TRANSFORM_WITH_LENGTH_0();
217-
218-
if (stream._transformState.transforming)
219-
throw new ERR_TRANSFORM_ALREADY_TRANSFORMING();
220-
return stream.push(null);
221-
}
Collapse file

‎lib/internal/errors.js‎

Copy file name to clipboardExpand all lines: lib/internal/errors.js
-4Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1363,12 +1363,8 @@ E('ERR_TLS_SNI_FROM_SERVER',
13631363
E('ERR_TRACE_EVENTS_CATEGORY_REQUIRED',
13641364
'At least one category is required', TypeError);
13651365
E('ERR_TRACE_EVENTS_UNAVAILABLE', 'Trace events are unavailable', Error);
1366-
E('ERR_TRANSFORM_ALREADY_TRANSFORMING',
1367-
'Calling transform done when still transforming', Error);
13681366

13691367
// This should probably be a `RangeError`.
1370-
E('ERR_TRANSFORM_WITH_LENGTH_0',
1371-
'Calling transform done when writableState.length != 0', Error);
13721368
E('ERR_TTY_INIT_FAILED', 'TTY initialization failed', SystemError);
13731369
E('ERR_UNCAUGHT_EXCEPTION_CAPTURE_ALREADY_SET',
13741370
'`process.setupUncaughtExceptionCapture()` was called while a capture ' +
Collapse file

‎test/parallel/test-stream2-transform.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-stream2-transform.js
+1-2Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,9 @@ const Transform = require('_stream_transform');
4545

4646
assert.strictEqual(tx.readableLength, 10);
4747
assert.strictEqual(transformed, 10);
48-
assert.strictEqual(tx._transformState.writechunk.length, 5);
4948
assert.deepStrictEqual(tx.writableBuffer.map(function(c) {
5049
return c.chunk.length;
51-
}), [6, 7, 8, 9, 10]);
50+
}), [5, 6, 7, 8, 9, 10]);
5251
}
5352

5453
{
Collapse file

‎test/parallel/test-zlib-flush-drain.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-zlib-flush-drain.js
+3Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ const ws = deflater._writableState;
2828
const beforeFlush = ws.needDrain;
2929
let afterFlush = ws.needDrain;
3030

31+
deflater.on('data', () => {
32+
});
33+
3134
deflater.flush(function(err) {
3235
afterFlush = ws.needDrain;
3336
});

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.