Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Add support for PEP 525-style garbage collection hooks #15

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions 4 async_generator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
yield_from_,
isasyncgen,
isasyncgenfunction,
get_asyncgen_hooks,
set_asyncgen_hooks,
)
from ._util import aclosing, asynccontextmanager

Expand All @@ -16,4 +18,6 @@
"isasyncgen",
"isasyncgenfunction",
"asynccontextmanager",
"get_asyncgen_hooks",
"set_asyncgen_hooks",
]
157 changes: 130 additions & 27 deletions 157 async_generator/_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,76 @@ def _invoke(self, fn, *args):
return result


UNSPECIFIED = object()
try:
from sys import get_asyncgen_hooks, set_asyncgen_hooks

except ImportError:
import threading

asyncgen_hooks = collections.namedtuple(
"asyncgen_hooks", ("firstiter", "finalizer")
)

class _hooks_storage(threading.local):
def __init__(self):
self.firstiter = None
self.finalizer = None

_hooks = _hooks_storage()

def get_asyncgen_hooks():
return asyncgen_hooks(
firstiter=_hooks.firstiter, finalizer=_hooks.finalizer
)

def set_asyncgen_hooks(firstiter=UNSPECIFIED, finalizer=UNSPECIFIED):
if firstiter is not UNSPECIFIED:
if firstiter is None or callable(firstiter):
_hooks.firstiter = firstiter
else:
raise TypeError(
"callable firstiter expected, got {}".format(
type(firstiter).__name__
)
)

if finalizer is not UNSPECIFIED:
if finalizer is None or callable(finalizer):
_hooks.finalizer = finalizer
else:
raise TypeError(
"callable finalizer expected, got {}".format(
type(finalizer).__name__
)
)


class AsyncGenerator:
# https://bitbucket.org/pypy/pypy/issues/2786:
# PyPy implements 'await' in a way that requires the frame object
# used to execute a coroutine to keep a weakref to that coroutine.
# During a GC pass, weakrefs to all doomed objects are broken
# before any of the doomed objects' finalizers are invoked.
# If an AsyncGenerator is unreachable, its _coroutine probably
# is too, and the weakref from ag._coroutine.cr_frame to
# ag._coroutine will be broken before ag.__del__ can do its
# one-turn close attempt or can schedule a full aclose() using
# the registered finalization hook. It doesn't look like the
# underlying issue is likely to be fully fixed anytime soon,
# so we work around it by preventing an AsyncGenerator and
# its _coroutine from being considered newly unreachable at
# the same time if the AsyncGenerator's finalizer might want
# to iterate the coroutine some more.
_pypy_issue2786_workaround = set()

def __init__(self, coroutine):
self._coroutine = coroutine
self._it = coroutine.__await__()
self.ag_running = False
self._finalizer = None
self._closed = False
self._hooks_inited = False

# On python 3.5.0 and 3.5.1, __aiter__ must be awaitable.
# Starting in 3.5.2, it should not be awaitable, and if it is, then it
Expand Down Expand Up @@ -263,66 +328,104 @@ def ag_frame(self):
# Core functionality
################################################################

# We make these async functions and use await, rather than just regular
# functions that pass back awaitables, in order to get more useful
# tracebacks when debugging.
# These need to return awaitables, rather than being async functions,
# to match the native behavior where the firstiter hook is called
# immediately on asend()/etc, even if the coroutine that asend()
# produces isn't awaited for a bit.

def __anext__(self):
return self._do_it(self._it.__next__)

async def __anext__(self):
return await self._do_it(self._it.__next__)
def asend(self, value):
return self._do_it(self._it.send, value)

async def asend(self, value):
return await self._do_it(self._it.send, value)
def athrow(self, type, value=None, traceback=None):
return self._do_it(self._it.throw, type, value, traceback)

async def athrow(self, type, value=None, traceback=None):
return await self._do_it(self._it.throw, type, value, traceback)
def _do_it(self, start_fn, *args):
if not self._hooks_inited:
self._hooks_inited = True
(firstiter, self._finalizer) = get_asyncgen_hooks()
if firstiter is not None:
firstiter(self)
if sys.implementation.name == "pypy":
self._pypy_issue2786_workaround.add(self._coroutine)

