Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 7392cd0

Browse filesBrowse files
debadree25MylesBorins
authored andcommitted
stream: add abort signal for ReadableStream and WritableStream
Refs: #39316 PR-URL: #46273 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent 4222e58 commit 7392cd0
Copy full SHA for 7392cd0

File tree

Expand file treeCollapse file tree

6 files changed

+233
-12
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

6 files changed

+233
-12
lines changed
Open diff view settings
Collapse file

‎doc/api/stream.md‎

Copy file name to clipboardExpand all lines: doc/api/stream.md
+40-2Lines changed: 40 additions & 2 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -3226,17 +3226,24 @@ readable.getReader().read().then((result) => {
32263226

32273227
<!-- YAML
32283228
added: v15.4.0
3229+
changes:
3230+
- version: REPLACEME
3231+
pr-url: https://github.com/nodejs/node/pull/46273
3232+
description: Added support for `ReadableStream` and
3233+
`WritableStream`.
32293234
-->
32303235

32313236
* `signal` {AbortSignal} A signal representing possible cancellation
3232-
* `stream` {Stream} a stream to attach a signal to
3237+
* `stream` {Stream|ReadableStream|WritableStream}
3238+
3239+
A stream to attach a signal to.
32333240

32343241
Attaches an AbortSignal to a readable or writeable stream. This lets code
32353242
control stream destruction using an `AbortController`.
32363243

32373244
Calling `abort` on the `AbortController` corresponding to the passed
32383245
`AbortSignal` will behave the same way as calling `.destroy(new AbortError())`
3239-
on the stream.
3246+
on the stream, and `controller.error(new AbortError())` for webstreams.
32403247

32413248
```js
32423249
const fs = require('node:fs');
@@ -3274,6 +3281,37 @@ const stream = addAbortSignal(
32743281
})();
32753282
```
32763283

3284+
Or using an `AbortSignal` with a ReadableStream:
3285+
3286+
```js
3287+
const controller = new AbortController();
3288+
const rs = new ReadableStream({
3289+
start(controller) {
3290+
controller.enqueue('hello');
3291+
controller.enqueue('world');
3292+
controller.close();
3293+
},
3294+
});
3295+
3296+
addAbortSignal(controller.signal, rs);
3297+
3298+
finished(rs, (err) => {
3299+
if (err) {
3300+
if (err.name === 'AbortError') {
3301+
// The operation was cancelled
3302+
}
3303+
}
3304+
});
3305+
3306+
const reader = rs.getReader();
3307+
3308+
reader.read().then(({ value, done }) => {
3309+
console.log(value); // hello
3310+
console.log(done); // false
3311+
controller.abort();
3312+
});
3313+
```
3314+
32773315
## API for stream implementers
32783316

32793317
<!--type=misc-->
Collapse file

‎lib/internal/streams/add-abort-signal.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/add-abort-signal.js
+16-9Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@ const {
55
codes,
66
} = require('internal/errors');
77

8+
const {
9+
isNodeStream,
10+
isWebStream,
11+
kControllerErrorFunction,
12+
} = require('internal/streams/utils');
13+
814
const eos = require('internal/streams/end-of-stream');
915
const { ERR_INVALID_ARG_TYPE } = codes;
1016

@@ -18,24 +24,25 @@ const validateAbortSignal = (signal, name) => {
1824
}
1925
};
2026

21-
function isNodeStream(obj) {
22-
return !!(obj && typeof obj.pipe === 'function');
23-
}
24-
2527
module.exports.addAbortSignal = function addAbortSignal(signal, stream) {
2628
validateAbortSignal(signal, 'signal');
27-
if (!isNodeStream(stream)) {
28-
throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream);
29+
if (!isNodeStream(stream) && !isWebStream(stream)) {
30+
throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream);
2931
}
3032
return module.exports.addAbortSignalNoValidate(signal, stream);
3133
};
34+
3235
module.exports.addAbortSignalNoValidate = function(signal, stream) {
3336
if (typeof signal !== 'object' || !('aborted' in signal)) {
3437
return stream;
3538
}
36-
const onAbort = () => {
37-
stream.destroy(new AbortError(undefined, { cause: signal.reason }));
38-
};
39+
const onAbort = isNodeStream(stream) ?
40+
() => {
41+
stream.destroy(new AbortError(undefined, { cause: signal.reason }));
42+
} :
43+
() => {
44+
stream[kControllerErrorFunction](new AbortError(undefined, { cause: signal.reason }));
45+
};
3946
if (signal.aborted) {
4047
onAbort();
4148
} else {
Collapse file

‎lib/internal/streams/utils.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/utils.js
+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ const kIsReadable = Symbol('kIsReadable');
1313
const kIsDisturbed = Symbol('kIsDisturbed');
1414

1515
const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise');
16+
const kControllerErrorFunction = SymbolFor('nodejs.webstream.controllerErrorFunction');
1617

1718
function isReadableNodeStream(obj, strict = false) {
1819
return !!(
@@ -305,6 +306,7 @@ module.exports = {
305306
isReadable,
306307
kIsReadable,
307308
kIsClosedPromise,
309+
kControllerErrorFunction,
308310
isClosed,
309311
isDestroyed,
310312
isDuplexNodeStream,
Collapse file

‎lib/internal/webstreams/readablestream.js‎

Copy file name to clipboardExpand all lines: lib/internal/webstreams/readablestream.js
+3-1Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ const {
8383
kIsErrored,
8484
kIsReadable,
8585
kIsClosedPromise,
86+
kControllerErrorFunction,
8687
} = require('internal/streams/utils');
8788

8889
const {
@@ -260,6 +261,7 @@ class ReadableStream {
260261
};
261262

262263
this[kIsClosedPromise] = createDeferredPromise();
264+
this[kControllerErrorFunction] = () => {};
263265

264266
// The spec requires handling of the strategy first
265267
// here. Specifically, if getting the size and
@@ -1891,7 +1893,6 @@ function readableStreamClose(stream) {
18911893
assert(stream[kState].state === 'readable');
18921894
stream[kState].state = 'closed';
18931895
stream[kIsClosedPromise].resolve();
1894-
18951896
const {
18961897
reader,
18971898
} = stream[kState];
@@ -2330,6 +2331,7 @@ function setupReadableStreamDefaultController(
23302331
stream,
23312332
};
23322333
stream[kState].controller = controller;
2334+
stream[kControllerErrorFunction] = FunctionPrototypeBind(controller.error, controller);
23332335

23342336
const startResult = startAlgorithm();
23352337

Collapse file

‎lib/internal/webstreams/writablestream.js‎

Copy file name to clipboardExpand all lines: lib/internal/webstreams/writablestream.js
+4Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ const {
7171

7272
const {
7373
kIsClosedPromise,
74+
kControllerErrorFunction,
7475
} = require('internal/streams/utils');
7576

7677
const {
@@ -199,6 +200,7 @@ class WritableStream {
199200
};
200201

201202
this[kIsClosedPromise] = createDeferredPromise();
203+
this[kControllerErrorFunction] = () => {};
202204

203205
const size = extractSizeAlgorithm(strategy?.size);
204206
const highWaterMark = extractHighWaterMark(strategy?.highWaterMark, 1);
@@ -370,6 +372,7 @@ function TransferredWritableStream() {
370372
},
371373
};
372374
this[kIsClosedPromise] = createDeferredPromise();
375+
this[kControllerErrorFunction] = () => {};
373376
},
374377
[], WritableStream));
375378
}
@@ -1282,6 +1285,7 @@ function setupWritableStreamDefaultController(
12821285
writeAlgorithm,
12831286
};
12841287
stream[kState].controller = controller;
1288+
stream[kControllerErrorFunction] = FunctionPrototypeBind(controller.error, controller);
12851289

12861290
writableStreamUpdateBackpressure(
12871291
stream,
Collapse file
+168Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const { finished, addAbortSignal } = require('stream');
5+
const { ReadableStream, WritableStream } = require('stream/web');
6+
const assert = require('assert');
7+
8+
function createTestReadableStream() {
9+
return new ReadableStream({
10+
start(controller) {
11+
controller.enqueue('a');
12+
controller.enqueue('b');
13+
controller.enqueue('c');
14+
controller.close();
15+
}
16+
});
17+
}
18+
19+
function createTestWritableStream(values) {
20+
return new WritableStream({
21+
write(chunk) {
22+
values.push(chunk);
23+
}
24+
});
25+
}
26+
27+
{
28+
const rs = createTestReadableStream();
29+
30+
const reader = rs.getReader();
31+
32+
const ac = new AbortController();
33+
34+
addAbortSignal(ac.signal, rs);
35+
36+
finished(rs, common.mustCall((err) => {
37+
assert.strictEqual(err.name, 'AbortError');
38+
assert.rejects(reader.read(), /AbortError/).then(common.mustCall());
39+
assert.rejects(reader.closed, /AbortError/).then(common.mustCall());
40+
}));
41+
42+
reader.read().then(common.mustCall((result) => {
43+
assert.strictEqual(result.value, 'a');
44+
ac.abort();
45+
}));
46+
}
47+
48+
{
49+
const rs = createTestReadableStream();
50+
51+
const ac = new AbortController();
52+
53+
addAbortSignal(ac.signal, rs);
54+
55+
assert.rejects((async () => {
56+
for await (const chunk of rs) {
57+
if (chunk === 'b') {
58+
ac.abort();
59+
}
60+
}
61+
})(), /AbortError/).then(common.mustCall());
62+
}
63+
64+
{
65+
const rs1 = createTestReadableStream();
66+
67+
const rs2 = createTestReadableStream();
68+
69+
const ac = new AbortController();
70+
71+
addAbortSignal(ac.signal, rs1);
72+
addAbortSignal(ac.signal, rs2);
73+
74+
const reader1 = rs1.getReader();
75+
const reader2 = rs2.getReader();
76+
77+
finished(rs1, common.mustCall((err) => {
78+
assert.strictEqual(err.name, 'AbortError');
79+
assert.rejects(reader1.read(), /AbortError/).then(common.mustCall());
80+
assert.rejects(reader1.closed, /AbortError/).then(common.mustCall());
81+
}));
82+
83+
finished(rs2, common.mustCall((err) => {
84+
assert.strictEqual(err.name, 'AbortError');
85+
assert.rejects(reader2.read(), /AbortError/).then(common.mustCall());
86+
assert.rejects(reader2.closed, /AbortError/).then(common.mustCall());
87+
}));
88+
89+
ac.abort();
90+
}
91+
92+
{
93+
const rs = createTestReadableStream();
94+
95+
const { 0: rs1, 1: rs2 } = rs.tee();
96+
97+
const ac = new AbortController();
98+
99+
addAbortSignal(ac.signal, rs);
100+
101+
const reader1 = rs1.getReader();
102+
const reader2 = rs2.getReader();
103+
104+
finished(rs1, common.mustCall((err) => {
105+
assert.strictEqual(err.name, 'AbortError');
106+
assert.rejects(reader1.read(), /AbortError/).then(common.mustCall());
107+
assert.rejects(reader1.closed, /AbortError/).then(common.mustCall());
108+
}));
109+
110+
finished(rs2, common.mustCall((err) => {
111+
assert.strictEqual(err.name, 'AbortError');
112+
assert.rejects(reader2.read(), /AbortError/).then(common.mustCall());
113+
assert.rejects(reader2.closed, /AbortError/).then(common.mustCall());
114+
}));
115+
116+
ac.abort();
117+
}
118+
119+
{
120+
const values = [];
121+
const ws = createTestWritableStream(values);
122+
123+
const ac = new AbortController();
124+
125+
addAbortSignal(ac.signal, ws);
126+
127+
const writer = ws.getWriter();
128+
129+
finished(ws, common.mustCall((err) => {
130+
assert.strictEqual(err.name, 'AbortError');
131+
assert.deepStrictEqual(values, ['a']);
132+
assert.rejects(writer.write('b'), /AbortError/).then(common.mustCall());
133+
assert.rejects(writer.closed, /AbortError/).then(common.mustCall());
134+
}));
135+
136+
writer.write('a').then(() => {
137+
ac.abort();
138+
});
139+
}
140+
141+
{
142+
const values = [];
143+
144+
const ws1 = createTestWritableStream(values);
145+
const ws2 = createTestWritableStream(values);
146+
147+
const ac = new AbortController();
148+
149+
addAbortSignal(ac.signal, ws1);
150+
addAbortSignal(ac.signal, ws2);
151+
152+
const writer1 = ws1.getWriter();
153+
const writer2 = ws2.getWriter();
154+
155+
finished(ws1, common.mustCall((err) => {
156+
assert.strictEqual(err.name, 'AbortError');
157+
assert.rejects(writer1.write('a'), /AbortError/).then(common.mustCall());
158+
assert.rejects(writer1.closed, /AbortError/).then(common.mustCall());
159+
}));
160+
161+
finished(ws2, common.mustCall((err) => {
162+
assert.strictEqual(err.name, 'AbortError');
163+
assert.rejects(writer2.write('a'), /AbortError/).then(common.mustCall());
164+
assert.rejects(writer2.closed, /AbortError/).then(common.mustCall());
165+
}));
166+
167+
ac.abort();
168+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.