Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit a85e418

Browse filesBrowse files
authored
stream: reduce overhead of transfer
PR-URL: #50107 Reviewed-By: Yagiz Nizipli <yagiz@nizipli.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Stephen Belanger <admin@stephenbelanger.com>
1 parent 760b5dd commit a85e418
Copy full SHA for a85e418

File tree

Expand file treeCollapse file tree

4 files changed

+163
-85
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

4 files changed

+163
-85
lines changed
Open diff view settings
Collapse file
+52Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
'use strict';
2+
3+
const common = require('../common.js');
4+
5+
const { MessageChannel } = require('worker_threads');
6+
const { WritableStream, TransformStream, ReadableStream } = require('stream/web');
7+
8+
const bench = common.createBenchmark(main, {
9+
payload: ['WritableStream', 'ReadableStream', 'TransformStream'],
10+
n: [1e4],
11+
});
12+
13+
function main({ n, payload: payloadType }) {
14+
let createPayload;
15+
let messages = 0;
16+
17+
switch (payloadType) {
18+
case 'WritableStream':
19+
createPayload = () => new WritableStream();
20+
break;
21+
case 'ReadableStream':
22+
createPayload = () => new ReadableStream();
23+
break;
24+
case 'TransformStream':
25+
createPayload = () => new TransformStream();
26+
break;
27+
default:
28+
throw new Error('Unsupported payload type');
29+
}
30+
31+
const { port1, port2 } = new MessageChannel();
32+
33+
port2.onmessage = onMessage;
34+
35+
function onMessage() {
36+
if (messages++ === n) {
37+
bench.end(n);
38+
port1.close();
39+
} else {
40+
send();
41+
}
42+
}
43+
44+
function send() {
45+
const stream = createPayload();
46+
47+
port1.postMessage(stream, [stream]);
48+
}
49+
50+
bench.start();
51+
send();
52+
}
Collapse file

‎lib/internal/webstreams/readablestream.js‎

