Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit e11a079

Browse filesBrowse files
iMosesruyadorno
authored andcommitted
stream: use synchronous error validation & validate abort signal option
made sure top level methods aren't async/generators so that validation errors could be caught synchronously also added validation for the abort signal option PR-URL: #41777 Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Nitzan Uziely <linkgoron@gmail.com>
1 parent 4db343b commit e11a079
Copy full SHA for e11a079

File tree

Expand file treeCollapse file tree

8 files changed

+98
-11
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

8 files changed

+98
-11
lines changed
Open diff view settings
Collapse file

‎lib/internal/streams/operators.js‎

Copy file name to clipboardExpand all lines: lib/internal/streams/operators.js
+58-10Lines changed: 58 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ const {
1010
},
1111
AbortError,
1212
} = require('internal/errors');
13-
const { validateInteger } = require('internal/validators');
13+
const {
14+
validateAbortSignal,
15+
validateInteger,
16+
} = require('internal/validators');
1417
const { kWeakHandler } = require('internal/event_target');
1518
const { finished } = require('internal/streams/end-of-stream');
1619

@@ -33,10 +36,12 @@ function map(fn, options) {
3336
throw new ERR_INVALID_ARG_TYPE(
3437
'fn', ['Function', 'AsyncFunction'], fn);
3538
}
36-
3739
if (options != null && typeof options !== 'object') {
3840
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
3941
}
42+
if (options?.signal != null) {
43+
validateAbortSignal(options.signal, 'options.signal');
44+
}
4045

4146
let concurrency = 1;
4247
if (options?.concurrency != null) {
@@ -161,17 +166,33 @@ function map(fn, options) {
161166
}.call(this);
162167
}
163168

