Skip to content

Commit

Permalink
move finally into utility function
Browse files Browse the repository at this point in the history
  • Loading branch information
graingert committed Oct 5, 2024
1 parent 5c5c057 commit c9e0b73
Showing 1 changed file with 92 additions and 78 deletions.
170 changes: 92 additions & 78 deletions Lib/asyncio/taskgroups.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,102 +66,116 @@ async def __aenter__(self):
return self

async def __aexit__(self, et, exc, tb):
tb = None
try:
self._exiting = True
return await self._aexit(et, exc)
finally:
# Exceptions are heavy objects that can have object
# cycles (bad for GC); let's not keep a reference to
# a bunch of them. It would be nicer to use a try/finally
# in __aexit__ directly but that introduced some diff noise
self._parent_task = None
self._errors = None
self._base_error = None
exc = None

if (exc is not None and
self._is_base_error(exc) and
self._base_error is None):
self._base_error = exc
async def _aexit(self, et, exc):
self._exiting = True

if et is not None and issubclass(et, exceptions.CancelledError):
propagate_cancellation_error = exc
else:
propagate_cancellation_error = None
if (exc is not None and
self._is_base_error(exc) and
self._base_error is None):
self._base_error = exc

if et is not None:
if et is not None and issubclass(et, exceptions.CancelledError):
propagate_cancellation_error = exc
else:
propagate_cancellation_error = None

if et is not None:
if not self._aborting:
# Our parent task is being cancelled:
#
# async with TaskGroup() as g:
# g.create_task(...)
# await ... # <- CancelledError
#
# or there's an exception in "async with":
#
# async with TaskGroup() as g:
# g.create_task(...)
# 1 / 0
#
self._abort()

# We use while-loop here because "self._on_completed_fut"
# can be cancelled multiple times if our parent task
# is being cancelled repeatedly (or even once, when
# our own cancellation is already in progress)
while self._tasks:
if self._on_completed_fut is None:
self._on_completed_fut = self._loop.create_future()

try:
await self._on_completed_fut
except exceptions.CancelledError as ex:
if not self._aborting:
# Our parent task is being cancelled:
#
# async with TaskGroup() as g:
# g.create_task(...)
# await ... # <- CancelledError
#
# or there's an exception in "async with":
#
# async with TaskGroup() as g:
# g.create_task(...)
# 1 / 0
# async def wrapper():
# async with TaskGroup() as g:
# g.create_task(foo)
#
# "wrapper" is being cancelled while "foo" is
# still running.
propagate_cancellation_error = ex
self._abort()

# We use while-loop here because "self._on_completed_fut"
# can be cancelled multiple times if our parent task
# is being cancelled repeatedly (or even once, when
# our own cancellation is already in progress)
while self._tasks:
if self._on_completed_fut is None:
self._on_completed_fut = self._loop.create_future()
self._on_completed_fut = None

try:
await self._on_completed_fut
except exceptions.CancelledError as ex:
if not self._aborting:
# Our parent task is being cancelled:
#
# async def wrapper():
# async with TaskGroup() as g:
# g.create_task(foo)
#
# "wrapper" is being cancelled while "foo" is
# still running.
propagate_cancellation_error = ex
self._abort()

self._on_completed_fut = None

assert not self._tasks

if self._base_error is not None:
raise self._base_error
assert not self._tasks

if self._parent_cancel_requested:
# If this flag is set we *must* call uncancel().
if self._parent_task.uncancel() == 0:
# If there are no pending cancellations left,
# don't propagate CancelledError.
propagate_cancellation_error = None
if self._base_error is not None:
try:
raise self._base_error
finally:
exc = None

if self._parent_cancel_requested:
# If this flag is set we *must* call uncancel().
if self._parent_task.uncancel() == 0:
# If there are no pending cancellations left,
# don't propagate CancelledError.
propagate_cancellation_error = None

# Propagate CancelledError if there is one, except if there
# are other errors -- those have priority.
# Propagate CancelledError if there is one, except if there
# are other errors -- those have priority.
try:
if propagate_cancellation_error is not None and not self._errors:
raise propagate_cancellation_error

if et is not None and not issubclass(et, exceptions.CancelledError):
self._errors.append(exc)

if self._errors:
# If the parent task is being cancelled from the outside
# of the taskgroup, un-cancel and re-cancel the parent task,
# which will keep the cancel count stable.
if self._parent_task.cancelling():
self._parent_task.uncancel()
self._parent_task.cancel()
try:
raise propagate_cancellation_error
finally:
exc = None
finally:
propagate_cancellation_error = None

if et is not None and not issubclass(et, exceptions.CancelledError):
self._errors.append(exc)

if self._errors:
# If the parent task is being cancelled from the outside
# of the taskgroup, un-cancel and re-cancel the parent task,
# which will keep the cancel count stable.
if self._parent_task.cancelling():
self._parent_task.uncancel()
self._parent_task.cancel()
try:
raise BaseExceptionGroup(
'unhandled errors in a TaskGroup',
self._errors,
) from None
finally:
# Exceptions are heavy objects that can have object
# cycles (bad for GC); let's not keep a reference to
# a bunch of them.
propagate_cancellation_error = None
self._parent_task = None
self._errors = None
self._base_error = None
et = None
exc = None
tb = None
finally:
exc = None


def create_task(self, coro, *, name=None, context=None):
Expand Down

0 comments on commit c9e0b73

Please sign in to comment.