Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 4cf4b41

Browse filesBrowse files
debadree25MylesBorins
authored andcommitted
stream: add suport for abort signal in finished() for webstreams
Refs: #46205 PR-URL: #46403 Refs: #37354 Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Robert Nagy <ronagy@icloud.com>
1 parent 31d3e3c commit 4cf4b41
Copy full SHA for 4cf4b41

File tree

Expand file treeCollapse file tree

2 files changed

+116
-3
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

2 files changed

+116
-3
lines changed
Open diff view settings
Collapse file

‎lib/internal/streams/end-of-stream.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/end-of-stream.js
+26-3Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -261,11 +261,34 @@ function eos(stream, options, callback) {
261261
return cleanup;
262262
}
263263

264-
function eosWeb(stream, opts, callback) {
264+
function eosWeb(stream, options, callback) {
265+
let isAborted = false;
266+
let abort = nop;
267+
if (options.signal) {
268+
abort = () => {
269+
isAborted = true;
270+
callback.call(stream, new AbortError(undefined, { cause: options.signal.reason }));
271+
};
272+
if (options.signal.aborted) {
273+
process.nextTick(abort);
274+
} else {
275+
const originalCallback = callback;
276+
callback = once((...args) => {
277+
options.signal.removeEventListener('abort', abort);
278+
originalCallback.apply(stream, args);
279+
});
280+
options.signal.addEventListener('abort', abort);
281+
}
282+
}
283+
const resolverFn = (...args) => {
284+
if (!isAborted) {
285+
process.nextTick(() => callback.apply(stream, args));
286+
}
287+
};
265288
PromisePrototypeThen(
266289
stream[kIsClosedPromise].promise,
267-
() => process.nextTick(() => callback.call(stream)),
268-
(err) => process.nextTick(() => callback.call(stream, err)),
290+
resolverFn,
291+
resolverFn
269292
);
270293
return nop;
271294
}
Collapse file

‎test/parallel/test-webstreams-finished.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-webstreams-finished.js
+90Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,3 +230,93 @@ const { finished: finishedPromise } = require('stream/promises');
230230
assert.strictEqual(err?.message, 'asd');
231231
});
232232
}
233+
234+
{
235+
// Check pre-cancelled
236+
const signal = new EventTarget();
237+
signal.aborted = true;
238+
239+
const rs = new ReadableStream({
240+
start() {}
241+
});
242+
finished(rs, { signal }, common.mustCall((err) => {
243+
assert.strictEqual(err.name, 'AbortError');
244+
}));
245+
}
246+
247+
{
248+
// Check cancelled before the stream ends sync.
249+
const ac = new AbortController();
250+
const { signal } = ac;
251+
252+
const rs = new ReadableStream({
253+
start() {}
254+
});
255+
finished(rs, { signal }, common.mustCall((err) => {
256+
assert.strictEqual(err.name, 'AbortError');
257+
}));
258+
259+
ac.abort();
260+
}
261+
262+
{
263+
// Check cancelled before the stream ends async.
264+
const ac = new AbortController();
265+
const { signal } = ac;
266+
267+
const rs = new ReadableStream({
268+
start() {}
269+
});
270+
setTimeout(() => ac.abort(), 1);
271+
finished(rs, { signal }, common.mustCall((err) => {
272+
assert.strictEqual(err.name, 'AbortError');
273+
}));
274+
}
275+
276+
{
277+
// Check cancelled after doesn't throw.
278+
const ac = new AbortController();
279+
const { signal } = ac;
280+
281+
const rs = new ReadableStream({
282+
start(controller) {
283+
controller.enqueue('asd');
284+
controller.close();
285+
}
286+
});
287+
finished(rs, { signal }, common.mustSucceed());
288+
289+
rs.getReader().read().then(common.mustCall((chunk) => {
290+
assert.strictEqual(chunk.value, 'asd');
291+
setImmediate(() => ac.abort());
292+
}));
293+
}
294+
295+
{
296+
// Promisified abort works
297+
async function run() {
298+
const ac = new AbortController();
299+
const { signal } = ac;
300+
const rs = new ReadableStream({
301+
start() {}
302+
});
303+
setImmediate(() => ac.abort());
304+
await finishedPromise(rs, { signal });
305+
}
306+
307+
assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
308+
}
309+
310+
{
311+
// Promisified pre-aborted works
312+
async function run() {
313+
const signal = new EventTarget();
314+
signal.aborted = true;
315+
const rs = new ReadableStream({
316+
start() {}
317+
});
318+
await finishedPromise(rs, { signal });
319+
}
320+
321+
assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
322+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.