Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 71acd36

Browse filesBrowse files
debadree25RafaelGSS
authored andcommitted
stream: implement TransformStream cleanup using "transformer.cancel"
Fixes: #49971 PR-URL: #50126 Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent 3ccd5fa commit 71acd36
Copy full SHA for 71acd36

File tree

Expand file treeCollapse file tree

72 files changed

+418
-94
lines changed
Open diff view settings
Filter options

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Dismiss banner
Expand file treeCollapse file tree

72 files changed

+418
-94
lines changed
Open diff view settings
Collapse file

‎lib/internal/webstreams/readablestream.js‎

Copy file name to clipboardExpand all lines: lib/internal/webstreams/readablestream.js
+3-2Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ const {
1414
ObjectCreate,
1515
ObjectDefineProperties,
1616
ObjectSetPrototypeOf,
17+
Promise,
1718
PromisePrototypeThen,
1819
PromiseResolve,
1920
PromiseReject,
@@ -2444,7 +2445,7 @@ function setupReadableStreamDefaultController(
24442445
const startResult = startAlgorithm();
24452446

24462447
PromisePrototypeThen(
2447-
PromiseResolve(startResult),
2448+
new Promise((r) => r(startResult)),
24482449
() => {
24492450
controller[kState].started = true;
24502451
assert(!controller[kState].pulling);
@@ -3243,7 +3244,7 @@ function setupReadableByteStreamController(
32433244
const startResult = startAlgorithm();
32443245

32453246
PromisePrototypeThen(
3246-
PromiseResolve(startResult),
3247+
new Promise((r) => r(startResult)),
32473248
() => {
32483249
controller[kState].started = true;
32493250
assert(!controller[kState].pulling);
Collapse file

‎lib/internal/webstreams/transformstream.js‎

Copy file name to clipboardExpand all lines: lib/internal/webstreams/transformstream.js
+105-11Lines changed: 105 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ const {
66
ObjectDefineProperties,
77
ObjectSetPrototypeOf,
88
PromisePrototypeThen,
9-
PromiseResolve,
109
SymbolToStringTag,
1110
Symbol,
1211
} = primordials;
@@ -47,6 +46,7 @@ const {
4746
nonOpFlush,
4847
kType,
4948
kState,
49+
nonOpCancel,
5050
} = require('internal/webstreams/util');
5151

5252
const {
@@ -384,8 +384,7 @@ function initializeTransformStream(
384384
return transformStreamDefaultSourcePullAlgorithm(stream);
385385
},
386386
cancel(reason) {
387-
transformStreamErrorWritableAndUnblockWrite(stream, reason);
388-
return PromiseResolve();
387+
return transformStreamDefaultSourceCancelAlgorithm(stream, reason);
389388
},
390389
}, {
391390
highWaterMark: readableHighWaterMark,
@@ -427,6 +426,10 @@ function transformStreamErrorWritableAndUnblockWrite(stream, error) {
427426
writableStreamDefaultControllerErrorIfNeeded(
428427
writable[kState].controller,
429428
error);
429+
transformStreamUnblockWrite(stream);
430+
}
431+
432+
function transformStreamUnblockWrite(stream) {
430433
if (stream[kState].backpressure)
431434
transformStreamSetBackpressure(stream, false);
432435
}
@@ -443,13 +446,15 @@ function setupTransformStreamDefaultController(
443446
stream,
444447
controller,
445448
transformAlgorithm,
446-
flushAlgorithm) {
449+
flushAlgorithm,
450+
cancelAlgorithm) {
447451
assert(isTransformStream(stream));
448452
assert(stream[kState].controller === undefined);
449453
controller[kState] = {
450454
stream,
451455
transformAlgorithm,
452456
flushAlgorithm,
457+
cancelAlgorithm,
453458
};
454459
stream[kState].controller = controller;
455460
}
@@ -460,21 +465,26 @@ function setupTransformStreamDefaultControllerFromTransformer(
460465
const controller = new TransformStreamDefaultController(kSkipThrow);
461466
const transform = transformer?.transform || defaultTransformAlgorithm;
462467
const flush = transformer?.flush || nonOpFlush;
468+
const cancel = transformer?.cancel || nonOpCancel;
463469
const transformAlgorithm =
464470
FunctionPrototypeBind(transform, transformer);
465471
const flushAlgorithm =
466472
FunctionPrototypeBind(flush, transformer);
473+
const cancelAlgorithm =
474+
FunctionPrototypeBind(cancel, transformer);
467475

468476
setupTransformStreamDefaultController(
469477
stream,
470478
controller,
471479
transformAlgorithm,
472-
flushAlgorithm);
480+
flushAlgorithm,
481+
cancelAlgorithm);
473482
}
474483

475484
function transformStreamDefaultControllerClearAlgorithms(controller) {
476485
controller[kState].transformAlgorithm = undefined;
477486
controller[kState].flushAlgorithm = undefined;
487+
controller[kState].cancelAlgorithm = undefined;
478488
}
479489

480490
function transformStreamDefaultControllerEnqueue(controller, chunk) {
@@ -563,7 +573,40 @@ function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) {
563573
}
564574

565575
async function transformStreamDefaultSinkAbortAlgorithm(stream, reason) {
566-
transformStreamError(stream, reason);
576+
const {
577+
controller,
578+
readable,
579+
} = stream[kState];
580+
581+
if (controller[kState].finishPromise !== undefined) {
582+
return controller[kState].finishPromise;
583+
}
584+
585+
const { promise, resolve, reject } = createDeferredPromise();
586+
controller[kState].finishPromise = promise;
587+
const cancelPromise = ensureIsPromise(
588+
controller[kState].cancelAlgorithm,
589+
controller,
590+
reason);
591+
transformStreamDefaultControllerClearAlgorithms(controller);
592+
593+
PromisePrototypeThen(
594+
cancelPromise,
595+
() => {
596+
if (readable[kState].state === 'errored')
597+
reject(readable[kState].storedError);
598+
else {
599+
readableStreamDefaultControllerError(readable[kState].controller, reason);
600+
resolve();
601+
}
602+
},
603+
(error) => {
604+
readableStreamDefaultControllerError(readable[kState].controller, error);
605+
reject(error);
606+
},
607+
);
608+
609+
return controller[kState].finishPromise;
567610
}
568611

569612
function transformStreamDefaultSinkCloseAlgorithm(stream) {
@@ -572,23 +615,32 @@ function transformStreamDefaultSinkCloseAlgorithm(stream) {
572615
controller,
573616
} = stream[kState];
574617

618+
if (controller[kState].finishPromise !== undefined) {
619+
return controller[kState].finishPromise;
620+
}
621+
const { promise, resolve, reject } = createDeferredPromise();
622+
controller[kState].finishPromise = promise;
575623
const flushPromise =
576624
ensureIsPromise(
577625
controller[kState].flushAlgorithm,
578626
controller,
579627
controller);
580628
transformStreamDefaultControllerClearAlgorithms(controller);
581-
return PromisePrototypeThen(
629+
PromisePrototypeThen(
582630
flushPromise,
583631
() => {
584632
if (readable[kState].state === 'errored')
585-
throw readable[kState].storedError;
586-
readableStreamDefaultControllerClose(readable[kState].controller);
633+
reject(readable[kState].storedError);
634+
else {
635+
readableStreamDefaultControllerClose(readable[kState].controller);
636+
resolve();
637+
}
587638
},
588639
(error) => {
589-
transformStreamError(stream, error);
590-
throw readable[kState].storedError;
640+
readableStreamDefaultControllerError(readable[kState].controller, error);
641+
reject(error);
591642
});
643+
return controller[kState].finishPromise;
592644
}
593645

594646
function transformStreamDefaultSourcePullAlgorithm(stream) {
@@ -598,6 +650,48 @@ function transformStreamDefaultSourcePullAlgorithm(stream) {
598650
return stream[kState].backpressureChange.promise;
599651
}
600652

653+
function transformStreamDefaultSourceCancelAlgorithm(stream, reason) {
654+
const {
655+
controller,
656+
writable,
657+
} = stream[kState];
658+
659+
if (controller[kState].finishPromise !== undefined) {
660+
return controller[kState].finishPromise;
661+
}
662+
663+
const { promise, resolve, reject } = createDeferredPromise();
664+
controller[kState].finishPromise = promise;
665+
const cancelPromise = ensureIsPromise(
666+
controller[kState].cancelAlgorithm,
667+
controller,
668+
reason);
669+
transformStreamDefaultControllerClearAlgorithms(controller);
670+
671+
PromisePrototypeThen(cancelPromise,
672+
() => {
673+
if (writable[kState].state === 'errored')
674+
reject(writable[kState].storedError);
675+
else {
676+
writableStreamDefaultControllerErrorIfNeeded(
677+
writable[kState].controller,
678+
reason);
679+
transformStreamUnblockWrite(stream);
680+
resolve();
681+
}
682+
},
683+
(error) => {
684+
writableStreamDefaultControllerErrorIfNeeded(
685+
writable[kState].controller,
686+
error);
687+
transformStreamUnblockWrite(stream);
688+
reject(error);
689+
},
690+
);
691+
692+
return controller[kState].finishPromise;
693+
}
694+
601695
module.exports = {
602696
TransformStream,
603697
TransformStreamDefaultController,
Collapse file

‎lib/internal/webstreams/writablestream.js‎

Copy file name to clipboardExpand all lines: lib/internal/webstreams/writablestream.js
+2-1Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const {
77
FunctionPrototypeCall,
88
ObjectDefineProperties,
99
ObjectSetPrototypeOf,
10+
Promise,
1011
PromisePrototypeThen,
1112
PromiseResolve,
1213
PromiseReject,
@@ -1295,7 +1296,7 @@ function setupWritableStreamDefaultController(
12951296
const startResult = startAlgorithm();
12961297

12971298
PromisePrototypeThen(
1298-
PromiseResolve(startResult),
1299+
new Promise((r) => r(startResult)),
12991300
() => {
13001301
assert(stream[kState].state === 'writable' ||
13011302
stream[kState].state === 'erroring');
Collapse file

‎test/fixtures/wpt/README.md‎

Copy file name to clipboardExpand all lines: test/fixtures/wpt/README.md
+1-1Lines changed: 1 addition & 1 deletion
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ Last update:
2727
- performance-timeline: https://github.com/web-platform-tests/wpt/tree/17ebc3aea0/performance-timeline
2828
- resource-timing: https://github.com/web-platform-tests/wpt/tree/22d38586d0/resource-timing
2929
- resources: https://github.com/web-platform-tests/wpt/tree/1e140d63ec/resources
30-
- streams: https://github.com/web-platform-tests/wpt/tree/517e945bbf/streams
30+
- streams: https://github.com/web-platform-tests/wpt/tree/a8872d92b1/streams
3131
- url: https://github.com/web-platform-tests/wpt/tree/c2d7e70b52/url
3232
- user-timing: https://github.com/web-platform-tests/wpt/tree/5ae85bf826/user-timing
3333
- wasm/jsapi: https://github.com/web-platform-tests/wpt/tree/cde25e7e3c/wasm/jsapi
Collapse file

‎test/fixtures/wpt/streams/piping/abort.any.js‎

Copy file name to clipboardExpand all lines: test/fixtures/wpt/streams/piping/abort.any.js
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22
// META: script=../resources/recording-streams.js
33
// META: script=../resources/test-utils.js
44
'use strict';
Collapse file

‎test/fixtures/wpt/streams/piping/close-propagation-backward.any.js‎

Copy file name to clipboardExpand all lines: test/fixtures/wpt/streams/piping/close-propagation-backward.any.js
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22
// META: script=../resources/recording-streams.js
33
'use strict';
44

Collapse file

‎test/fixtures/wpt/streams/piping/close-propagation-forward.any.js‎

Copy file name to clipboardExpand all lines: test/fixtures/wpt/streams/piping/close-propagation-forward.any.js
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22
// META: script=../resources/test-utils.js
33
// META: script=../resources/recording-streams.js
44
'use strict';
Collapse file
+12Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<!DOCTYPE html>
2+
<script type="module">
3+
let a = new ReadableStream();
4+
let b = self.open()
5+
let f = new b.WritableStream();
6+
a.pipeThrough(
7+
{ "readable": a, "writable": f },
8+
{ "signal": AbortSignal.abort() }
9+
)
10+
await new Promise(setTimeout);
11+
structuredClone(undefined, { "transfer": [f] })
12+
</script>
Collapse file

‎test/fixtures/wpt/streams/piping/error-propagation-backward.any.js‎

Copy file name to clipboardExpand all lines: test/fixtures/wpt/streams/piping/error-propagation-backward.any.js
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22
// META: script=../resources/test-utils.js
33
// META: script=../resources/recording-streams.js
44
'use strict';
Collapse file

‎test/fixtures/wpt/streams/piping/error-propagation-forward.any.js‎

Copy file name to clipboardExpand all lines: test/fixtures/wpt/streams/piping/error-propagation-forward.any.js
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22
// META: script=../resources/test-utils.js
33
// META: script=../resources/recording-streams.js
44
'use strict';

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.