Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

bpo-39529: Deprecate creating new event loop in asyncio.get_event_loop() #23554

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion 11 Lib/asyncio/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -759,9 +759,16 @@ def get_event_loop():
the result of `get_event_loop_policy().get_event_loop()` call.
"""
# NOTE: this function is implemented in C (see _asynciomodule.c)
return _py__get_event_loop()


def _get_event_loop(stacklevel=3):
current_loop = _get_running_loop()
if current_loop is not None:
return current_loop
import warnings
warnings.warn("There is no running event loop set",
DeprecationWarning, stacklevel=stacklevel)
return get_event_loop_policy().get_event_loop()


Expand Down Expand Up @@ -791,14 +798,15 @@ def set_child_watcher(watcher):
_py__set_running_loop = _set_running_loop
_py_get_running_loop = get_running_loop
_py_get_event_loop = get_event_loop
_py__get_event_loop = _get_event_loop


try:
# get_event_loop() is one of the most frequently called
# functions in asyncio. Pure Python implementation is
# about 4 times slower than C-accelerated.
from _asyncio import (_get_running_loop, _set_running_loop,
get_running_loop, get_event_loop)
get_running_loop, get_event_loop, _get_event_loop)
except ImportError:
pass
else:
Expand All @@ -807,3 +815,4 @@ def set_child_watcher(watcher):
_c__set_running_loop = _set_running_loop
_c_get_running_loop = get_running_loop
_c_get_event_loop = get_event_loop
_c__get_event_loop = _get_event_loop
6 changes: 3 additions & 3 deletions 6 Lib/asyncio/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def __init__(self, *, loop=None):
the default event loop.
"""
if loop is None:
self._loop = events.get_event_loop()
self._loop = events._get_event_loop()
else:
self._loop = loop
self._callbacks = []
Expand Down Expand Up @@ -371,7 +371,7 @@ def _chain_future(source, destination):
raise TypeError('A future is required for source argument')
if not isfuture(destination) and not isinstance(destination,
concurrent.futures.Future):
raise TypeError('A future is required for destination argument')
raise TypeError('A future is required for destination argument %r' % destination)
source_loop = _get_loop(source) if isfuture(source) else None
dest_loop = _get_loop(destination) if isfuture(destination) else None

Expand Down Expand Up @@ -408,7 +408,7 @@ def wrap_future(future, *, loop=None):
assert isinstance(future, concurrent.futures.Future), \
f'concurrent.futures.Future is expected, got {future!r}'
if loop is None:
loop = events.get_event_loop()
loop = events._get_event_loop()
new_future = loop.create_future()
_chain_future(future, new_future)
return new_future
Expand Down
12 changes: 6 additions & 6 deletions 12 Lib/asyncio/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async def open_connection(host=None, port=None, *,
StreamReaderProtocol classes, just copy the code -- there's
really nothing special here except some convenience.)
"""
loop = events.get_running_loop()
loop = events._get_event_loop()
reader = StreamReader(limit=limit, loop=loop)
protocol = StreamReaderProtocol(reader, loop=loop)
transport, _ = await loop.create_connection(
Expand Down Expand Up @@ -73,7 +73,7 @@ async def start_server(client_connected_cb, host=None, port=None, *,
The return value is the same as loop.create_server(), i.e. a
Server object which can be used to stop the service.
"""
loop = events.get_running_loop()
loop = events._get_event_loop()

def factory():
reader = StreamReader(limit=limit, loop=loop)
Expand All @@ -90,7 +90,7 @@ def factory():
async def open_unix_connection(path=None, *,
limit=_DEFAULT_LIMIT, **kwds):
"""Similar to `open_connection` but works with UNIX Domain Sockets."""
loop = events.get_running_loop()
loop = events._get_event_loop()

