@@ -225,11 +225,76 @@ def _invoke(self, fn, *args):
225
225
return result
226
226
227
227
228
+ UNSPECIFIED = object ()
229
+ try :
230
+ from sys import get_asyncgen_hooks , set_asyncgen_hooks
231
+
232
+ except ImportError :
233
+ import threading
234
+
235
+ asyncgen_hooks = collections .namedtuple (
236
+ "asyncgen_hooks" , ("firstiter" , "finalizer" )
237
+ )
238
+
239
+ class _hooks_storage (threading .local ):
240
+ def __init__ (self ):
241
+ self .firstiter = None
242
+ self .finalizer = None
243
+
244
+ _hooks = _hooks_storage ()
245
+
246
+ def get_asyncgen_hooks ():
247
+ return asyncgen_hooks (
248
+ firstiter = _hooks .firstiter , finalizer = _hooks .finalizer
249
+ )
250
+
251
+ def set_asyncgen_hooks (firstiter = UNSPECIFIED , finalizer = UNSPECIFIED ):
252
+ if firstiter is not UNSPECIFIED :
253
+ if firstiter is None or callable (firstiter ):
254
+ _hooks .firstiter = firstiter
255
+ else :
256
+ raise TypeError (
257
+ "callable firstiter expected, got {}" .format (
258
+ type (firstiter ).__name__
259
+ )
260
+ )
261
+
262
+ if finalizer is not UNSPECIFIED :
263
+ if finalizer is None or callable (finalizer ):
264
+ _hooks .finalizer = finalizer
265
+ else :
266
+ raise TypeError (
267
+ "callable finalizer expected, got {}" .format (
268
+ type (finalizer ).__name__
269
+ )
270
+ )
271
+
272
+
228
273
class AsyncGenerator :
274
+ # https://bitbucket.org/pypy/pypy/issues/2786:
275
+ # PyPy implements 'await' in a way that requires the frame object
276
+ # used to execute a coroutine to keep a weakref to that coroutine.
277
+ # During a GC pass, weakrefs to all doomed objects are broken
278
+ # before any of the doomed objects' finalizers are invoked.
279
+ # If an AsyncGenerator is unreachable, its _coroutine probably
280
+ # is too, and the weakref from ag._coroutine.cr_frame to
281
+ # ag._coroutine will be broken before ag.__del__ can do its
282
+ # one-turn close attempt or can schedule a full aclose() using
283
+ # the registered finalization hook. It doesn't look like the
284
+ # underlying issue is likely to be fully fixed anytime soon,
285
+ # so we work around it by preventing an AsyncGenerator and
286
+ # its _coroutine from being considered newly unreachable at
287
+ # the same time if the AsyncGenerator's finalizer might want
288
+ # to iterate the coroutine some more.
289
+ _pypy_issue2786_workaround = set ()
290
+
229
291
def __init__ (self , coroutine ):
230
292
self ._coroutine = coroutine
231
293
self ._it = coroutine .__await__ ()
232
294
self .ag_running = False
295
+ self ._finalizer = None
296
+ self ._closed = False
297
+ self ._hooks_inited = False
233
298
234
299
# On python 3.5.0 and 3.5.1, __aiter__ must be awaitable.
235
300
# Starting in 3.5.2, it should not be awaitable, and if it is, then it
@@ -263,66 +328,104 @@ def ag_frame(self):
263
328
# Core functionality
264
329
################################################################
265
330
266
- # We make these async functions and use await, rather than just regular
267
- # functions that pass back awaitables, in order to get more useful
268
- # tracebacks when debugging.
331
+ # These need to return awaitables, rather than being async functions,
332
+ # to match the native behavior where the firstiter hook is called
333
+ # immediately on asend()/etc, even if the coroutine that asend()
334
+ # produces isn't awaited for a bit.
335
+
336
+ def __anext__ (self ):
337
+ return self ._do_it (self ._it .__next__ )
269
338
270
- async def __anext__ (self ):
271
- return await self ._do_it (self ._it .__next__ )
339
+ def asend (self , value ):
340
+ return self ._do_it (self ._it .send , value )
272
341
273
- async def asend (self , value ):
274
- return await self ._do_it (self ._it .send , value )
342
+ def athrow (self , type , value = None , traceback = None ):
343
+ return self ._do_it (self ._it .throw , type , value , traceback )
275
344
276
- async def athrow (self , type , value = None , traceback = None ):
277
- return await self ._do_it (self ._it .throw , type , value , traceback )
345
+ def _do_it (self , start_fn , * args ):
346
+ if not self ._hooks_inited :
347
+ self ._hooks_inited = True
348
+ (firstiter , self ._finalizer ) = get_asyncgen_hooks ()
349
+ if firstiter is not None :
350
+ firstiter (self )
351
+ if sys .implementation .name == "pypy" :
352
+ self ._pypy_issue2786_workaround .add (self ._coroutine )
278
353
279
- async def _do_it (self , start_fn , * args ):
280
354
# On CPython 3.5.2 (but not 3.5.0), coroutines get cranky if you try
281
355
# to iterate them after they're exhausted. Generators OTOH just raise
282
356
# StopIteration. We want to convert the one into the other, so we need
283
357
# to avoid iterating stopped coroutines.
284
358
if getcoroutinestate (self ._coroutine ) is CORO_CLOSED :
285
359
raise StopAsyncIteration ()
286
- if self .ag_running :
287
- raise ValueError ("async generator already executing" )
288
- try :
289
- self .ag_running = True
290
- return await ANextIter (self ._it , start_fn , * args )
291
- finally :
292
- self .ag_running = False
360
+
361
+ async def step ():
362
+ if self .ag_running :
363
+ raise ValueError ("async generator already executing" )
364
+ try :
365
+ self .ag_running = True
366
+ return await ANextIter (self ._it , start_fn , * args )
367
+ except StopAsyncIteration :
368
+ self ._pypy_issue2786_workaround .discard (self ._coroutine )
369
+ raise
370
+ finally :
371
+ self .ag_running = False
372
+
373
+ return step ()
293
374
294
375
################################################################
295
376
# Cleanup
296
377
################################################################
297
378
298
379
async def aclose (self ):
299
380
state = getcoroutinestate (self ._coroutine )
381
+ if state is CORO_CLOSED or self ._closed :
382
+ return
383
+ # Make sure that even if we raise "async_generator ignored
384
+ # GeneratorExit", and thus fail to exhaust the coroutine,
385
+ # __del__ doesn't complain again.
386
+ self ._closed = True
300
387
if state is CORO_CREATED :
301
388
# Make sure that aclose() on an unstarted generator returns
302
389
# successfully and prevents future iteration.
303
390
self ._it .close ()
304
391
return
305
- elif state is CORO_CLOSED :
306
- return
307
392
try :
308
393
await self .athrow (GeneratorExit )
309
394
except (GeneratorExit , StopAsyncIteration ):
310
- pass
395
+ self . _pypy_issue2786_workaround . discard ( self . _coroutine )
311
396
else :
312
397
raise RuntimeError ("async_generator ignored GeneratorExit" )
313
398
314
399
def __del__ (self ):
400
+ self ._pypy_issue2786_workaround .discard (self ._coroutine )
315
401
if getcoroutinestate (self ._coroutine ) is CORO_CREATED :
316
402
# Never started, nothing to clean up, just suppress the "coroutine
317
403
# never awaited" message.
318
404
self ._coroutine .close ()
319
- if getcoroutinestate (self ._coroutine ) is CORO_SUSPENDED :
320
- # This exception will get swallowed because this is __del__, but
321
- # it's an easy way to trigger the print-to-console logic
322
- raise RuntimeError (
323
- "partially-exhausted async_generator {!r} garbage collected"
324
- .format (self ._coroutine .cr_frame .f_code .co_name )
325
- )
405
+ if getcoroutinestate (self ._coroutine
406
+ ) is CORO_SUSPENDED and not self ._closed :
407
+ if self ._finalizer is not None :
408
+ self ._finalizer (self )
409
+ else :
410
+ # Mimic the behavior of native generators on GC with no finalizer:
411
+ # throw in GeneratorExit, run for one turn, and complain if it didn't
412
+ # finish.
413
+ thrower = self .athrow (GeneratorExit )
414
+ try :
415
+ thrower .send (None )
416
+ except (GeneratorExit , StopAsyncIteration ):
417
+ pass
418
+ except StopIteration :
419
+ raise RuntimeError ("async_generator ignored GeneratorExit" )
420
+ else :
421
+ raise RuntimeError (
422
+ "async_generator {!r} awaited during finalization; install "
423
+ "a finalization hook to support this, or wrap it in "
424
+ "'async with aclosing(...):'"
425
+ .format (self .ag_code .co_name )
426
+ )
427
+ finally :
428
+ thrower .close ()
326
429
327
430
328
431
if hasattr (collections .abc , "AsyncGenerator" ):
0 commit comments