Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 97f3072

Browse filesBrowse files
ronagrichardlau
authored andcommitted
stream: add signal support to pipeline generators
Generators in pipeline must be able to be aborted or pipeline can deadlock. PR-URL: #39067 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent 450da99 commit 97f3072
Copy full SHA for 97f3072

File tree

Expand file treeCollapse file tree

7 files changed

+117
-33
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

7 files changed

+117
-33
lines changed
Open diff view settings
Collapse file

‎doc/api/stream.md‎

Copy file name to clipboardExpand all lines: doc/api/stream.md
+44-7Lines changed: 44 additions & 7 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -1886,16 +1886,14 @@ const { pipeline } = require('stream/promises');
18861886

18871887
async function run() {
18881888
const ac = new AbortController();
1889-
const options = {
1890-
signal: ac.signal,
1891-
};
1889+
const signal = ac.signal;
18921890

18931891
setTimeout(() => ac.abort(), 1);
18941892
await pipeline(
18951893
fs.createReadStream('archive.tar'),
18961894
zlib.createGzip(),
18971895
fs.createWriteStream('archive.tar.gz'),
1898-
options,
1896+
{ signal },
18991897
);
19001898
}
19011899

@@ -1911,10 +1909,10 @@ const fs = require('fs');
19111909
async function run() {
19121910
await pipeline(
19131911
fs.createReadStream('lowercase.txt'),
1914-
async function* (source) {
1912+
async function* (source, signal) {
19151913
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
19161914
for await (const chunk of source) {
1917-
yield chunk.toUpperCase();
1915+
yield await processChunk(chunk, { signal });
19181916
}
19191917
},
19201918
fs.createWriteStream('uppercase.txt')
@@ -1925,6 +1923,28 @@ async function run() {
19251923
run().catch(console.error);
19261924
```
19271925

1926+
Remember to handle the `signal` argument passed into the async generator.
1927+
Especially in the case where the async generator is the source for the
1928+
pipeline (i.e. first argument) or the pipeline will never complete.
1929+
1930+
```js
1931+
const { pipeline } = require('stream/promises');
1932+
const fs = require('fs');
1933+
1934+
async function run() {
1935+
await pipeline(
1936+
async function * (signal) {
1937+
await someLongRunningfn({ signal });
1938+
yield 'asd';
1939+
},
1940+
fs.createWriteStream('uppercase.txt')
1941+
);
1942+
console.log('Pipeline succeeded.');
1943+
}
1944+
1945+
run().catch(console.error);
1946+
```
1947+
19281948
`stream.pipeline()` will call `stream.destroy(err)` on all streams except:
19291949
* `Readable` streams which have emitted `'end'` or `'close'`.
19301950
* `Writable` streams which have emitted `'finish'` or `'close'`.
@@ -3342,13 +3362,20 @@ the `Readable.from()` utility method:
33423362
```js
33433363
const { Readable } = require('stream');
33443364

3365+
const ac = new AbortController();
3366+
const signal = ac.signal;
3367+
33453368
async function * generate() {
33463369
yield 'a';
3370+
await someLongRunningFn({ signal });
33473371
yield 'b';
33483372
yield 'c';
33493373
}
33503374

33513375
const readable = Readable.from(generate());
3376+
readable.on('close', () => {
3377+
ac.abort();
3378+
});
33523379

33533380
readable.on('data', (chunk) => {
33543381
console.log(chunk);
@@ -3368,21 +3395,31 @@ const { pipeline: pipelinePromise } = require('stream/promises');
33683395

33693396
const writable = fs.createWriteStream('./file');
33703397

3398+
const ac = new AbortController();
3399+
const signal = ac.signal;
3400+
3401+
const iterator = createIterator({ signal });
3402+
33713403
// Callback Pattern
33723404
pipeline(iterator, writable, (err, value) => {
33733405
if (err) {
33743406
console.error(err);
33753407
} else {
33763408
console.log(value, 'value returned');
33773409
}
3410+
}).on('close', () => {
3411+
ac.abort();
33783412
});
33793413

33803414
// Promise Pattern
33813415
pipelinePromise(iterator, writable)
33823416
.then((value) => {
33833417
console.log(value, 'value returned');
33843418
})
3385-
.catch(console.error);
3419+
.catch((err) => {
3420+
console.error(err);
3421+
ac.abort();
3422+
});
33863423
```
33873424

33883425
<!--type=misc-->
Collapse file

‎lib/internal/streams/compose.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/compose.js
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
'use strict';
22

3-
const pipeline = require('internal/streams/pipeline');
3+
const { pipeline } = require('internal/streams/pipeline');
44
const Duplex = require('internal/streams/duplex');
55
const { destroyer } = require('internal/streams/destroy');
66
const {
Collapse file

‎lib/internal/streams/duplexify.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/duplexify.js
+14-4Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ const from = require('internal/streams/from');
2626
const {
2727
isBlob,
2828
} = require('internal/blob');
29+
const { AbortController } = require('internal/abort_controller');
2930

3031
const {
3132
FunctionPrototypeCall
@@ -81,14 +82,15 @@ module.exports = function duplexify(body, name) {
8182
// }
8283

8384
if (typeof body === 'function') {
84-
const { value, write, final } = fromAsyncGen(body);
85+
const { value, write, final, destroy } = fromAsyncGen(body);
8586

8687
if (isIterable(value)) {
8788
return from(Duplexify, value, {
8889
// TODO (ronag): highWaterMark?
8990
objectMode: true,
9091
write,
91-
final
92+
final,
93+
destroy
9294
});
9395
}
9496

@@ -123,7 +125,8 @@ module.exports = function duplexify(body, name) {
123125
process.nextTick(cb, err);
124126
}
125127
});
126-
}
128+
},
129+
destroy
127130
});
128131
}
129132

@@ -202,15 +205,18 @@ module.exports = function duplexify(body, name) {
202205

203206
function fromAsyncGen(fn) {
204207
let { promise, resolve } = createDeferredPromise();
208+
const ac = new AbortController();
209+
const signal = ac.signal;
205210
const value = fn(async function*() {
206211
while (true) {
207212
const { chunk, done, cb } = await promise;
208213
process.nextTick(cb);
209214
if (done) return;
215+
if (signal.aborted) throw new AbortError();
210216
yield chunk;
211217
({ promise, resolve } = createDeferredPromise());
212218
}
213-
}());
219+
}(), { signal });
214220

215221
return {
216222
value,
@@ -219,6 +225,10 @@ function fromAsyncGen(fn) {
219225
},
220226
final(cb) {
221227
resolve({ done: true, cb });
228+
},
229+
destroy(err, cb) {
230+
ac.abort();
231+
cb(err);
222232
}
223233
};
224234
}
Collapse file

‎lib/internal/streams/pipeline.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/pipeline.js
+35-5Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,20 @@ const {
2121
ERR_MISSING_ARGS,
2222
ERR_STREAM_DESTROYED,
2323
},
24+
AbortError,
2425
} = require('internal/errors');
2526

26-
const { validateCallback } = require('internal/validators');
27+
const {
28+
validateCallback,
29+
validateAbortSignal
30+
} = require('internal/validators');
2731

2832
const {
2933
isIterable,
3034
isReadableNodeStream,
3135
isNodeStream,
3236
} = require('internal/streams/utils');
37+
const { AbortController } = require('internal/abort_controller');
3338

3439
let PassThrough;
3540
let Readable;
@@ -168,19 +173,37 @@ function pipeline(...streams) {
168173
streams = streams[0];
169174
}
170175

176+
return pipelineImpl(streams, callback);
177+
}
178+
179+
function pipelineImpl(streams, callback, opts) {
171180
if (streams.length < 2) {
172181
throw new ERR_MISSING_ARGS('streams');
173182
}
174183

184+
const ac = new AbortController();
185+
const signal = ac.signal;
186+
const outerSignal = opts?.signal;
187+
188+
validateAbortSignal(outerSignal, 'options.signal');
189+
190+
function abort() {
191+
finishImpl(new AbortError());
192+
}
193+
194+
outerSignal?.addEventListener('abort', abort);
195+
175196
let error;
176197
let value;
177198
const destroys = [];
178199

179200
let finishCount = 0;
180201

181202
function finish(err) {
182-
const final = --finishCount === 0;
203+
finishImpl(err, --finishCount === 0);
204+
}
183205

206+
function finishImpl(err, final) {
184207
if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) {
185208
error = err;
186209
}
@@ -193,6 +216,9 @@ function pipeline(...streams) {
193216
destroys.shift()(error);
194217
}
195218

219+
outerSignal?.removeEventListener('abort', abort);
220+
ac.abort();
221+
196222
if (final) {
197223
callback(error, value);
198224
}
@@ -211,7 +237,7 @@ function pipeline(...streams) {
211237

212238
if (i === 0) {
213239
if (typeof stream === 'function') {
214-
ret = stream();
240+
ret = stream({ signal });
215241
if (!isIterable(ret)) {
216242
throw new ERR_INVALID_RETURN_VALUE(
217243
'Iterable, AsyncIterable or Stream', 'source', ret);
@@ -223,7 +249,7 @@ function pipeline(...streams) {
223249
}
224250
} else if (typeof stream === 'function') {
225251
ret = makeAsyncIterable(ret);
226-
ret = stream(ret);
252+
ret = stream(ret, { signal });
227253

228254
if (reading) {
229255
if (!isIterable(ret, true)) {
@@ -291,7 +317,11 @@ function pipeline(...streams) {
291317
}
292318
}
293319

320+
if (signal?.aborted || outerSignal?.aborted) {
321+
process.nextTick(abort);
322+
}
323+
294324
return ret;
295325
}
296326

297-
module.exports = pipeline;
327+
module.exports = { pipelineImpl, pipeline };
Collapse file

‎lib/stream.js‎

Copy file name to clipboardExpand all lines: lib/stream.js
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ const {
2929
promisify: { custom: customPromisify },
3030
} = require('internal/util');
3131

32-
const pipeline = require('internal/streams/pipeline');
3332
const compose = require('internal/streams/compose');
33+
const { pipeline } = require('internal/streams/pipeline');
3434
const { destroyer } = require('internal/streams/destroy');
3535
const eos = require('internal/streams/end-of-stream');
3636
const internalBuffer = require('internal/buffer');
Collapse file

‎lib/stream/promises.js‎

Copy file name to clipboardExpand all lines: lib/stream/promises.js
+3-15Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,12 @@ const {
55
Promise,
66
} = primordials;
77

8-
const {
9-
addAbortSignalNoValidate,
10-
} = require('internal/streams/add-abort-signal');
11-
12-
const {
13-
validateAbortSignal,
14-
} = require('internal/validators');
15-
168
const {
179
isIterable,
1810
isNodeStream,
1911
} = require('internal/streams/utils');
2012

21-
const pl = require('internal/streams/pipeline');
13+
const { pipelineImpl: pl } = require('internal/streams/pipeline');
2214
const eos = require('internal/streams/end-of-stream');
2315

2416
function pipeline(...streams) {
@@ -29,19 +21,15 @@ function pipeline(...streams) {
2921
!isNodeStream(lastArg) && !isIterable(lastArg)) {
3022
const options = ArrayPrototypePop(streams);
3123
signal = options.signal;
32-
validateAbortSignal(signal, 'options.signal');
3324
}
3425

35-
const pipe = pl(...streams, (err, value) => {
26+
pl(streams, (err, value) => {
3627
if (err) {
3728
reject(err);
3829
} else {
3930
resolve(value);
4031
}
41-
});
42-
if (signal) {
43-
addAbortSignalNoValidate(signal, pipe);
44-
}
32+
}, { signal });
4533
});
4634
}
4735

Collapse file

‎test/parallel/test-stream-pipeline.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-stream-pipeline.js
+19Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@ const {
1111
Duplex,
1212
addAbortSignal,
1313
} = require('stream');
14+
const pipelinep = require('stream/promises').pipeline;
1415
const assert = require('assert');
1516
const http = require('http');
1617
const { promisify } = require('util');
1718
const net = require('net');
19+
const tsp = require('timers/promises');
1820

1921
{
2022
let finished = false;
@@ -1387,3 +1389,20 @@ const net = require('net');
13871389
assert.strictEqual(res, content);
13881390
}));
13891391
}
1392+
1393+
{
1394+
const ac = new AbortController();
1395+
const signal = ac.signal;
1396+
pipelinep(
1397+
async function * ({ signal }) {
1398+
await tsp.setTimeout(1e6, signal);
1399+
},
1400+
async function(source) {
1401+
1402+
},
1403+
{ signal }
1404+
).catch(common.mustCall((err) => {
1405+
assert.strictEqual(err.name, 'AbortError');
1406+
}));
1407+
ac.abort();
1408+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.