reader = StreamReader(limit=limit, loop=loop)
protocol = StreamReaderProtocol(reader, loop=loop)
Expand All @@ -102,7 +102,7 @@ async def open_unix_connection(path=None, *,
async def start_unix_server(client_connected_cb, path=None, *,
limit=_DEFAULT_LIMIT, **kwds):
"""Similar to `start_server` but works with UNIX Domain Sockets."""
loop = events.get_running_loop()
loop = events._get_event_loop()

def factory():
reader = StreamReader(limit=limit, loop=loop)
Expand All @@ -125,7 +125,7 @@ class FlowControlMixin(protocols.Protocol):

def __init__(self, loop=None):
if loop is None:
self._loop = events.get_event_loop()
self._loop = events._get_event_loop(stacklevel=4)
else:
self._loop = loop
self._paused = False
Expand Down Expand Up @@ -381,7 +381,7 @@ def __init__(self, limit=_DEFAULT_LIMIT, loop=None):

self._limit = limit
if loop is None:
self._loop = events.get_event_loop()
self._loop = events._get_event_loop()
else:
self._loop = loop
self._buffer = bytearray()
Expand Down
4 changes: 2 additions & 2 deletions 4 Lib/asyncio/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ async def communicate(self, input=None):

async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
limit=streams._DEFAULT_LIMIT, **kwds):
loop = events.get_running_loop()
loop = events._get_event_loop()
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
loop=loop)
transport, protocol = await loop.subprocess_shell(
Expand All @@ -212,7 +212,7 @@ async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
stderr=None, limit=streams._DEFAULT_LIMIT,
**kwds):
loop = events.get_running_loop()
loop = events._get_event_loop()
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
loop=loop)
transport, protocol = await loop.subprocess_exec(
Expand Down
49 changes: 27 additions & 22 deletions 49 Lib/asyncio/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def _all_tasks_compat(loop=None):
# the completed ones. Used to implement deprecated "Tasks.all_task()"
# method.
if loop is None:
loop = events.get_event_loop()
loop = events._get_event_loop()
# Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
# thread while we do so. Therefore we cast it to list prior to filtering. The list
# cast itself requires iteration, so we repeat it several times ignoring
Expand Down Expand Up @@ -570,7 +570,7 @@ def as_completed(fs, *, timeout=None):
from .queues import Queue # Import here to avoid circular import problem.
done = Queue()

loop = events.get_event_loop()
loop = events._get_event_loop()
todo = {ensure_future(f, loop=loop) for f in set(fs)}
timeout_handle = None

Expand Down Expand Up @@ -637,23 +637,28 @@ def ensure_future(coro_or_future, *, loop=None):

If the argument is a Future, it is returned directly.
"""
if coroutines.iscoroutine(coro_or_future):
if loop is None:
loop = events.get_event_loop()
task = loop.create_task(coro_or_future)
if task._source_traceback:
del task._source_traceback[-1]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was removed because this code did not work as intented long time ago (if worked at all).

return task
elif futures.isfuture(coro_or_future):
if loop is not None and loop is not futures._get_loop(coro_or_future):
raise ValueError('The future belongs to a different loop than '
'the one specified as the loop argument')
return coro_or_future
elif inspect.isawaitable(coro_or_future):
return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
else:
raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
'required')
return _ensure_future(coro_or_future, loop=loop)


def _ensure_future(coro_or_future, *, loop=None):
while True:
serhiy-storchaka marked this conversation as resolved.
Show resolved Hide resolved
if coroutines.iscoroutine(coro_or_future):
if loop is None:
loop = events._get_event_loop(stacklevel=4)
task = loop.create_task(coro_or_future)
if task._source_traceback:
del task._source_traceback[-1]
return task
elif futures.isfuture(coro_or_future):
if loop is not None and loop is not futures._get_loop(coro_or_future):
raise ValueError('The future belongs to a different loop than '
'the one specified as the loop argument')
return coro_or_future
elif inspect.isawaitable(coro_or_future):
coro_or_future = _wrap_awaitable(coro_or_future)
else:
raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
'required')


@types.coroutine
Expand Down Expand Up @@ -727,7 +732,7 @@ def gather(*coros_or_futures, return_exceptions=False):
gather won't cancel any other awaitables.
"""
if not coros_or_futures:
loop = events.get_event_loop()
loop = events._get_event_loop()
outer = loop.create_future()
outer.set_result([])
return outer
Expand Down Expand Up @@ -794,7 +799,7 @@ def _done_callback(fut):
loop = None
for arg in coros_or_futures:
if arg not in arg_to_fut:
fut = ensure_future(arg, loop=loop)
fut = _ensure_future(arg, loop=loop)
if loop is None:
loop = futures._get_loop(fut)
if fut is not arg:
Expand Down Expand Up @@ -844,7 +849,7 @@ def shield(arg):
except CancelledError:
res = None
"""
inner = ensure_future(arg)
inner = _ensure_future(arg)
if inner.done():
# Shortcut.
return inner
Expand Down
16 changes: 12 additions & 4 deletions 16 Lib/test/test_asyncio/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2699,10 +2699,14 @@ def get_event_loop(self):
loop = asyncio.new_event_loop()

with self.assertRaises(TestError):
asyncio.get_event_loop()
with self.assertWarns(DeprecationWarning) as cm:
asyncio.get_event_loop()
self.assertEqual(cm.warnings[0].filename, __file__)
asyncio.set_event_loop(None)
with self.assertRaises(TestError):
asyncio.get_event_loop()
with self.assertWarns(DeprecationWarning) as cm:
asyncio.get_event_loop()
self.assertEqual(cm.warnings[0].filename, __file__)

with self.assertRaisesRegex(RuntimeError, 'no running'):
self.assertIs(asyncio.get_running_loop(), None)
Expand All @@ -2717,11 +2721,15 @@ async def func():

asyncio.set_event_loop(loop)
with self.assertRaises(TestError):
asyncio.get_event_loop()
with self.assertWarns(DeprecationWarning) as cm:
asyncio.get_event_loop()
self.assertEqual(cm.warnings[0].filename, __file__)

asyncio.set_event_loop(None)
with self.assertRaises(TestError):
asyncio.get_event_loop()
with self.assertWarns(DeprecationWarning) as cm:
asyncio.get_event_loop()
self.assertEqual(cm.warnings[0].filename, __file__)

finally:
asyncio.set_event_loop_policy(old_policy)
Expand Down
9 changes: 8 additions & 1 deletion 9 Lib/test/test_asyncio/test_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,13 @@ def test_ensure_future(self):
class BaseFutureTests:

def _new_future(self, *args, **kwargs):
return self.cls(*args, **kwargs)
if kwargs.get('loop') is None:
with self.assertWarns(DeprecationWarning) as cm:
fut = self.cls(*args, **kwargs)
self.assertEqual(cm.warnings[0].filename, __file__)
return fut
else:
return self.cls(*args, **kwargs)

def setUp(self):
super().setUp()
Expand Down Expand Up @@ -475,6 +481,7 @@ def test_wrap_future_future(self):
def test_wrap_future_use_global_loop(self):
with mock.patch('asyncio.futures.events') as events:
events.get_event_loop = lambda: self.loop
events._get_event_loop = lambda: self.loop
def run(arg):
return (arg, threading.get_ident())
ex = concurrent.futures.ThreadPoolExecutor(1)
Expand Down
23 changes: 13 additions & 10 deletions 23 Lib/test/test_asyncio/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,12 @@ async def create_queue():
queue._get_loop()
return queue

q = self.loop.run_until_complete(create_queue())
async def test():
q = await create_queue()
await asyncio.gather(producer(q, producer_num_items),
consumer(q, producer_num_items))

self.loop.run_until_complete(
asyncio.gather(producer(q, producer_num_items),
consumer(q, producer_num_items)),
)
self.loop.run_until_complete(test())

def test_cancelled_getters_not_being_held_in_self_getters(self):
def a_generator():
Expand Down Expand Up @@ -516,11 +516,14 @@ async def getter():
for _ in range(num):
item = queue.get_nowait()

t0 = putter(0)
t1 = putter(1)
t2 = putter(2)
t3 = putter(3)
self.loop.run_until_complete(asyncio.gather(getter(), t0, t1, t2, t3))
async def test():
t0 = putter(0)
t1 = putter(1)
t2 = putter(2)
t3 = putter(3)
await asyncio.gather(getter(), t0, t1, t2, t3)

self.loop.run_until_complete(test())

def test_cancelled_puts_not_being_held_in_self_putters(self):
def a_generator():
Expand Down
10 changes: 7 additions & 3 deletions 10 Lib/test/test_asyncio/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def tearDown(self):
@mock.patch('asyncio.streams.events')
def test_ctor_global_loop(self, m_events):
stream = asyncio.StreamReader()
self.assertIs(stream._loop, m_events.get_event_loop.return_value)
self.assertIs(stream._loop, m_events._get_event_loop.return_value)

def _basetest_open_connection(self, open_connection_fut):
messages = []
Expand Down Expand Up @@ -755,7 +755,9 @@ def test_streamreader_constructor(self):

# asyncio issue #184: Ensure that StreamReaderProtocol constructor
# retrieves the current loop if the loop parameter is not set
reader = asyncio.StreamReader()
with self.assertWarns(DeprecationWarning) as cm:
reader = asyncio.StreamReader()
self.assertEqual(cm.warnings[0].filename, __file__)
self.assertIs(reader._loop, self.loop)

def test_streamreaderprotocol_constructor(self):
Expand All @@ -765,7 +767,9 @@ def test_streamreaderprotocol_constructor(self):
# asyncio issue #184: Ensure that StreamReaderProtocol constructor
# retrieves the current loop if the loop parameter is not set
reader = mock.Mock()
protocol = asyncio.StreamReaderProtocol(reader)
with self.assertWarns(DeprecationWarning) as cm:
protocol = asyncio.StreamReaderProtocol(reader)
self.assertEqual(cm.warnings[0].filename, __file__)
self.assertIs(protocol._loop, self.loop)

def test_drain_raises(self):
Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.