164-
async function* asIndexedPairs(options) {
165-
let index = 0;
166-
for await (const val of this) {
167-
if (options?.signal?.aborted) {
168-
throw new AbortError({ cause: options.signal.reason });
169-
}
170-
yield [index++, val];
169+
function asIndexedPairs(options) {
170+
if (options != null && typeof options !== 'object') {
171+
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
172+
}
173+
if (options?.signal != null) {
174+
validateAbortSignal(options.signal, 'options.signal');
171175
}
176+
177+
return async function* asIndexedPairs() {
178+
let index = 0;
179+
for await (const val of this) {
180+
if (options?.signal?.aborted) {
181+
throw new AbortError({ cause: options.signal.reason });
182+
}
183+
yield [index++, val];
184+
}
185+
}.call(this);
172186
}
173187

174188
async function some(fn, options) {
189+
if (options != null && typeof options !== 'object') {
190+
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
191+
}
192+
if (options?.signal != null) {
193+
validateAbortSignal(options.signal, 'options.signal');
194+
}
195+
175196
// https://tc39.es/proposal-iterator-helpers/#sec-iteratorprototype.some
176197
// Note that some does short circuit but also closes the iterator if it does
177198
const ac = new AbortController();
@@ -246,6 +267,13 @@ async function reduce(reducer, initialValue, options) {
246267
throw new ERR_INVALID_ARG_TYPE(
247268
'reducer', ['Function', 'AsyncFunction'], reducer);
248269
}
270+
if (options != null && typeof options !== 'object') {
271+
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
272+
}
273+
if (options?.signal != null) {
274+
validateAbortSignal(options.signal, 'options.signal');
275+
}
276+
249277
let hasInitialValue = arguments.length > 1;
250278
if (options?.signal?.aborted) {
251279
const err = new AbortError(undefined, { cause: options.signal.reason });
@@ -283,6 +311,13 @@ async function reduce(reducer, initialValue, options) {
283311
}
284312

285313
async function toArray(options) {
314+
if (options != null && typeof options !== 'object') {
315+
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
316+
}
317+
if (options?.signal != null) {
318+
validateAbortSignal(options.signal, 'options.signal');
319+
}
320+
286321
const result = [];
287322
for await (const val of this) {
288323
if (options?.signal?.aborted) {
@@ -316,6 +351,13 @@ function toIntegerOrInfinity(number) {
316351
}
317352

318353
function drop(number, options) {
354+
if (options != null && typeof options !== 'object') {
355+
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
356+
}
357+
if (options?.signal != null) {
358+
validateAbortSignal(options.signal, 'options.signal');
359+
}
360+
319361
number = toIntegerOrInfinity(number);
320362
return async function* drop() {
321363
if (options?.signal?.aborted) {
@@ -332,8 +374,14 @@ function drop(number, options) {
332374
}.call(this);
333375
}
334376

335-
336377
function take(number, options) {
378+
if (options != null && typeof options !== 'object') {
379+
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
380+
}
381+
if (options?.signal != null) {
382+
validateAbortSignal(options.signal, 'options.signal');
383+
}
384+
337385
number = toIntegerOrInfinity(number);
338386
return async function* take() {
339387
if (options?.signal?.aborted) {
Collapse file

‎test/parallel/test-stream-asIndexedPairs.mjs‎

Copy file name to clipboardExpand all lines: test/parallel/test-stream-asIndexedPairs.mjs
+7-1Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import '../common/index.mjs';
22
import { Readable } from 'stream';
3-
import { deepStrictEqual, rejects } from 'assert';
3+
import { deepStrictEqual, rejects, throws } from 'assert';
44

55
{
66
// asIndexedPairs with a synchronous stream
@@ -45,3 +45,9 @@ import { deepStrictEqual, rejects } from 'assert';
4545
await Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray();
4646
}, /AbortError/);
4747
}
48+
49+
{
50+
// Error cases
51+
throws(() => Readable.from([1]).asIndexedPairs(1), /ERR_INVALID_ARG_TYPE/);
52+
throws(() => Readable.from([1]).asIndexedPairs({ signal: true }), /ERR_INVALID_ARG_TYPE/);
53+
}
Collapse file

‎test/parallel/test-stream-drop-take.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-stream-drop-take.js
+6Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,4 +93,10 @@ const naturals = () => from(async function*() {
9393
for (const example of invalidArgs) {
9494
throws(() => from([]).take(example).toArray(), /ERR_OUT_OF_RANGE/);
9595
}
96+
97+
throws(() => Readable.from([1]).drop(1, 1), /ERR_INVALID_ARG_TYPE/);
98+
throws(() => Readable.from([1]).drop(1, { signal: true }), /ERR_INVALID_ARG_TYPE/);
99+
100+
throws(() => Readable.from([1]).take(1, 1), /ERR_INVALID_ARG_TYPE/);
101+
throws(() => Readable.from([1]).take(1, { signal: true }), /ERR_INVALID_ARG_TYPE/);
96102
}
Collapse file

‎test/parallel/test-stream-flatMap.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-stream-flatMap.js
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ function oneTo5() {
114114
concurrency: 'Foo'
115115
}), /ERR_OUT_OF_RANGE/);
116116
assert.throws(() => Readable.from([1]).flatMap((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
117+
assert.throws(() => Readable.from([1]).flatMap((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);
117118
}
118119
{
119120
// Test result is a Readable
Collapse file

‎test/parallel/test-stream-map.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-stream-map.js
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ const { setTimeout } = require('timers/promises');
180180
concurrency: 'Foo'
181181
}), /ERR_OUT_OF_RANGE/);
182182
assert.throws(() => Readable.from([1]).map((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
183+
assert.throws(() => Readable.from([1]).map((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);
183184
}
184185
{
185186
// Test result is a Readable
Collapse file

‎test/parallel/test-stream-reduce.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-stream-reduce.js
+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ function sum(p, c) {
121121
// Error cases
122122
assert.rejects(() => Readable.from([]).reduce(1), /TypeError/);
123123
assert.rejects(() => Readable.from([]).reduce('5'), /TypeError/);
124+
assert.rejects(() => Readable.from([]).reduce((x, y) => x + y, 0, 1), /ERR_INVALID_ARG_TYPE/);
125+
assert.rejects(() => Readable.from([]).reduce((x, y) => x + y, 0, { signal: true }), /ERR_INVALID_ARG_TYPE/);
124126
}
125127

126128
{
Collapse file

‎test/parallel/test-stream-some-every.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-stream-some-every.js
+11Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,17 @@ function oneTo5Async() {
8787
assert.rejects(async () => {
8888
await Readable.from([1]).every(1);
8989
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
90+
91+
assert.rejects(async () => {
92+
await Readable.from([1]).every((x) => x, 1);
93+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
94+
95+
assert.rejects(async () => {
96+
await Readable.from([1]).every((x) => x, {
97+
signal: true
98+
});
99+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
100+
90101
assert.rejects(async () => {
91102
await Readable.from([1]).every((x) => x, {
92103
concurrency: 'Foo'
Collapse file

‎test/parallel/test-stream-toArray.js‎

Copy file name to clipboardExpand all lines: test/parallel/test-stream-toArray.js
+12Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,15 @@ const assert = require('assert');
7979
const result = Readable.from([1, 2, 3, 4, 5]).toArray();
8080
assert.strictEqual(result instanceof Promise, true);
8181
}
82+
{
83+
// Error cases
84+
assert.rejects(async () => {
85+
await Readable.from([1]).toArray(1);
86+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
87+
88+
assert.rejects(async () => {
89+
await Readable.from([1]).toArray({
90+
signal: true
91+
});
92+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
93+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.