Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 4a77a11

Browse filesBrowse files
daeyeonRafaelGSS
authored andcommitted
stream: add ReadableByteStream.tee()
This supports teeing readable byte streams to meet the latest web streams standards. Signed-off-by: Daeyeon Jeong daeyeon.dev@gmail.com PR-URL: #44505 Refs: https://streams.spec.whatwg.org/#readable-stream-tee Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Minwoo Jung <nodecorelab@gmail.com> Reviewed-By: Antoine du Hamel <duhamelantoine1995@gmail.com>
1 parent 8752854 commit 4a77a11
Copy full SHA for 4a77a11

File tree

Expand file treeCollapse file tree

6 files changed

+343
-53
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

6 files changed

+343
-53
lines changed
Open diff view settings
Collapse file

‎doc/api/webstreams.md‎

Copy file name to clipboardExpand all lines: doc/api/webstreams.md
+4Lines changed: 4 additions & 0 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,10 @@ is active.
299299

300300
<!-- YAML
301301
added: v16.5.0
302+
changes:
303+
- version: REPLACEME
304+
pr-url: https://github.com/nodejs/node/pull/44505
305+
description: Support teeing a readable byte stream.
302306
-->
303307

304308
* Returns: {ReadableStream\[]}
Collapse file

‎lib/internal/webstreams/readablestream.js‎

