Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit cde5960

Browse filesBrowse files
debadree25juanarbol
authored andcommitted
stream: implement finished() for ReadableStream and WritableStream
Refs: #39316 PR-URL: #46205 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Darshan Sen <raisinten@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent 896027c commit cde5960
Copy full SHA for cde5960

File tree

Expand file treeCollapse file tree

5 files changed

+301
-9
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

5 files changed

+301
-9
lines changed
Open diff view settings
Collapse file

‎lib/internal/streams/end-of-stream.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/end-of-stream.js
+20-5Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,23 @@ const {
2222
validateBoolean
2323
} = require('internal/validators');
2424

25-
const { Promise } = primordials;
25+
const { Promise, PromisePrototypeThen } = primordials;
2626

2727
const {
2828
isClosed,
2929
isReadable,
3030
isReadableNodeStream,
31+
isReadableStream,
3132
isReadableFinished,
3233
isReadableErrored,
3334
isWritable,
3435
isWritableNodeStream,
36+
isWritableStream,
3537
isWritableFinished,
3638
isWritableErrored,
3739
isNodeStream,
3840
willEmitClose: _willEmitClose,
41+
kIsClosedPromise,
3942
} = require('internal/streams/utils');
4043

4144
function isRequest(stream) {
@@ -58,14 +61,17 @@ function eos(stream, options, callback) {
5861

5962
callback = once(callback);
6063

61-
const readable = options.readable ?? isReadableNodeStream(stream);
62-
const writable = options.writable ?? isWritableNodeStream(stream);
64+
if (isReadableStream(stream) || isWritableStream(stream)) {
65+
return eosWeb(stream, options, callback);
66+
}
6367

6468
if (!isNodeStream(stream)) {
65-
// TODO: Webstreams.
66-
throw new ERR_INVALID_ARG_TYPE('stream', 'Stream', stream);
69+
throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream);
6770
}
6871

72+
const readable = options.readable ?? isReadableNodeStream(stream);
73+
const writable = options.writable ?? isWritableNodeStream(stream);
74+
6975
const wState = stream._writableState;
7076
const rState = stream._readableState;
7177

@@ -255,6 +261,15 @@ function eos(stream, options, callback) {
255261
return cleanup;
256262
}
257263

264+
function eosWeb(stream, opts, callback) {
265+
PromisePrototypeThen(
266+
stream[kIsClosedPromise].promise,
267+
() => process.nextTick(() => callback.call(stream)),
268+
(err) => process.nextTick(() => callback.call(stream, err)),
269+
);
270+
return nop;
271+
}
272+
258273
function finished(stream, opts) {
259274
let autoCleanup = false;
260275
if (opts === null) {
Collapse file

‎lib/internal/streams/utils.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/utils.js
+25Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,16 @@ const {
44
Symbol,
55
SymbolAsyncIterator,
66
SymbolIterator,
7+
SymbolFor,
78
} = primordials;
89

910
const kDestroyed = Symbol('kDestroyed');
1011
const kIsErrored = Symbol('kIsErrored');
1112
const kIsReadable = Symbol('kIsReadable');
1213
const kIsDisturbed = Symbol('kIsDisturbed');
1314

15+
const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise');
16+
1417
function isReadableNodeStream(obj, strict = false) {
1518
return !!(
1619
obj &&
@@ -55,6 +58,25 @@ function isNodeStream(obj) {
5558
);
5659
}
5760

61+
function isReadableStream(obj) {
62+
return !!(
63+
obj &&
64+
!isNodeStream(obj) &&
65+
typeof obj.pipeThrough === 'function' &&
66+
typeof obj.getReader === 'function' &&
67+
typeof obj.cancel === 'function'
68+
);
69+
}
70+
71+
function isWritableStream(obj) {
72+
return !!(
73+
obj &&
74+
!isNodeStream(obj) &&
75+
typeof obj.getWriter === 'function' &&
76+
typeof obj.abort === 'function'
77+
);
78+
}
79+
5880
function isIterable(obj, isAsync) {
5981
if (obj == null) return false;
6082
if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function';
@@ -269,18 +291,21 @@ module.exports = {
269291
kIsErrored,
270292
isReadable,
271293
kIsReadable,
294+
kIsClosedPromise,
272295
isClosed,
273296
isDestroyed,
274297
isDuplexNodeStream,
275298
isFinished,
276299
isIterable,
277300
isReadableNodeStream,
301+
isReadableStream,
278302
isReadableEnded,
279303
isReadableFinished,
280304
isReadableErrored,
281305
isNodeStream,
282306
isWritable,
283307
isWritableNodeStream,
308+
isWritableStream,
284309
isWritableEnded,
285310
isWritableFinished,
286311
isWritableErrored,
Collapse file

‎lib/internal/webstreams/readablestream.js‎

Copy file name to clipboardExpand all lines: lib/internal/webstreams/readablestream.js
+11-3Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ const {
8585
kIsDisturbed,
8686
kIsErrored,
8787
kIsReadable,
88+
kIsClosedPromise,
8889
} = require('internal/streams/utils');
8990

9091
const {
@@ -258,9 +259,11 @@ class ReadableStream {
258259
port1: undefined,
259260
port2: undefined,
260261
promise: undefined,
261-
}
262+
},
262263
};
263264

265+
this[kIsClosedPromise] = createDeferredPromise();
266+
264267
// The spec requires handling of the strategy first
265268
// here. Specifically, if getting the size and
266269
// highWaterMark from the strategy fail, that has
@@ -652,8 +655,9 @@ function TransferredReadableStream() {
652655
writable: undefined,
653656
port: undefined,
654657
promise: undefined,
655-
}
658+
},
656659
};
660+
this[kIsClosedPromise] = createDeferredPromise();
657661
},
658662
[], ReadableStream));
659663
}
@@ -1213,8 +1217,9 @@ function createTeeReadableStream(start, pull, cancel) {
12131217
writable: undefined,
12141218
port: undefined,
12151219
promise: undefined,
1216-
}
1220+
},
12171221
};
1222+
this[kIsClosedPromise] = createDeferredPromise();
12181223
setupReadableStreamDefaultControllerFromSource(
12191224
this,
12201225
ObjectCreate(null, {
@@ -1887,6 +1892,7 @@ function readableStreamCancel(stream, reason) {
18871892
function readableStreamClose(stream) {
18881893
assert(stream[kState].state === 'readable');
18891894
stream[kState].state = 'closed';
1895+
stream[kIsClosedPromise].resolve();
18901896

18911897
const {
18921898
reader,
@@ -1908,6 +1914,8 @@ function readableStreamError(stream, error) {
19081914
assert(stream[kState].state === 'readable');
19091915
stream[kState].state = 'errored';
19101916
stream[kState].storedError = error;
1917+
stream[kIsClosedPromise].reject(error);
1918+
setPromiseHandled(stream[kIsClosedPromise].promise);
19111919

19121920
const {
19131921
reader
Collapse file

‎lib/internal/webstreams/writablestream.js‎

Copy file name to clipboardExpand all lines: lib/internal/webstreams/writablestream.js
+13-1Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ const {
6969
kState,
7070
} = require('internal/webstreams/util');
7171

72+
const {
73+
kIsClosedPromise,
74+
} = require('internal/streams/utils');
75+
7276
const {
7377
AbortController,
7478
} = require('internal/abort_controller');
@@ -191,9 +195,11 @@ class WritableStream {
191195
port1: undefined,
192196
port2: undefined,
193197
promise: undefined,
194-
}
198+
},
195199
};
196200

201+
this[kIsClosedPromise] = createDeferredPromise();
202+
197203
const size = extractSizeAlgorithm(strategy?.size);
198204
const highWaterMark = extractHighWaterMark(strategy?.highWaterMark, 1);
199205

@@ -363,6 +369,7 @@ function TransferredWritableStream() {
363369
readable: undefined,
364370
},
365371
};
372+
this[kIsClosedPromise] = createDeferredPromise();
366373
},
367374
[], WritableStream));
368375
}
@@ -742,6 +749,10 @@ function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream) {
742749
resolve: undefined,
743750
};
744751
}
752+
753+
stream[kIsClosedPromise].reject(stream[kState]?.storedError);
754+
setPromiseHandled(stream[kIsClosedPromise].promise);
755+
745756
const {
746757
writer,
747758
} = stream[kState];
@@ -855,6 +866,7 @@ function writableStreamFinishInFlightClose(stream) {
855866
stream[kState].state = 'closed';
856867
if (stream[kState].writer !== undefined)
857868
stream[kState].writer[kState].close.resolve?.();
869+
stream[kIsClosedPromise].resolve?.();
858870
assert(stream[kState].pendingAbortRequest.abort.promise === undefined);
859871
assert(stream[kState].storedError === undefined);
860872
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.