Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 3dd82b1

Browse filesBrowse files
atlowChemiruyadorno
authored andcommitted
stream: use addAbortListener
PR-URL: #48550 Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent cb51ef2 commit 3dd82b1
Copy full SHA for 3dd82b1

File tree

Expand file treeCollapse file tree

5 files changed

+39
-26
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

5 files changed

+39
-26
lines changed
Open diff view settings
Collapse file

‎lib/internal/streams/add-abort-signal.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/add-abort-signal.js
+8-2Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
'use strict';
22

3+
const {
4+
SymbolDispose,
5+
} = primordials;
6+
37
const {
48
AbortError,
59
codes,
@@ -13,6 +17,7 @@ const {
1317

1418
const eos = require('internal/streams/end-of-stream');
1519
const { ERR_INVALID_ARG_TYPE } = codes;
20+
let addAbortListener;
1621

1722
// This method is inlined here for readable-stream
1823
// It also does not allow for signal to not exist on the stream
@@ -46,8 +51,9 @@ module.exports.addAbortSignalNoValidate = function(signal, stream) {
4651
if (signal.aborted) {
4752
onAbort();
4853
} else {
49-
signal.addEventListener('abort', onAbort);
50-
eos(stream, () => signal.removeEventListener('abort', onAbort));
54+
addAbortListener ??= require('events').addAbortListener;
55+
const disposable = addAbortListener(signal, onAbort);
56+
eos(stream, disposable[SymbolDispose]);
5157
}
5258
return stream;
5359
};
Collapse file

‎lib/internal/streams/end-of-stream.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/end-of-stream.js
+12-5Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@ const {
2222
validateBoolean,
2323
} = require('internal/validators');
2424

25-
const { Promise, PromisePrototypeThen } = primordials;
25+
const {
26+
Promise,
27+
PromisePrototypeThen,
28+
SymbolDispose,
29+
} = primordials;
2630

2731
const {
2832
isClosed,
@@ -40,6 +44,7 @@ const {
4044
willEmitClose: _willEmitClose,
4145
kIsClosedPromise,
4246
} = require('internal/streams/utils');
47+
let addAbortListener;
4348

4449
function isRequest(stream) {
4550
return stream.setHeader && typeof stream.abort === 'function';
@@ -249,12 +254,13 @@ function eos(stream, options, callback) {
249254
if (options.signal.aborted) {
250255
process.nextTick(abort);
251256
} else {
257+
addAbortListener ??= require('events').addAbortListener;
258+
const disposable = addAbortListener(options.signal, abort);
252259
const originalCallback = callback;
253260
callback = once((...args) => {
254-
options.signal.removeEventListener('abort', abort);
261+
disposable[SymbolDispose]();
255262
originalCallback.apply(stream, args);
256263
});
257-
options.signal.addEventListener('abort', abort);
258264
}
259265
}
260266

@@ -272,12 +278,13 @@ function eosWeb(stream, options, callback) {
272278
if (options.signal.aborted) {
273279
process.nextTick(abort);
274280
} else {
281+
addAbortListener ??= require('events').addAbortListener;
282+
const disposable = addAbortListener(options.signal, abort);
275283
const originalCallback = callback;
276284
callback = once((...args) => {
277-
options.signal.removeEventListener('abort', abort);
285+
disposable[SymbolDispose]();
278286
originalCallback.apply(stream, args);
279287
});
280-
options.signal.addEventListener('abort', abort);
281288
}
282289
}
283290
const resolverFn = (...args) => {
Collapse file

‎lib/internal/streams/operators.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/operators.js
+5-15Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
'use strict';
22

3-
const { AbortController } = require('internal/abort_controller');
3+
const { AbortController, AbortSignal } = require('internal/abort_controller');
44

55
const {
66
codes: {
@@ -16,7 +16,7 @@ const {
1616
validateInteger,
1717
validateObject,
1818
} = require('internal/validators');
19-
const { kWeakHandler } = require('internal/event_target');
19+
const { kWeakHandler, kResistStopPropagation } = require('internal/event_target');
2020
const { finished } = require('internal/streams/end-of-stream');
2121
const staticCompose = require('internal/streams/compose');
2222
const {
@@ -27,6 +27,7 @@ const { deprecate } = require('internal/util');
2727

2828
const {
2929
ArrayPrototypePush,
30+
Boolean,
3031
MathFloor,
3132
Number,
3233
NumberIsNaN,
@@ -84,19 +85,11 @@ function map(fn, options) {
8485
validateInteger(concurrency, 'concurrency', 1);
8586

8687
return async function* map() {
87-
const ac = new AbortController();
88+
const signal = AbortSignal.any([options?.signal].filter(Boolean));
8889
const stream = this;
8990
const queue = [];
90-
const signal = ac.signal;
9191
const signalOpt = { signal };
9292

93-
const abort = () => ac.abort();
94-
if (options?.signal?.aborted) {
95-
abort();
96-
}
97-
98-
options?.signal?.addEventListener('abort', abort);
99-
10093
let next;
10194
let resume;
10295
let done = false;
@@ -153,7 +146,6 @@ function map(fn, options) {
153146
next();
154147
next = null;
155148
}
156-
options?.signal?.removeEventListener('abort', abort);
157149
}
158150
}
159151

@@ -188,8 +180,6 @@ function map(fn, options) {
188180
});
189181
}
190182
} finally {
191-
ac.abort();
192-
193183
done = true;
194184
if (resume) {
195185
resume();
@@ -301,7 +291,7 @@ async function reduce(reducer, initialValue, options) {
301291
const ac = new AbortController();
302292
const signal = ac.signal;
303293
if (options?.signal) {
304-
const opts = { once: true, [kWeakHandler]: this };
294+
const opts = { once: true, [kWeakHandler]: this, [kResistStopPropagation]: true };
305295
options.signal.addEventListener('abort', () => ac.abort(), opts);
306296
}
307297
let gotAnyItemFromStream = false;
Collapse file

‎lib/internal/streams/pipeline.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/pipeline.js
+8-2Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const {
77
ArrayIsArray,
88
Promise,
99
SymbolAsyncIterator,
10+
SymbolDispose,
1011
} = primordials;
1112

1213
const eos = require('internal/streams/end-of-stream');
@@ -44,6 +45,7 @@ const { AbortController } = require('internal/abort_controller');
4445

4546
let PassThrough;
4647
let Readable;
48+
let addAbortListener;
4749

4850
function destroyer(stream, reading, writing) {
4951
let finished = false;
@@ -206,7 +208,11 @@ function pipelineImpl(streams, callback, opts) {
206208
finishImpl(new AbortError());
207209
}
208210

209-
outerSignal?.addEventListener('abort', abort);
211+
addAbortListener ??= require('events').addAbortListener;
212+
let disposable;
213+
if (outerSignal) {
214+
disposable = addAbortListener(outerSignal, abort);
215+
}
210216

211217
let error;
212218
let value;
@@ -231,7 +237,7 @@ function pipelineImpl(streams, callback, opts) {
231237
destroys.shift()(error);
232238
}
233239

234-
outerSignal?.removeEventListener('abort', abort);
240+
disposable?.[SymbolDispose]();
235241
ac.abort();
236242

237243
if (final) {
Collapse file

‎lib/internal/webstreams/readablestream.js‎

Copy file name to clipboardExpand all lines: lib/internal/webstreams/readablestream.js
+6-2Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ const {
2222
SafePromiseAll,
2323
Symbol,
2424
SymbolAsyncIterator,
25+
SymbolDispose,
2526
SymbolToStringTag,
2627
Uint8Array,
2728
} = primordials;
@@ -140,6 +141,7 @@ const kRelease = Symbol('kRelease');
140141

141142
let releasedError;
142143
let releasingError;
144+
let addAbortListener;
143145

144146
const userModuleRegExp = /^ {4}at (?:[^/\\(]+ \()(?!node:(.+):\d+:\d+\)$).*/gm;
145147

@@ -1259,6 +1261,7 @@ function readableStreamPipeTo(
12591261

12601262
let reader;
12611263
let writer;
1264+
let disposable;
12621265
// Both of these can throw synchronously. We want to capture
12631266
// the error and return a rejected promise instead.
12641267
try {
@@ -1291,7 +1294,7 @@ function readableStreamPipeTo(
12911294
writableStreamDefaultWriterRelease(writer);
12921295
readableStreamReaderGenericRelease(reader);
12931296
if (signal !== undefined)
1294-
signal.removeEventListener('abort', abortAlgorithm);
1297+
disposable?.[SymbolDispose]();
12951298
if (rejected)
12961299
promise.reject(error);
12971300
else
@@ -1418,7 +1421,8 @@ function readableStreamPipeTo(
14181421
abortAlgorithm();
14191422
return promise.promise;
14201423
}
1421-
signal.addEventListener('abort', abortAlgorithm, { once: true });
1424+
addAbortListener ??= require('events').addAbortListener;
1425+
disposable = addAbortListener(signal, abortAlgorithm);
14221426
}
14231427

14241428
setPromiseHandled(run());

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.