Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit b6d09e8

Browse filesBrowse files
mcollinatargos
authored andcommitted
events: add EventEmitter.on to async iterate over events
Fixes: #27847 PR-URL: #27994 Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Gus Caplan <me@gus.host> Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Rich Trott <rtrott@gmail.com>
1 parent 1d45ba3 commit b6d09e8
Copy full SHA for b6d09e8

File tree

Expand file treeCollapse file tree

3 files changed

+362
-0
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

3 files changed

+362
-0
lines changed
Open diff view settings
Collapse file

‎doc/api/events.md‎

Copy file name to clipboardExpand all lines: doc/api/events.md
+35Lines changed: 35 additions & 0 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -886,6 +886,41 @@ Value: `Symbol.for('nodejs.rejection')`
886886

887887
See how to write a custom [rejection handler][rejection].
888888

889+
## events.on(emitter, eventName)
890+
<!-- YAML
891+
added: REPLACEME
892+
-->
893+
894+
* `emitter` {EventEmitter}
895+
* `eventName` {string|symbol} The name of the event being listened for
896+
* Returns: {AsyncIterator} that iterates `eventName` events emitted by the `emitter`
897+
898+
```js
899+
const { on, EventEmitter } = require('events');
900+
901+
(async () => {
902+
const ee = new EventEmitter();
903+
904+
// Emit later on
905+
process.nextTick(() => {
906+
ee.emit('foo', 'bar');
907+
ee.emit('foo', 42);
908+
});
909+
910+
for await (const event of on(ee, 'foo')) {
911+
// The execution of this inner block is synchronous and it
912+
// processes one event at a time (even with await). Do not use
913+
// if concurrent execution is required.
914+
console.log(event); // prints ['bar'] [42]
915+
}
916+
})();
917+
```
918+
919+
Returns an `AsyncIterator` that iterates `eventName` events. It will throw
920+
if the `EventEmitter` emits `'error'`. It removes all listeners when
921+
exiting the loop. The `value` returned by each iteration is an array
922+
composed of the emitted event arguments.
923+
889924
[WHATWG-EventTarget]: https://dom.spec.whatwg.org/#interface-eventtarget
890925
[`--trace-warnings`]: cli.html#cli_trace_warnings
891926
[`EventEmitter.defaultMaxListeners`]: #events_eventemitter_defaultmaxlisteners
Collapse file

‎lib/events.js‎

Copy file name to clipboardExpand all lines: lib/events.js
+104Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,16 @@ const {
3030
ObjectCreate,
3131
ObjectDefineProperty,
3232
ObjectGetPrototypeOf,
33+
ObjectSetPrototypeOf,
3334
ObjectKeys,
3435
Promise,
36+
PromiseReject,
37+
PromiseResolve,
3538
ReflectApply,
3639
ReflectOwnKeys,
3740
Symbol,
3841
SymbolFor,
42+
SymbolAsyncIterator
3943
} = primordials;
4044
const kRejection = SymbolFor('nodejs.rejection');
4145

@@ -63,6 +67,7 @@ function EventEmitter(opts) {
6367
}
6468
module.exports = EventEmitter;
6569
module.exports.once = once;
70+
module.exports.on = on;
6671

