Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 4fe325d

Browse filesBrowse files
authored
stream: preserve AsyncLocalStorage on finished only when needed
PR-URL: #59873 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Daniel Lemire <daniel@lemire.me> Reviewed-By: Yagiz Nizipli <yagiz@nizipli.com>
1 parent 6176222 commit 4fe325d
Copy full SHA for 4fe325d

5 files changed

+124-3Lines changed: 124 additions & 3 deletions

File tree

Expand file treeCollapse file tree
Open diff view settings
Filter options
Expand file treeCollapse file tree
Open diff view settings
Collapse file

‎benchmark/streams/finished.js‎

Copy file name to clipboard
+33Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const { Readable, Writable } = require('stream');
5+
const { finished } = require('stream/promises');
6+
7+
const bench = common.createBenchmark(main, {
8+
n: [1e7],
9+
streamType: ['readable', 'writable'],
10+
});
11+
12+
async function main({ n, streamType }) {
13+
bench.start();
14+
15+
for (let i = 0; i < n; i++) {
16+
let stream;
17+
18+
switch (streamType) {
19+
case 'readable':
20+
stream = new Readable({ read() { this.push(null); } });
21+
stream.resume();
22+
break;
23+
case 'writable':
24+
stream = new Writable({ write(chunk, enc, cb) { cb(); } });
25+
stream.end();
26+
break;
27+
}
28+
29+
await finished(stream);
30+
}
31+
32+
bench.end(n);
33+
}
Collapse file

‎lib/internal/streams/end-of-stream.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/end-of-stream.js
+11-3Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ const {
4444
kIsClosedPromise,
4545
} = require('internal/streams/utils');
4646

47+
const { getHookArrays } = require('internal/async_hooks');
48+
const AsyncContextFrame = require('internal/async_context_frame');
49+
4750
// Lazy load
4851
let AsyncResource;
4952
let addAbortListener;
@@ -74,9 +77,14 @@ function eos(stream, options, callback) {
7477
validateFunction(callback, 'callback');
7578
validateAbortSignal(options.signal, 'options.signal');
7679

77-
// Avoid AsyncResource.bind() because it calls ObjectDefineProperties which
78-
// is a bottleneck here.
79-
callback = once(bindAsyncResource(callback, 'STREAM_END_OF_STREAM'));
80+
if (AsyncContextFrame.current() ||
81+
getHookArrays()[0].length > 0) {
82+
// Avoid AsyncResource.bind() because it calls ObjectDefineProperties which
83+
// is a bottleneck here.
84+
callback = once(bindAsyncResource(callback, 'STREAM_END_OF_STREAM'));
85+
} else {
86+
callback = once(callback);
87+
}
8088

8189
if (isReadableStream(stream) || isWritableStream(stream)) {
8290
return eosWeb(stream, options, callback);
Collapse file
+24Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Flags: --expose-internals
2+
'use strict';
3+
4+
const common = require('../common');
5+
const { Readable, finished } = require('stream');
6+
const { AsyncLocalStorage } = require('async_hooks');
7+
const { strictEqual } = require('assert');
8+
const AsyncContextFrame = require('internal/async_context_frame');
9+
const internalAsyncHooks = require('internal/async_hooks');
10+
11+
// This test verifies that ALS context is preserved when using stream.finished()
12+
13+
const als = new AsyncLocalStorage();
14+
const readable = new Readable();
15+
16+
als.run('test-context-1', () => {
17+
finished(readable, common.mustCall(() => {
18+
strictEqual(AsyncContextFrame.enabled || internalAsyncHooks.getHookArrays()[0].length > 0,
19+
true, 'One of AsyncContextFrame or async hooks criteria should be met');
20+
strictEqual(als.getStore(), 'test-context-1', 'ALS context should be preserved');
21+
}));
22+
});
23+
24+
readable.destroy();
Collapse file
+35Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Flags: --expose-internals
2+
'use strict';
3+
4+
const common = require('../common');
5+
const { Readable, finished } = require('stream');
6+
const { createHook, executionAsyncId } = require('async_hooks');
7+
const { strictEqual } = require('assert');
8+
const internalAsyncHooks = require('internal/async_hooks');
9+
10+
// This test verifies that when there are active async hooks, stream.finished() uses
11+
// the bindAsyncResource path
12+
13+
createHook({
14+
init(asyncId, type, triggerAsyncId) {
15+
if (type === 'STREAM_END_OF_STREAM') {
16+
const parentContext = contextMap.get(triggerAsyncId);
17+
contextMap.set(asyncId, parentContext);
18+
}
19+
}
20+
}).enable();
21+
22+
const contextMap = new Map();
23+
const asyncId = executionAsyncId();
24+
contextMap.set(asyncId, 'abc-123');
25+
const readable = new Readable();
26+
27+
finished(readable, common.mustCall(() => {
28+
const currentAsyncId = executionAsyncId();
29+
const ctx = contextMap.get(currentAsyncId);
30+
strictEqual(internalAsyncHooks.getHookArrays()[0].length > 0,
31+
true, 'Should have active user async hook');
32+
strictEqual(ctx, 'abc-123', 'Context should be preserved');
33+
}));
34+
35+
readable.destroy();
Collapse file
+21Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Flags: --expose-internals --no-async-context-frame
2+
'use strict';
3+
4+
const common = require('../common');
5+
const { Readable, finished } = require('stream');
6+
const { strictEqual } = require('assert');
7+
const AsyncContextFrame = require('internal/async_context_frame');
8+
const internalAsyncHooks = require('internal/async_hooks');
9+
10+
// This test verifies that when there are no active async hooks, stream.finished() uses the default callback path
11+
12+
const readable = new Readable();
13+
14+
finished(readable, common.mustCall(() => {
15+
strictEqual(internalAsyncHooks.getHookArrays()[0].length === 0,
16+
true, 'Should not have active user async hook');
17+
strictEqual(AsyncContextFrame.current() || internalAsyncHooks.getHookArrays()[0].length > 0,
18+
false, 'Default callback path should be used');
19+
}));
20+
21+
readable.destroy();

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.