Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit e9bda11

Browse filesBrowse files
authored
stream: lazy allocate back pressure buffer
PR-URL: #50013 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
1 parent 19b470f commit e9bda11
Copy full SHA for e9bda11

File tree

Expand file treeCollapse file tree

1 file changed

+41
-23
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

1 file changed

+41
-23
lines changed
Open diff view settings
Collapse file

‎lib/internal/streams/writable.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/writable.js
+41-23Lines changed: 41 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ const kErroredValue = Symbol('kErroredValue');
7777
const kDefaultEncodingValue = Symbol('kDefaultEncodingValue');
7878
const kWriteCbValue = Symbol('kWriteCbValue');
7979
const kAfterWriteTickInfoValue = Symbol('kAfterWriteTickInfoValue');
80+
const kBufferedValue = Symbol('kBufferedValue');
8081

8182
const kObjectMode = 1 << 0;
8283
const kEnded = 1 << 1;
@@ -108,7 +109,7 @@ const kWriteCb = 1 << 26;
108109
const kExpectWriteCb = 1 << 27;
109110
const kAfterWriteTickInfo = 1 << 28;
110111
const kAfterWritePending = 1 << 29;
111-
const kHasBuffer = 1 << 30;
112+
const kBuffered = 1 << 30;
112113

113114
// TODO(benjamingr) it is likely slower to do it this way than with free functions
114115
function makeBitMapDescriptor(bit) {
@@ -270,6 +271,21 @@ ObjectDefineProperties(WritableState.prototype, {
270271
}
271272
},
272273
},
274+
275+
buffered: {
276+
__proto__: null,
277+
enumerable: false,
278+
get() { return (this.state & kBuffered) !== 0 ? this[kBufferedValue] : []; },
279+
set(value) {
280+
this[kBufferedValue] = value;
281+
if (value) {
282+
this.state |= kBuffered;
283+
} else {
284+
this.state &= ~kBuffered;
285+
}
286+
},
287+
},
288+
273289
});
274290

275291
function WritableState(options, stream, isDuplex) {
@@ -338,20 +354,20 @@ function WritableState(options, stream, isDuplex) {
338354
}
339355

340356
function resetBuffer(state) {
341-
state.buffered = [];
357+
state[kBufferedValue] = null;
342358
state.bufferedIndex = 0;
343359
state.state |= kAllBuffers | kAllNoop;
344-
state.state &= ~kHasBuffer;
360+
state.state &= ~kBuffered;
345361
}
346362

347363
WritableState.prototype.getBuffer = function getBuffer() {
348-
return ArrayPrototypeSlice(this.buffered, this.bufferedIndex);
364+
return (this.state & kBuffered) === 0 ? [] : ArrayPrototypeSlice(this.buffered, this.bufferedIndex);
349365
};
350366

351367
ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount', {
352368
__proto__: null,
353369
get() {
354-
return this.buffered.length - this.bufferedIndex;
370+
return (this.state & kBuffered) === 0 ? 0 : this[kBufferedValue].length - this.bufferedIndex;
355371
},
356372
});
357373

@@ -518,8 +534,12 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
518534
state.length += len;
519535

520536
if ((state.state & (kWriting | kErrored | kCorked | kConstructed)) !== kConstructed) {
521-
state.buffered.push({ chunk, encoding, callback });
522-
state.state |= kHasBuffer;
537+
if ((state.state & kBuffered) === 0) {
538+
state.state |= kBuffered;
539+
state[kBufferedValue] = [];
540+
}
541+
542+
state[kBufferedValue].push({ chunk, encoding, callback });
523543
if ((state.state & kAllBuffers) !== 0 && encoding !== 'buffer') {
524544
state.state &= ~kAllBuffers;
525545
}
@@ -611,7 +631,7 @@ function onwrite(stream, er) {
611631
onwriteError(stream, state, er, cb);
612632
}
613633
} else {
614-
if ((state.state & kHasBuffer) !== 0) {
634+
if ((state.state & kBuffered) !== 0) {
615635
clearBuffer(stream, state);
616636
}
617637

@@ -687,11 +707,13 @@ function errorBuffer(state) {
687707
return;
688708
}
689709

690-
for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
691-
const { chunk, callback } = state.buffered[n];
692-
const len = (state.state & kObjectMode) !== 0 ? 1 : chunk.length;
693-
state.length -= len;
694-
callback(state.errored ?? new ERR_STREAM_DESTROYED('write'));
710+
if ((state.state & kBuffered) !== 0) {
711+
for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
712+
const { chunk, callback } = state[kBufferedValue][n];
713+
const len = (state.state & kObjectMode) !== 0 ? 1 : chunk.length;
714+
state.length -= len;
715+
callback(state.errored ?? new ERR_STREAM_DESTROYED('write'));
716+
}
695717
}
696718

697719

@@ -702,13 +724,12 @@ function errorBuffer(state) {
702724

703725
// If there's something in the buffer waiting, then process it.
704726
function clearBuffer(stream, state) {
705-
if ((state.state & (kDestroyed | kBufferProcessing | kCorked | kHasBuffer)) !== kHasBuffer ||
706-
(state.state & kConstructed) === 0) {
727+
if ((state.state & (kDestroyed | kBufferProcessing | kCorked | kBuffered)) !== kBuffered) {
707728
return;
708729
}
709730

710731
const objectMode = (state.state & kObjectMode) !== 0;
711-
const { buffered, bufferedIndex } = state;
732+
const { [kBufferedValue]: buffered, bufferedIndex } = state;
712733
const bufferedLength = buffered.length - bufferedIndex;
713734

714735
if (!bufferedLength) {
@@ -838,10 +859,9 @@ function needFinish(state) {
838859
kWriting |
839860
kErrorEmitted |
840861
kCloseEmitted |
841-
kErrored
842-
)) === (kEnding | kConstructed) &&
843-
state.length === 0 &&
844-
state.buffered.length === 0);
862+
kErrored |
863+
kBuffered
864+
)) === (kEnding | kConstructed) && state.length === 0);
845865
}
846866

847867
function callFinal(stream, state) {
@@ -1083,9 +1103,7 @@ Writable.prototype.destroy = function(err, cb) {
10831103
const state = this._writableState;
10841104

10851105
// Invoke pending callbacks.
1086-
if ((state.state & kDestroyed) === 0 &&
1087-
(state.bufferedIndex < state.buffered.length ||
1088-
(state.state & kOnFinished) !== 0)) {
1106+
if ((state.state & (kBuffered | kOnFinished | kDestroyed)) !== kDestroyed) {
10891107
process.nextTick(errorBuffer, state);
10901108
}
10911109

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.