6772
// Backwards-compat with node 0.10.x
6873
EventEmitter.EventEmitter = EventEmitter;
@@ -658,3 +663,102 @@ function once(emitter, name) {
658663
emitter.once(name, eventListener);
659664
});
660665
}
666+
667+
const AsyncIteratorPrototype = ObjectGetPrototypeOf(
668+
ObjectGetPrototypeOf(async function* () {}).prototype);
669+
670+
function createIterResult(value, done) {
671+
return { value, done };
672+
}
673+
674+
function on(emitter, event) {
675+
const unconsumedEvents = [];
676+
const unconsumedPromises = [];
677+
let error = null;
678+
let finished = false;
679+
680+
const iterator = ObjectSetPrototypeOf({
681+
next() {
682+
// First, we consume all unread events
683+
const value = unconsumedEvents.shift();
684+
if (value) {
685+
return PromiseResolve(createIterResult(value, false));
686+
}
687+
688+
// Then we error, if an error happened
689+
// This happens one time if at all, because after 'error'
690+
// we stop listening
691+
if (error) {
692+
const p = PromiseReject(error);
693+
// Only the first element errors
694+
error = null;
695+
return p;
696+
}
697+
698+
// If the iterator is finished, resolve to done
699+
if (finished) {
700+
return PromiseResolve(createIterResult(undefined, true));
701+
}
702+
703+
// Wait until an event happens
704+
return new Promise(function(resolve, reject) {
705+
unconsumedPromises.push({ resolve, reject });
706+
});
707+
},
708+
709+
return() {
710+
emitter.removeListener(event, eventHandler);
711+
emitter.removeListener('error', errorHandler);
712+
finished = true;
713+
714+
for (const promise of unconsumedPromises) {
715+
promise.resolve(createIterResult(undefined, true));
716+
}
717+
718+
return PromiseResolve(createIterResult(undefined, true));
719+
},
720+
721+
throw(err) {
722+
if (!err || !(err instanceof Error)) {
723+
throw new ERR_INVALID_ARG_TYPE('EventEmitter.AsyncIterator',
724+
'Error', err);
725+
}
726+
error = err;
727+
emitter.removeListener(event, eventHandler);
728+
emitter.removeListener('error', errorHandler);
729+
},
730+
731+
[SymbolAsyncIterator]() {
732+
return this;
733+
}
734+
}, AsyncIteratorPrototype);
735+
736+
emitter.on(event, eventHandler);
737+
emitter.on('error', errorHandler);
738+
739+
return iterator;
740+
741+
function eventHandler(...args) {
742+
const promise = unconsumedPromises.shift();
743+
if (promise) {
744+
promise.resolve(createIterResult(args, false));
745+
} else {
746+
unconsumedEvents.push(args);
747+
}
748+
}
749+
750+
function errorHandler(err) {
751+
finished = true;
752+
753+
const toError = unconsumedPromises.shift();
754+
755+
if (toError) {
756+
toError.reject(err);
757+
} else {
758+
// The next time we call next()
759+
error = err;
760+
}
761+
762+
iterator.return();
763+
}
764+
}
Collapse file
+223Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const { on, EventEmitter } = require('events');
6+
7+
async function basic() {
8+
const ee = new EventEmitter();
9+
process.nextTick(() => {
10+
ee.emit('foo', 'bar');
11+
// 'bar' is a spurious event, we are testing
12+
// that it does not show up in the iterable
13+
ee.emit('bar', 24);
14+
ee.emit('foo', 42);
15+
});
16+
17+
const iterable = on(ee, 'foo');
18+
19+
const expected = [['bar'], [42]];
20+
21+
for await (const event of iterable) {
22+
const current = expected.shift();
23+
24+
assert.deepStrictEqual(current, event);
25+
26+
if (expected.length === 0) {
27+
break;
28+
}
29+
}
30+
assert.strictEqual(ee.listenerCount('foo'), 0);
31+
assert.strictEqual(ee.listenerCount('error'), 0);
32+
}
33+
34+
async function error() {
35+
const ee = new EventEmitter();
36+
const _err = new Error('kaboom');
37+
process.nextTick(() => {
38+
ee.emit('error', _err);
39+
});
40+
41+
const iterable = on(ee, 'foo');
42+
let looped = false;
43+
let thrown = false;
44+
45+
try {
46+
// eslint-disable-next-line no-unused-vars
47+
for await (const event of iterable) {
48+
looped = true;
49+
}
50+
} catch (err) {
51+
thrown = true;
52+
assert.strictEqual(err, _err);
53+
}
54+
assert.strictEqual(thrown, true);
55+
assert.strictEqual(looped, false);
56+
}
57+
58+
async function errorDelayed() {
59+
const ee = new EventEmitter();
60+
const _err = new Error('kaboom');
61+
process.nextTick(() => {
62+
ee.emit('foo', 42);
63+
ee.emit('error', _err);
64+
});
65+
66+
const iterable = on(ee, 'foo');
67+
const expected = [[42]];
68+
let thrown = false;
69+
70+
try {
71+
for await (const event of iterable) {
72+
const current = expected.shift();
73+
assert.deepStrictEqual(current, event);
74+
}
75+
} catch (err) {
76+
thrown = true;
77+
assert.strictEqual(err, _err);
78+
}
79+
assert.strictEqual(thrown, true);
80+
assert.strictEqual(ee.listenerCount('foo'), 0);
81+
assert.strictEqual(ee.listenerCount('error'), 0);
82+
}
83+
84+
async function throwInLoop() {
85+
const ee = new EventEmitter();
86+
const _err = new Error('kaboom');
87+
88+
process.nextTick(() => {
89+
ee.emit('foo', 42);
90+
});
91+
92+
try {
93+
for await (const event of on(ee, 'foo')) {
94+
assert.deepStrictEqual(event, [42]);
95+
throw _err;
96+
}
97+
} catch (err) {
98+
assert.strictEqual(err, _err);
99+
}
100+
101+
assert.strictEqual(ee.listenerCount('foo'), 0);
102+
assert.strictEqual(ee.listenerCount('error'), 0);
103+
}
104+
105+
async function next() {
106+
const ee = new EventEmitter();
107+
const iterable = on(ee, 'foo');
108+
109+
process.nextTick(function() {
110+
ee.emit('foo', 'bar');
111+
ee.emit('foo', 42);
112+
iterable.return();
113+
});
114+
115+
const results = await Promise.all([
116+
iterable.next(),
117+
iterable.next(),
118+
iterable.next()
119+
]);
120+
121+
assert.deepStrictEqual(results, [{
122+
value: ['bar'],
123+
done: false
124+
}, {
125+
value: [42],
126+
done: false
127+
}, {
128+
value: undefined,
129+
done: true
130+
}]);
131+
132+
assert.deepStrictEqual(await iterable.next(), {
133+
value: undefined,
134+
done: true
135+
});
136+
}
137+
138+
async function nextError() {
139+
const ee = new EventEmitter();
140+
const iterable = on(ee, 'foo');
141+
const _err = new Error('kaboom');
142+
process.nextTick(function() {
143+
ee.emit('error', _err);
144+
});
145+
const results = await Promise.allSettled([
146+
iterable.next(),
147+
iterable.next(),
148+
iterable.next()
149+
]);
150+
assert.deepStrictEqual(results, [{
151+
status: 'rejected',
152+
reason: _err
153+
}, {
154+
status: 'fulfilled',
155+
value: {
156+
value: undefined,
157+
done: true
158+
}
159+
}, {
160+
status: 'fulfilled',
161+
value: {
162+
value: undefined,
163+
done: true
164+
}
165+
}]);
166+
assert.strictEqual(ee.listeners('error').length, 0);
167+
}
168+
169+
async function iterableThrow() {
170+
const ee = new EventEmitter();
171+
const iterable = on(ee, 'foo');
172+
173+
process.nextTick(() => {
174+
ee.emit('foo', 'bar');
175+
ee.emit('foo', 42); // lost in the queue
176+
iterable.throw(_err);
177+
});
178+
179+
const _err = new Error('kaboom');
180+
let thrown = false;
181+
182+
assert.throws(() => {
183+
// No argument
184+
iterable.throw();
185+
}, {
186+
message: 'The "EventEmitter.AsyncIterator" property must be' +
187+
' an instance of Error. Received undefined',
188+
name: 'TypeError'
189+
});
190+
191+
const expected = [['bar'], [42]];
192+
193+
try {
194+
for await (const event of iterable) {
195+
assert.deepStrictEqual(event, expected.shift());
196+
}
197+
} catch (err) {
198+
thrown = true;
199+
assert.strictEqual(err, _err);
200+
}
201+
assert.strictEqual(thrown, true);
202+
assert.strictEqual(expected.length, 0);
203+
assert.strictEqual(ee.listenerCount('foo'), 0);
204+
assert.strictEqual(ee.listenerCount('error'), 0);
205+
}
206+
207+
async function run() {
208+
const funcs = [
209+
basic,
210+
error,
211+
errorDelayed,
212+
throwInLoop,
213+
next,
214+
nextError,
215+
iterableThrow
216+
];
217+
218+
for (const fn of funcs) {
219+
await fn();
220+
}
221+
}
222+
223+
run().then(common.mustCall());

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.