Copy file name to clipboardExpand all lines: lib/internal/webstreams/readablestream.js
+293-11Lines changed: 293 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ const {
9595
ArrayBufferViewGetByteOffset,
9696
ArrayBufferGetByteLength,
9797
AsyncIterator,
98+
cloneAsUint8Array,
9899
copyArrayBuffer,
99100
customInspect,
100101
dequeueValue,
@@ -215,6 +216,7 @@ class ReadableStream {
215216
throw new ERR_INVALID_ARG_VALUE('source', 'Object', source);
216217
this[kState] = {
217218
disturbed: false,
219+
reader: undefined,
218220
state: 'readable',
219221
storedError: undefined,
220222
stream: undefined,
@@ -1111,7 +1113,6 @@ class ReadableByteStreamController {
11111113
chunk);
11121114
}
11131115
const chunkByteLength = ArrayBufferViewGetByteLength(chunk);
1114-
const chunkByteOffset = ArrayBufferViewGetByteOffset(chunk);
11151116
const chunkBuffer = ArrayBufferViewGetBuffer(chunk);
11161117
const chunkBufferByteLength = ArrayBufferGetByteLength(chunkBuffer);
11171118
if (chunkByteLength === 0 || chunkBufferByteLength === 0) {
@@ -1122,11 +1123,7 @@ class ReadableByteStreamController {
11221123
throw new ERR_INVALID_STATE.TypeError('Controller is already closed');
11231124
if (this[kState].stream[kState].state !== 'readable')
11241125
throw new ERR_INVALID_STATE.TypeError('ReadableStream is already closed');
1125-
readableByteStreamControllerEnqueue(
1126-
this,
1127-
chunkBuffer,
1128-
chunkByteLength,
1129-
chunkByteOffset);
1126+
readableByteStreamControllerEnqueue(this, chunk);
11301127
}
11311128

11321129
/**
@@ -1430,6 +1427,13 @@ function readableStreamPipeTo(
14301427
}
14311428

14321429
function readableStreamTee(stream, cloneForBranch2) {
1430+
if (isReadableByteStreamController(stream[kState].controller)) {
1431+
return readableByteStreamTee(stream);
1432+
}
1433+
return readableStreamDefaultTee(stream, cloneForBranch2);
1434+
}
1435+
1436+
function readableStreamDefaultTee(stream, cloneForBranch2) {
14331437
const reader = new ReadableStreamDefaultReader(stream);
14341438
let reading = false;
14351439
let canceled1 = false;
@@ -1524,6 +1528,284 @@ function readableStreamTee(stream, cloneForBranch2) {
15241528
return [branch1, branch2];
15251529
}
15261530

1531+
function readableByteStreamTee(stream) {
1532+
assert(isReadableStream(stream));
1533+
assert(isReadableByteStreamController(stream[kState].controller));
1534+
1535+
let reader = new ReadableStreamDefaultReader(stream);
1536+
let reading = false;
1537+
let readAgainForBranch1 = false;
1538+
let readAgainForBranch2 = false;
1539+
let canceled1 = false;
1540+
let canceled2 = false;
1541+
let reason1;
1542+
let reason2;
1543+
let branch1;
1544+
let branch2;
1545+
const cancelDeferred = createDeferredPromise();
1546+
1547+
function forwardReaderError(thisReader) {
1548+
PromisePrototypeThen(
1549+
thisReader[kState].close.promise,
1550+
undefined,
1551+
(error) => {
1552+
if (thisReader !== reader) {
1553+
return;
1554+
}
1555+
readableStreamDefaultControllerError(branch1[kState].controller, error);
1556+
readableStreamDefaultControllerError(branch2[kState].controller, error);
1557+
if (!canceled1 || !canceled2) {
1558+
cancelDeferred.resolve();
1559+
}
1560+
}
1561+
);
1562+
}
1563+
1564+
function pullWithDefaultReader() {
1565+
if (isReadableStreamBYOBReader(reader)) {
1566+
readableStreamBYOBReaderRelease(reader);
1567+
reader = new ReadableStreamDefaultReader(stream);
1568+
forwardReaderError(reader);
1569+
}
1570+
1571+
const readRequest = {
1572+
[kChunk](chunk) {
1573+
queueMicrotask(() => {
1574+
readAgainForBranch1 = false;
1575+
readAgainForBranch2 = false;
1576+
const chunk1 = chunk;
1577+
let chunk2 = chunk;
1578+
1579+
if (!canceled1 && !canceled2) {
1580+
try {
1581+
chunk2 = cloneAsUint8Array(chunk);
1582+
} catch (error) {
1583+
readableByteStreamControllerError(
1584+
branch1[kState].controller,
1585+
error
1586+
);
1587+
readableByteStreamControllerError(
1588+
branch2[kState].controller,
1589+
error
1590+
);
1591+
cancelDeferred.resolve(readableStreamCancel(stream, error));
1592+
return;
1593+
}
1594+
}
1595+
if (!canceled1) {
1596+
readableByteStreamControllerEnqueue(
1597+
branch1[kState].controller,
1598+
chunk1
1599+
);
1600+
}
1601+
if (!canceled2) {
1602+
readableByteStreamControllerEnqueue(
1603+
branch2[kState].controller,
1604+
chunk2
1605+
);
1606+
}
1607+
reading = false;
1608+
1609+
if (readAgainForBranch1) {
1610+
pull1Algorithm();
1611+
} else if (readAgainForBranch2) {
1612+
pull2Algorithm();
1613+
}
1614+
});
1615+
},
1616+
[kClose]() {
1617+
reading = false;
1618+
1619+
if (!canceled1) {
1620+
readableByteStreamControllerClose(branch1[kState].controller);
1621+
}
1622+
if (!canceled2) {
1623+
readableByteStreamControllerClose(branch2[kState].controller);
1624+
}
1625+
if (branch1[kState].controller[kState].pendingPullIntos.length > 0) {
1626+
readableByteStreamControllerRespond(branch1[kState].controller, 0);
1627+
}
1628+
if (branch2[kState].controller[kState].pendingPullIntos.length > 0) {
1629+
readableByteStreamControllerRespond(branch2[kState].controller, 0);
1630+
}
1631+
if (!canceled1 || !canceled2) {
1632+
cancelDeferred.resolve();
1633+
}
1634+
},
1635+
[kError]() {
1636+
reading = false;
1637+
},
1638+
};
1639+
1640+
readableStreamDefaultReaderRead(reader, readRequest);
1641+
}
1642+
1643+
function pullWithBYOBReader(view, forBranch2) {
1644+
if (isReadableStreamDefaultReader(reader)) {
1645+
readableStreamDefaultReaderRelease(reader);
1646+
reader = new ReadableStreamBYOBReader(stream);
1647+
forwardReaderError(reader);
1648+
}
1649+
1650+
const byobBranch = forBranch2 === true ? branch2 : branch1;
1651+
const otherBranch = forBranch2 === false ? branch2 : branch1;
1652+
const readIntoRequest = {
1653+
[kChunk](chunk) {
1654+
queueMicrotask(() => {
1655+
readAgainForBranch1 = false;
1656+
readAgainForBranch2 = false;
1657+
const byobCanceled = forBranch2 === true ? canceled2 : canceled1;
1658+
const otherCanceled = forBranch2 === false ? canceled2 : canceled1;
1659+
1660+
if (!otherCanceled) {
1661+
let clonedChunk;
1662+
1663+
try {
1664+
clonedChunk = cloneAsUint8Array(chunk);
1665+
} catch (error) {
1666+
readableByteStreamControllerError(
1667+
byobBranch[kState].controller,
1668+
error
1669+
);
1670+
readableByteStreamControllerError(
1671+
otherBranch[kState].controller,
1672+
error
1673+
);
1674+
cancelDeferred.resolve(readableStreamCancel(stream, error));
1675+
return;
1676+
}
1677+
if (!byobCanceled) {
1678+
readableByteStreamControllerRespondWithNewView(
1679+
byobBranch[kState].controller,
1680+
chunk
1681+
);
1682+
}
1683+
1684+
readableByteStreamControllerEnqueue(
1685+
otherBranch[kState].controller,
1686+
clonedChunk
1687+
);
1688+
} else if (!byobCanceled) {
1689+
readableByteStreamControllerRespondWithNewView(
1690+
byobBranch[kState].controller,
1691+
chunk
1692+
);
1693+
}
1694+
reading = false;
1695+
1696+
if (readAgainForBranch1) {
1697+
pull1Algorithm();
1698+
} else if (readAgainForBranch2) {
1699+
pull2Algorithm();
1700+
}
1701+
});
1702+
},
1703+
[kClose](chunk) {
1704+
reading = false;
1705+
1706+
const byobCanceled = forBranch2 === true ? canceled2 : canceled1;
1707+
const otherCanceled = forBranch2 === false ? canceled2 : canceled1;
1708+
1709+
if (!byobCanceled) {
1710+
readableByteStreamControllerClose(byobBranch[kState].controller);
1711+
}
1712+
if (!otherCanceled) {
1713+
readableByteStreamControllerClose(otherBranch[kState].controller);
1714+
}
1715+
if (chunk !== undefined) {
1716+
if (!byobCanceled) {
1717+
readableByteStreamControllerRespondWithNewView(
1718+
byobBranch[kState].controller,
1719+
chunk
1720+
);
1721+
}
1722+
if (
1723+
!otherCanceled &&
1724+
otherBranch[kState].controller[kState].pendingPullIntos.length > 0
1725+
) {
1726+
readableByteStreamControllerRespond(
1727+
otherBranch[kState].controller,
1728+
0
1729+
);
1730+
}
1731+
}
1732+
if (!byobCanceled || !otherCanceled) {
1733+
cancelDeferred.resolve();
1734+
}
1735+
},
1736+
[kError]() {
1737+
reading = false;
1738+
},
1739+
};
1740+
readableStreamBYOBReaderRead(reader, view, readIntoRequest);
1741+
}
1742+
1743+
function pull1Algorithm() {
1744+
if (reading) {
1745+
readAgainForBranch1 = true;
1746+
return PromiseResolve();
1747+
}
1748+
reading = true;
1749+
1750+
const byobRequest = branch1[kState].controller.byobRequest;
1751+
if (byobRequest === null) {
1752+
pullWithDefaultReader();
1753+
} else {
1754+
pullWithBYOBReader(byobRequest[kState].view, false);
1755+
}
1756+
return PromiseResolve();
1757+
}
1758+
1759+
function pull2Algorithm() {
1760+
if (reading) {
1761+
readAgainForBranch2 = true;
1762+
return PromiseResolve();
1763+
}
1764+
reading = true;
1765+
1766+
const byobRequest = branch2[kState].controller.byobRequest;
1767+
if (byobRequest === null) {
1768+
pullWithDefaultReader();
1769+
} else {
1770+
pullWithBYOBReader(byobRequest[kState].view, true);
1771+
}
1772+
return PromiseResolve();
1773+
}
1774+
1775+
function cancel1Algorithm(reason) {
1776+
canceled1 = true;
1777+
reason1 = reason;
1778+
if (canceled2) {
1779+
cancelDeferred.resolve(readableStreamCancel(stream, [reason1, reason2]));
1780+
}
1781+
return cancelDeferred.promise;
1782+
}
1783+
1784+
function cancel2Algorithm(reason) {
1785+
canceled2 = true;
1786+
reason2 = reason;
1787+
if (canceled1) {
1788+
cancelDeferred.resolve(readableStreamCancel(stream, [reason1, reason2]));
1789+
}
1790+
return cancelDeferred.promise;
1791+
}
1792+
1793+
branch1 = new ReadableStream({
1794+
type: 'bytes',
1795+
pull: pull1Algorithm,
1796+
cancel: cancel1Algorithm,
1797+
});
1798+
branch2 = new ReadableStream({
1799+
type: 'bytes',
1800+
pull: pull2Algorithm,
1801+
cancel: cancel2Algorithm,
1802+
});
1803+
1804+
forwardReaderError(reader);
1805+
1806+
return [branch1, branch2];
1807+
}
1808+
15271809
function readableByteStreamControllerConvertPullIntoDescriptor(desc) {
15281810
const {
15291811
buffer,
@@ -2317,18 +2599,18 @@ function readableByteStreamControllerFillHeadPullIntoDescriptor(
23172599
desc.bytesFilled += size;
23182600
}
23192601

2320-
function readableByteStreamControllerEnqueue(
2321-
controller,
2322-
buffer,
2323-
byteLength,
2324-
byteOffset) {
2602+
function readableByteStreamControllerEnqueue(controller, chunk) {
23252603
const {
23262604
closeRequested,
23272605
pendingPullIntos,
23282606
queue,
23292607
stream,
23302608
} = controller[kState];
23312609

2610+
const buffer = ArrayBufferViewGetBuffer(chunk);
2611+
const byteOffset = ArrayBufferViewGetByteOffset(chunk);
2612+
const byteLength = ArrayBufferViewGetByteLength(chunk);
2613+
23322614
if (closeRequested || stream[kState].state !== 'readable')
23332615
return;
23342616

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.