Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 438b7fd

Browse filesBrowse files
tsctxtargos
authored andcommitted
stream: fix cloned webstreams not being unref correctly
PR-URL: #51526 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Vinícius Lourenço Claro Cardoso <contact@viniciusl.com.br>
1 parent 4b1d25b commit 438b7fd
Copy full SHA for 438b7fd

File tree

Expand file treeCollapse file tree

5 files changed

+63
-14
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

5 files changed

+63
-14
lines changed
Open diff view settings
Collapse file

‎lib/internal/webstreams/readablestream.js‎

Copy file name to clipboardExpand all lines: lib/internal/webstreams/readablestream.js
+5-1Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -607,7 +607,11 @@ class ReadableStream {
607607
const transfer = lazyTransfer();
608608
setupReadableStreamDefaultControllerFromSource(
609609
this,
610-
new transfer.CrossRealmTransformReadableSource(port),
610+
// The MessagePort is set to be referenced when reading.
611+
// After two MessagePorts are closed, there is a problem with
612+
// lingering promise not being properly resolved.
613+
// https://github.com/nodejs/node/issues/51486
614+
new transfer.CrossRealmTransformReadableSource(port, true),
611615
0, () => 1);
612616
}
613617
}
Collapse file

‎lib/internal/webstreams/transfer.js‎

Copy file name to clipboardExpand all lines: lib/internal/webstreams/transfer.js
+26-10Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -102,10 +102,11 @@ function InternalCloneableDOMException() {
102102
InternalCloneableDOMException[kDeserialize] = () => {};
103103

104104
class CrossRealmTransformReadableSource {
105-
constructor(port) {
105+
constructor(port, unref) {
106106
this[kState] = {
107107
port,
108108
controller: undefined,
109+
unref,
109110
};
110111

111112
port.onmessage = ({ data }) => {
@@ -143,13 +144,19 @@ class CrossRealmTransformReadableSource {
143144
error);
144145
port.close();
145146
};
147+
148+
port.unref();
146149
}
147150

148151
start(controller) {
149152
this[kState].controller = controller;
150153
}
151154

152155
async pull() {
156+
if (this[kState].unref) {
157+
this[kState].unref = false;
158+
this[kState].port.ref();
159+
}
153160
this[kState].port.postMessage({ type: 'pull' });
154161
}
155162

@@ -170,11 +177,12 @@ class CrossRealmTransformReadableSource {
170177
}
171178

172179
class CrossRealmTransformWritableSink {
173-
constructor(port) {
180+
constructor(port, unref) {
174181
this[kState] = {
175182
port,
176183
controller: undefined,
177184
backpressurePromise: createDeferredPromise(),
185+
unref,
178186
};
179187

180188
port.onmessage = ({ data }) => {
@@ -211,13 +219,18 @@ class CrossRealmTransformWritableSink {
211219
port.close();
212220
};
213221

222+
port.unref();
214223
}
215224

216225
start(controller) {
217226
this[kState].controller = controller;
218227
}
219228

220229
async write(chunk) {
230+
if (this[kState].unref) {
231+
this[kState].unref = false;
232+
this[kState].port.ref();
233+
}
221234
if (this[kState].backpressurePromise === undefined) {
222235
this[kState].backpressurePromise = {
223236
promise: PromiseResolve(),
@@ -262,12 +275,12 @@ class CrossRealmTransformWritableSink {
262275
}
263276

264277
function newCrossRealmReadableStream(writable, port) {
265-
const readable =
266-
new ReadableStream(
267-
new CrossRealmTransformReadableSource(port));
278+
// MessagePort should always be unref.
279+
// There is a problem with the process not terminating.
280+
// https://github.com/nodejs/node/issues/44985
281+
const readable = new ReadableStream(new CrossRealmTransformReadableSource(port, false));
268282

269-
const promise =
270-
readableStreamPipeTo(readable, writable, false, false, false);
283+
const promise = readableStreamPipeTo(readable, writable, false, false, false);
271284

272285
setPromiseHandled(promise);
273286

@@ -278,12 +291,15 @@ function newCrossRealmReadableStream(writable, port) {
278291
}
279292

280293
function newCrossRealmWritableSink(readable, port) {
281-
const writable =
282-
new WritableStream(
283-
new CrossRealmTransformWritableSink(port));
294+
// MessagePort should always be unref.
295+
// There is a problem with the process not terminating.
296+
// https://github.com/nodejs/node/issues/44985
297+
const writable = new WritableStream(new CrossRealmTransformWritableSink(port, false));
284298

285299
const promise = readableStreamPipeTo(readable, writable, false, false, false);
300+
286301
setPromiseHandled(promise);
302+
287303
return {
288304
writable,
289305
promise,
Collapse file

‎lib/internal/webstreams/writablestream.js‎

Copy file name to clipboardExpand all lines: lib/internal/webstreams/writablestream.js
+5-3Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -263,8 +263,6 @@ class WritableStream {
263263
this[kState].transfer.readable = readable;
264264
this[kState].transfer.promise = promise;
265265

266-
setPromiseHandled(this[kState].transfer.promise);
267-
268266
return {
269267
data: { port: this[kState].transfer.port2 },
270268
deserializeInfo:
@@ -283,7 +281,11 @@ class WritableStream {
283281
const transfer = lazyTransfer();
284282
setupWritableStreamDefaultControllerFromSink(
285283
this,
286-
new transfer.CrossRealmTransformWritableSink(port),
284+
// The MessagePort is set to be referenced when reading.
285+
// After two MessagePorts are closed, there is a problem with
286+
// lingering promise not being properly resolved.
287+
// https://github.com/nodejs/node/issues/51486
288+
new transfer.CrossRealmTransformWritableSink(port, true),
287289
1,
288290
() => 1);
289291
}
Collapse file
+16Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
'use strict';
2+
3+
require('../common');
4+
const { ok } = require('node:assert');
5+
6+
// This test verifies that cloned ReadableStream and WritableStream instances
7+
// do not keep the process alive. The test fails if it timesout (it should just
8+
// exit immediately)
9+
10+
const rs1 = new ReadableStream();
11+
const ws1 = new WritableStream();
12+
13+
const [rs2, ws2] = structuredClone([rs1, ws1], { transfer: [rs1, ws1] });
14+
15+
ok(rs2 instanceof ReadableStream);
16+
ok(ws2 instanceof WritableStream);
Collapse file

‎test/parallel/test-whatwg-webstreams-transfer.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-whatwg-webstreams-transfer.js
+11Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,12 +464,23 @@ const theData = 'hello';
464464
tracker.verify();
465465
});
466466
467+
// We create an interval to keep the event loop alive while
468+
// we wait for the stream read to complete. The reason this is needed is because there's
469+
// otherwise nothing to keep the worker thread event loop alive long enough to actually
470+
// complete the read from the stream. Under the covers the ReadableStream uses an
471+
// unref'd MessagePort to communicate with the main thread. Because the MessagePort
472+
// is unref'd, it's existence would not keep the thread alive on its own. There was previously
473+
// a bug where this MessagePort was ref'd which would block the thread and main thread
474+
// from terminating at all unless the stream was consumed/closed.
475+
const i = setInterval(() => {}, 1000);
476+
467477
parentPort.onmessage = tracker.calls(({ data }) => {
468478
assert(isReadableStream(data));
469479
const reader = data.getReader();
470480
reader.read().then(tracker.calls((result) => {
471481
assert(!result.done);
472482
assert(result.value instanceof Uint8Array);
483+
clearInterval(i);
473484
}));
474485
parentPort.close();
475486
});

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.