Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit a474e04

Browse filesBrowse files
itamarojbower-fbwillingc
authored
gh-97696: asyncio eager tasks factory (#102853)
Co-authored-by: Jacob Bower <jbower@meta.com> Co-authored-by: Carol Willing <carolcode@willingconsulting.com>
1 parent 59bc36a commit a474e04
Copy full SHA for a474e04

File tree

Expand file treeCollapse file tree

12 files changed

+945
-47
lines changed
Filter options
Expand file treeCollapse file tree

12 files changed

+945
-47
lines changed

‎Doc/library/asyncio-task.rst

Copy file name to clipboardExpand all lines: Doc/library/asyncio-task.rst
+36Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,42 @@ Running Tasks Concurrently
527527
and there is no running event loop.
528528

529529

530+
Eager Task Factory
531+
==================
532+
533+
.. function:: eager_task_factory(loop, coro, *, name=None, context=None)
534+
535+
A task factory for eager task execution.
536+
537+
When using this factory (via :meth:`loop.set_task_factory(asyncio.eager_task_factory) <loop.set_task_factory>`),
538+
coroutines begin execution synchronously during :class:`Task` construction.
539+
Tasks are only scheduled on the event loop if they block.
540+
This can be a performance improvement as the overhead of loop scheduling
541+
is avoided for coroutines that complete synchronously.
542+
543+
A common example where this is beneficial is coroutines which employ
544+
caching or memoization to avoid actual I/O when possible.
545+
546+
.. note::
547+
548+
Immediate execution of the coroutine is a semantic change.
549+
If the coroutine returns or raises, the task is never scheduled
550+
to the event loop. If the coroutine execution blocks, the task is
551+
scheduled to the event loop. This change may introduce behavior
552+
changes to existing applications. For example,
553+
the application's task execution order is likely to change.
554+
555+
.. versionadded:: 3.12
556+
557+
.. function:: create_eager_task_factory(custom_task_constructor)
558+
559+
Create an eager task factory, similar to :func:`eager_task_factory`,
560+
using the provided *custom_task_constructor* when creating a new task instead
561+
of the default :class:`Task`.
562+
563+
.. versionadded:: 3.12
564+
565+
530566
Shielding From Cancellation
531567
===========================
532568

‎Doc/whatsnew/3.12.rst

Copy file name to clipboardExpand all lines: Doc/whatsnew/3.12.rst
+5Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -613,6 +613,11 @@ Optimizations
613613
* Speed up :class:`asyncio.Task` creation by deferring expensive string formatting.
614614
(Contributed by Itamar O in :gh:`103793`.)
615615

616+
* Added :func:`asyncio.eager_task_factory` and :func:`asyncio.create_eager_task_factory`
617+
functions to allow opting an event loop in to eager task execution,
618+
speeding up some use-cases by up to 50%.
619+
(Contributed by Jacob Bower & Itamar O in :gh:`102853`)
620+
616621

617622
CPython bytecode changes
618623
========================

‎Include/internal/pycore_global_objects_fini_generated.h

Copy file name to clipboardExpand all lines: Include/internal/pycore_global_objects_fini_generated.h
+2Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎Include/internal/pycore_global_strings.h

Copy file name to clipboardExpand all lines: Include/internal/pycore_global_strings.h
+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,7 @@ struct _Py_global_strings {
370370
STRUCT_FOR_ID(dst_dir_fd)
371371
STRUCT_FOR_ID(duration)
372372
STRUCT_FOR_ID(e)
373+
STRUCT_FOR_ID(eager_start)
373374
STRUCT_FOR_ID(effective_ids)
374375
STRUCT_FOR_ID(element_factory)
375376
STRUCT_FOR_ID(encode)
@@ -460,6 +461,7 @@ struct _Py_global_strings {
460461
STRUCT_FOR_ID(instructions)
461462
STRUCT_FOR_ID(intern)
462463
STRUCT_FOR_ID(intersection)
464+
STRUCT_FOR_ID(is_running)
463465
STRUCT_FOR_ID(isatty)
464466
STRUCT_FOR_ID(isinstance)
465467
STRUCT_FOR_ID(isoformat)

‎Include/internal/pycore_runtime_init_generated.h

Copy file name to clipboardExpand all lines: Include/internal/pycore_runtime_init_generated.h
+2Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎Include/internal/pycore_unicodeobject_generated.h

Copy file name to clipboardExpand all lines: Include/internal/pycore_unicodeobject_generated.h
+6Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎Lib/asyncio/base_tasks.py

Copy file name to clipboardExpand all lines: Lib/asyncio/base_tasks.py
+6-4Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@ def _task_repr_info(task):
1515

1616
info.insert(1, 'name=%r' % task.get_name())
1717

18-
coro = coroutines._format_coroutine(task._coro)
19-
info.insert(2, f'coro=<{coro}>')
20-
2118
if task._fut_waiter is not None:
22-
info.insert(3, f'wait_for={task._fut_waiter!r}')
19+
info.insert(2, f'wait_for={task._fut_waiter!r}')
20+
21+
if task._coro:
22+
coro = coroutines._format_coroutine(task._coro)
23+
info.insert(2, f'coro=<{coro}>')
24+
2325
return info
2426

2527

‎Lib/asyncio/tasks.py

Copy file name to clipboardExpand all lines: Lib/asyncio/tasks.py
+100-22Lines changed: 100 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
'wait', 'wait_for', 'as_completed', 'sleep',
77
'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
88
'current_task', 'all_tasks',
9+
'create_eager_task_factory', 'eager_task_factory',
910
'_register_task', '_unregister_task', '_enter_task', '_leave_task',
1011
)
1112

@@ -43,22 +44,26 @@ def all_tasks(loop=None):
4344
"""Return a set of all tasks for the loop."""
4445
if loop is None:
4546
loop = events.get_running_loop()
46-
# Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
47-
# thread while we do so. Therefore we cast it to list prior to filtering. The list
48-
# cast itself requires iteration, so we repeat it several times ignoring
49-
# RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
50-
# details.
47+
# capturing the set of eager tasks first, so if an eager task "graduates"
48+
# to a regular task in another thread, we don't risk missing it.
49+
eager_tasks = list(_eager_tasks)
50+
# Looping over the WeakSet isn't safe as it can be updated from another
51+
# thread, therefore we cast it to list prior to filtering. The list cast
52+
# itself requires iteration, so we repeat it several times ignoring
53+
# RuntimeErrors (which are not very likely to occur).
54+
# See issues 34970 and 36607 for details.
55+
scheduled_tasks = None
5156
i = 0
5257
while True:
5358
try:
54-
tasks = list(_all_tasks)
59+
scheduled_tasks = list(_scheduled_tasks)
5560
except RuntimeError:
5661
i += 1
5762
if i >= 1000:
5863
raise
5964
else:
6065
break
61-
return {t for t in tasks
66+
return {t for t in itertools.chain(scheduled_tasks, eager_tasks)
6267
if futures._get_loop(t) is loop and not t.done()}
6368

6469

@@ -93,7 +98,8 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
9398
# status is still pending
9499
_log_destroy_pending = True
95100

96-
def __init__(self, coro, *, loop=None, name=None, context=None):
101+
def __init__(self, coro, *, loop=None, name=None, context=None,
102+
eager_start=False):
97103
super().__init__(loop=loop)
98104
if self._source_traceback:
99105
del self._source_traceback[-1]
@@ -117,8 +123,11 @@ def __init__(self, coro, *, loop=None, name=None, context=None):
117123
else:
118124
self._context = context
119125

120-
self._loop.call_soon(self.__step, context=self._context)
121-
_register_task(self)
126+
if eager_start and self._loop.is_running():
127+
self.__eager_start()
128+
else:
129+
self._loop.call_soon(self.__step, context=self._context)
130+
_register_task(self)
122131

123132
def __del__(self):
124133
if self._state == futures._PENDING and self._log_destroy_pending:
@@ -250,6 +259,25 @@ def uncancel(self):
250259
self._num_cancels_requested -= 1
251260
return self._num_cancels_requested
252261

262+
def __eager_start(self):
263+
prev_task = _swap_current_task(self._loop, self)
264+
try:
265+
_register_eager_task(self)
266+
try:
267+
self._context.run(self.__step_run_and_handle_result, None)
268+
finally:
269+
_unregister_eager_task(self)
270+
finally:
271+
try:
272+
curtask = _swap_current_task(self._loop, prev_task)
273+
assert curtask is self
274+
finally:
275+
if self.done():
276+
self._coro = None
277+
self = None # Needed to break cycles when an exception occurs.
278+
else:
279+
_register_task(self)
280+
253281
def __step(self, exc=None):
254282
if self.done():
255283
raise exceptions.InvalidStateError(
@@ -258,11 +286,17 @@ def __step(self, exc=None):
258286
if not isinstance(exc, exceptions.CancelledError):
259287
exc = self._make_cancelled_error()
260288
self._must_cancel = False
261-
coro = self._coro
262289
self._fut_waiter = None
263290

264291
_enter_task(self._loop, self)
265-
# Call either coro.throw(exc) or coro.send(None).
292+
try:
293+
self.__step_run_and_handle_result(exc)
294+
finally:
295+
_leave_task(self._loop, self)
296+
self = None # Needed to break cycles when an exception occurs.
297+
298+
def __step_run_and_handle_result(self, exc):
299+
coro = self._coro
266300
try:
267301
if exc is None:
268302
# We use the `send` method directly, because coroutines
@@ -334,7 +368,6 @@ def __step(self, exc=None):
334368
self._loop.call_soon(
335369
self.__step, new_exc, context=self._context)
336370
finally:
337-
_leave_task(self._loop, self)
338371
self = None # Needed to break cycles when an exception occurs.
339372

340373
def __wakeup(self, future):
@@ -897,17 +930,41 @@ def callback():
897930
return future
898931

899932

900-
# WeakSet containing all alive tasks.
901-
_all_tasks = weakref.WeakSet()
933+
def create_eager_task_factory(custom_task_constructor):
934+
935+
if "eager_start" not in inspect.signature(custom_task_constructor).parameters:
936+
raise TypeError(
937+
"Provided constructor does not support eager task execution")
938+
939+
def factory(loop, coro, *, name=None, context=None):
940+
return custom_task_constructor(
941+
coro, loop=loop, name=name, context=context, eager_start=True)
942+
943+
944+
return factory
945+
946+
eager_task_factory = create_eager_task_factory(Task)
947+
948+
949+
# Collectively these two sets hold references to the complete set of active
950+
# tasks. Eagerly executed tasks use a faster regular set as an optimization
951+
# but may graduate to a WeakSet if the task blocks on IO.
952+
_scheduled_tasks = weakref.WeakSet()
953+
_eager_tasks = set()
902954

903955
# Dictionary containing tasks that are currently active in
904956
# all running event loops. {EventLoop: Task}
905957
_current_tasks = {}
906958

907959

908960
def _register_task(task):
909-
"""Register a new task in asyncio as executed by loop."""
910-
_all_tasks.add(task)
961+
"""Register an asyncio Task scheduled to run on an event loop."""
962+
_scheduled_tasks.add(task)
963+
964+
965+
def _register_eager_task(task):
966+
"""Register an asyncio Task about to be eagerly executed."""
967+
_eager_tasks.add(task)
911968

912969

913970
def _enter_task(loop, task):
@@ -926,28 +983,49 @@ def _leave_task(loop, task):
926983
del _current_tasks[loop]
927984

928985

986+
def _swap_current_task(loop, task):
987+
prev_task = _current_tasks.get(loop)
988+
if task is None:
989+
del _current_tasks[loop]
990+
else:
991+
_current_tasks[loop] = task
992+
return prev_task
993+
994+
929995
def _unregister_task(task):
930-
"""Unregister a task."""
931-
_all_tasks.discard(task)
996+
"""Unregister a completed, scheduled Task."""
997+
_scheduled_tasks.discard(task)
998+
999+
1000+
def _unregister_eager_task(task):
1001+
"""Unregister a task which finished its first eager step."""
1002+
_eager_tasks.discard(task)
9321003

9331004

9341005
_py_current_task = current_task
9351006
_py_register_task = _register_task
1007+
_py_register_eager_task = _register_eager_task
9361008
_py_unregister_task = _unregister_task
1009+
_py_unregister_eager_task = _unregister_eager_task
9371010
_py_enter_task = _enter_task
9381011
_py_leave_task = _leave_task
1012+
_py_swap_current_task = _swap_current_task
9391013

9401014

9411015
try:
942-
from _asyncio import (_register_task, _unregister_task,
943-
_enter_task, _leave_task,
944-
_all_tasks, _current_tasks,
1016+
from _asyncio import (_register_task, _register_eager_task,
1017+
_unregister_task, _unregister_eager_task,
1018+
_enter_task, _leave_task, _swap_current_task,
1019+
_scheduled_tasks, _eager_tasks, _current_tasks,
9451020
current_task)
9461021
except ImportError:
9471022
pass
9481023
else:
9491024
_c_current_task = current_task
9501025
_c_register_task = _register_task
1026+
_c_register_eager_task = _register_eager_task
9511027
_c_unregister_task = _unregister_task
1028+
_c_unregister_eager_task = _unregister_eager_task
9521029
_c_enter_task = _enter_task
9531030
_c_leave_task = _leave_task
1031+
_c_swap_current_task = _swap_current_task

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.