async def _do_it(self, start_fn, *args):
# On CPython 3.5.2 (but not 3.5.0), coroutines get cranky if you try
# to iterate them after they're exhausted. Generators OTOH just raise
# StopIteration. We want to convert the one into the other, so we need
# to avoid iterating stopped coroutines.
if getcoroutinestate(self._coroutine) is CORO_CLOSED:
raise StopAsyncIteration()
if self.ag_running:
raise ValueError("async generator already executing")
try:
self.ag_running = True
return await ANextIter(self._it, start_fn, *args)
finally:
self.ag_running = False

async def step():
if self.ag_running:
raise ValueError("async generator already executing")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of moving this check, and the self.ag_running = True, up out of the async def?

There would be one tricky thing to worry about: if a coroutine object gets closed or GCed before it even starts, then python doesn't enforce its normal guarantee that finally blocks will be executed. So if we did do this I guess we'd need to add another yield at the top of step, and immediately iterate it that far when we created the coroutine object, in order to guarantee that eventually ag_running = False would happen. Maybe this is more work than it's worth.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the intent would be to guard against two asend() coroutines existing simultaneously, even if one is exhausted before the second is started? Given the implementation difficulties, I don't think it's worth it, since it seems impossible to actually interleave the underlying send() calls in a way that would create confusion. (I originally put the ag_running test outside of step() with the setting-to-True still inside, and that did create potential problems, which is why I added test_reentrance_forbidden_simultaneous_asends.)

try:
self.ag_running = True
return await ANextIter(self._it, start_fn, *args)
except StopAsyncIteration:
self._pypy_issue2786_workaround.discard(self._coroutine)
raise
finally:
self.ag_running = False

return step()

################################################################
# Cleanup
################################################################

async def aclose(self):
state = getcoroutinestate(self._coroutine)
if state is CORO_CLOSED or self._closed:
return
# Make sure that even if we raise "async_generator ignored
# GeneratorExit", and thus fail to exhaust the coroutine,
# __del__ doesn't complain again.
self._closed = True
if state is CORO_CREATED:
# Make sure that aclose() on an unstarted generator returns
# successfully and prevents future iteration.
self._it.close()
return
elif state is CORO_CLOSED:
return
try:
await self.athrow(GeneratorExit)
except (GeneratorExit, StopAsyncIteration):
pass
self._pypy_issue2786_workaround.discard(self._coroutine)
else:
raise RuntimeError("async_generator ignored GeneratorExit")

def __del__(self):
self._pypy_issue2786_workaround.discard(self._coroutine)
if getcoroutinestate(self._coroutine) is CORO_CREATED:
# Never started, nothing to clean up, just suppress the "coroutine
# never awaited" message.
self._coroutine.close()
if getcoroutinestate(self._coroutine) is CORO_SUSPENDED:
# This exception will get swallowed because this is __del__, but
# it's an easy way to trigger the print-to-console logic
raise RuntimeError(
"partially-exhausted async_generator {!r} garbage collected"
.format(self._coroutine.cr_frame.f_code.co_name)
)
if getcoroutinestate(self._coroutine
) is CORO_SUSPENDED and not self._closed:
if self._finalizer is not None:
self._finalizer(self)
else:
# Mimic the behavior of native generators on GC with no finalizer:
# throw in GeneratorExit, run for one turn, and complain if it didn't
# finish.
thrower = self.athrow(GeneratorExit)
try:
thrower.send(None)
except (GeneratorExit, StopAsyncIteration):
pass
except StopIteration:
raise RuntimeError("async_generator ignored GeneratorExit")
else:
raise RuntimeError(
"async_generator {!r} awaited during finalization; install "
"a finalization hook to support this, or wrap it in "
"'async with aclosing(...):'"
.format(self.ag_code.co_name)
)
finally:
thrower.close()


if hasattr(collections.abc, "AsyncGenerator"):
Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.