Copy file name to clipboardExpand all lines: lib/internal/webstreams/readablestream.js
+29-19Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ const {
1717
PromisePrototypeThen,
1818
PromiseResolve,
1919
PromiseReject,
20-
ReflectConstruct,
2120
SafePromiseAll,
2221
Symbol,
2322
SymbolAsyncIterator,
@@ -642,26 +641,37 @@ ObjectDefineProperties(ReadableStream.prototype, {
642641
[SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableStream.name),
643642
});
644643

645-
function TransferredReadableStream() {
646-
return ReflectConstruct(
647-
function() {
648-
markTransferMode(this, false, true);
649-
this[kType] = 'ReadableStream';
650-
this[kState] = {
651-
disturbed: false,
652-
state: 'readable',
653-
storedError: undefined,
654-
stream: undefined,
655-
transfer: {
656-
writable: undefined,
657-
port: undefined,
658-
promise: undefined,
659-
},
660-
};
661-
this[kIsClosedPromise] = createDeferredPromise();
644+
function InternalTransferredReadableStream() {
645+
markTransferMode(this, false, true);
646+
this[kType] = 'ReadableStream';
647+
this[kState] = {
648+
disturbed: false,
649+
reader: undefined,
650+
state: 'readable',
651+
storedError: undefined,
652+
stream: undefined,
653+
transfer: {
654+
writable: undefined,
655+
port1: undefined,
656+
port2: undefined,
657+
promise: undefined,
662658
},
663-
[], ReadableStream);
659+
};
660+
661+
this[kIsClosedPromise] = createDeferredPromise();
664662
}
663+
664+
ObjectSetPrototypeOf(InternalTransferredReadableStream.prototype, ReadableStream.prototype);
665+
ObjectSetPrototypeOf(InternalTransferredReadableStream, ReadableStream);
666+
667+
function TransferredReadableStream() {
668+
const stream = new InternalTransferredReadableStream();
669+
670+
stream.constructor = ReadableStream;
671+
672+
return stream;
673+
}
674+
665675
TransferredReadableStream.prototype[kDeserialize] = () => {};
666676

667677
class ReadableStreamBYOBRequest {
Collapse file

‎lib/internal/webstreams/transformstream.js‎

Copy file name to clipboardExpand all lines: lib/internal/webstreams/transformstream.js
+26-18Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ const {
44
FunctionPrototypeBind,
55
FunctionPrototypeCall,
66
ObjectDefineProperties,
7+
ObjectSetPrototypeOf,
78
PromisePrototypeThen,
89
PromiseResolve,
9-
ReflectConstruct,
1010
SymbolToStringTag,
1111
Symbol,
1212
} = primordials;
@@ -247,25 +247,33 @@ ObjectDefineProperties(TransformStream.prototype, {
247247
[SymbolToStringTag]: getNonWritablePropertyDescriptor(TransformStream.name),
248248
});
249249

250-
function TransferredTransformStream() {
251-
return ReflectConstruct(
252-
function() {
253-
markTransferMode(this, false, true);
254-
this[kType] = 'TransformStream';
255-
this[kState] = {
256-
readable: undefined,
257-
writable: undefined,
258-
backpressure: undefined,
259-
backpressureChange: {
260-
promise: undefined,
261-
resolve: undefined,
262-
reject: undefined,
263-
},
264-
controller: undefined,
265-
};
250+
function InternalTransferredTransformStream() {
251+
markTransferMode(this, false, true);
252+
this[kType] = 'TransformStream';
253+
this[kState] = {
254+
readable: undefined,
255+
writable: undefined,
256+
backpressure: undefined,
257+
backpressureChange: {
258+
promise: undefined,
259+
resolve: undefined,
260+
reject: undefined,
266261
},
267-
[], TransformStream);
262+
controller: undefined,
263+
};
268264
}
265+
266+
ObjectSetPrototypeOf(InternalTransferredTransformStream.prototype, TransformStream.prototype);
267+
ObjectSetPrototypeOf(InternalTransferredTransformStream, TransformStream);
268+
269+
function TransferredTransformStream() {
270+
const stream = new InternalTransferredTransformStream();
271+
272+
stream.constructor = TransformStream;
273+
274+
return stream;
275+
}
276+
269277
TransferredTransformStream.prototype[kDeserialize] = () => {};
270278

271279
class TransformStreamDefaultController {
Collapse file

‎lib/internal/webstreams/writablestream.js‎

Copy file name to clipboardExpand all lines: lib/internal/webstreams/writablestream.js
+56-48Lines changed: 56 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ const {
66
FunctionPrototypeBind,
77
FunctionPrototypeCall,
88
ObjectDefineProperties,
9+
ObjectSetPrototypeOf,
910
PromisePrototypeThen,
1011
PromiseResolve,
1112
PromiseReject,
12-
ReflectConstruct,
1313
Symbol,
1414
SymbolToStringTag,
1515
} = primordials;
@@ -326,55 +326,63 @@ ObjectDefineProperties(WritableStream.prototype, {
326326
[SymbolToStringTag]: getNonWritablePropertyDescriptor(WritableStream.name),
327327
});
328328

329-
function TransferredWritableStream() {
330-
return ReflectConstruct(
331-
function() {
332-
markTransferMode(this, false, true);
333-
this[kType] = 'WritableStream';
334-
this[kState] = {
335-
close: createDeferredPromise(),
336-
closeRequest: {
337-
promise: undefined,
338-
resolve: undefined,
339-
reject: undefined,
340-
},
341-
inFlightWriteRequest: {
342-
promise: undefined,
343-
resolve: undefined,
344-
reject: undefined,
345-
},
346-
inFlightCloseRequest: {
347-
promise: undefined,
348-
resolve: undefined,
349-
reject: undefined,
350-
},
351-
pendingAbortRequest: {
352-
abort: {
353-
promise: undefined,
354-
resolve: undefined,
355-
reject: undefined,
356-
},
357-
reason: undefined,
358-
wasAlreadyErroring: false,
359-
},
360-
backpressure: false,
361-
controller: undefined,
362-
state: 'writable',
363-
storedError: undefined,
364-
writeRequests: [],
365-
writer: undefined,
366-
transfer: {
367-
promise: undefined,
368-
port1: undefined,
369-
port2: undefined,
370-
readable: undefined,
371-
},
372-
};
373-
this[kIsClosedPromise] = createDeferredPromise();
374-
this[kControllerErrorFunction] = () => {};
329+
function InternalTransferredWritableStream() {
330+
markTransferMode(this, false, true);
331+
this[kType] = 'WritableStream';
332+
this[kState] = {
333+
close: createDeferredPromise(),
334+
closeRequest: {
335+
promise: undefined,
336+
resolve: undefined,
337+
reject: undefined,
338+
},
339+
inFlightWriteRequest: {
340+
promise: undefined,
341+
resolve: undefined,
342+
reject: undefined,
375343
},
376-
[], WritableStream);
344+
inFlightCloseRequest: {
345+
promise: undefined,
346+
resolve: undefined,
347+
reject: undefined,
348+
},
349+
pendingAbortRequest: {
350+
abort: {
351+
promise: undefined,
352+
resolve: undefined,
353+
reject: undefined,
354+
},
355+
reason: undefined,
356+
wasAlreadyErroring: false,
357+
},
358+
backpressure: false,
359+
controller: undefined,
360+
state: 'writable',
361+
storedError: undefined,
362+
writeRequests: [],
363+
writer: undefined,
364+
transfer: {
365+
readable: undefined,
366+
port1: undefined,
367+
port2: undefined,
368+
promise: undefined,
369+
},
370+
};
371+
372+
this[kIsClosedPromise] = createDeferredPromise();
377373
}
374+
375+
ObjectSetPrototypeOf(InternalTransferredWritableStream.prototype, WritableStream.prototype);
376+
ObjectSetPrototypeOf(InternalTransferredWritableStream, WritableStream);
377+
378+
function TransferredWritableStream() {
379+
const stream = new InternalTransferredWritableStream();
380+
381+
stream.constructor = WritableStream;
382+
383+
return stream;
384+
}
385+
378386
TransferredWritableStream.prototype[kDeserialize] = () => {};
379387

380388
class WritableStreamDefaultWriter {

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.