Always raise a received result-as-error in spawn tasks #288

Open
wants to merge 3 commits into master
Changes from all commits
31 changes: 17 additions & 14 deletions tests/test_cancellation.py
@@ -326,16 +326,19 @@ async def spawn_and_error(breadth, depth) -> None:
)
kwargs = {
'name': f'{name}_errorer_{i}',
# 'delay': 1,
}
await nursery.run_in_actor(*args, **kwargs)


@tractor_test
async def test_nested_multierrors(loglevel, start_method):
"""Test that failed actor sets are wrapped in `trio.MultiError`s.
'''
Test that failed actor sets are wrapped in `trio.MultiError`s.
This test goes only 2 nurseries deep but we should eventually have tests
for arbitrary n-depth actor trees.
"""

'''
if start_method == 'trio':
depth = 3
subactor_breadth = 2
@@ -359,6 +362,7 @@ async def test_nested_multierrors(loglevel, start_method):
breadth=subactor_breadth,
depth=depth,
)

except trio.MultiError as err:
assert len(err.exceptions) == subactor_breadth
for subexc in err.exceptions:
@@ -394,16 +398,13 @@ async def test_nested_multierrors(loglevel, start_method):
assert isinstance(subexc, tractor.RemoteActorError)

if depth > 0 and subactor_breadth > 1:
# XXX not sure what's up with this..
# on windows sometimes spawning is just too slow and
# we get back the (sent) cancel signal instead
if platform.system() == 'Windows':
if isinstance(subexc, tractor.RemoteActorError):
assert subexc.type in (trio.MultiError, tractor.RemoteActorError)
else:
assert isinstance(subexc, trio.MultiError)
# XXX it's a race whether or not a parent containing
# a nursery *may* get multiple child failures before
# it cancels and tears down.
if isinstance(subexc, tractor.RemoteActorError):
assert subexc.type in (trio.MultiError, tractor.RemoteActorError)
else:
assert subexc.type is trio.MultiError
assert isinstance(subexc, trio.MultiError)
else:
assert subexc.type in (tractor.RemoteActorError, trio.Cancelled)

@@ -486,9 +487,11 @@ def test_cancel_while_childs_child_in_sync_sleep(
start_method,
spawn_backend,
):
"""Verify that a child cancelled while executing sync code is torn
down even when that cancellation is triggered by the parent
"""
Verify that a child cancelled while executing sync code is torn down
even when that cancellation is triggered by the parent
2 nurseries "up".

"""
if start_method == 'forkserver':
pytest.skip("Forksever sux hard at resuming from sync sleep...")
@@ -500,7 +503,7 @@ async def main():
spawn,
name='spawn',
)
await trio.sleep(1)
await trio.sleep(0.5)
assert 0

with pytest.raises(AssertionError):
98 changes: 75 additions & 23 deletions tractor/_spawn.py
@@ -57,7 +57,7 @@
from ._portal import Portal
from ._actor import Actor
from ._entry import _mp_main
from ._exceptions import ActorFailure
from ._exceptions import ActorFailure, NoResult


log = get_logger('tractor')
@@ -136,47 +136,74 @@ async def exhaust_portal(
# always be established and shutdown using a context manager api
final = await portal.result()

except (Exception, trio.MultiError) as err:
except (
Exception,
trio.MultiError
) as err:
# we reraise in the parent task via a ``trio.MultiError``
return err

except trio.Cancelled as err:
# lol, of course we need this too ;P
# TODO: merge with above?
log.warning(f"Cancelled result waiter for {portal.actor.uid}")
return err

else:
log.debug(f"Returning final result: {final}")
return final


async def pack_and_report_errors(
portal: Portal,
subactor: Actor,
errors: Dict[Tuple[str, str], Exception],

) -> Any:

# if this call errors, we store the exception for later
# in ``errors`` which will be reraised inside
# a MultiError, and we still send out a cancel request
result = await exhaust_portal(portal, subactor)

uid = portal.channel.uid
if (
isinstance(result, Exception)
# or isinstance(result, trio.MultiError)
):
errors[subactor.uid] = result
log.warning(f"{uid} received remote error:\n{result}")
raise result

elif isinstance(result, trio.Cancelled):
errors[subactor.uid] = result
log.runtime(f"{uid} was cancelled before result")

else:
log.runtime( f"{uid} received final result:\n{result}")

return result


async def cancel_on_completion(

portal: Portal,
actor: Actor,
subactor: Actor,
errors: Dict[Tuple[str, str], Exception],

) -> None:
'''
Cancel actor gracefully once it's "main" portal's
Cancel subactor gracefully once its "main" portal's
result arrives.

Should only be called for actors spawned with `run_in_actor()`.

'''
# if this call errors we store the exception for later
# in ``errors`` which will be reraised inside
# a MultiError and we still send out a cancel request
result = await exhaust_portal(portal, actor)
if isinstance(result, Exception):
errors[actor.uid] = result
log.warning(
f"Cancelling {portal.channel.uid} after error {result}"
)

else:
log.runtime(
f"Cancelling {portal.channel.uid} gracefully "
f"after result {result}")
await pack_and_report_errors(
portal,
subactor,
errors,
)

# cancel the process now that we have a final result
await portal.cancel_actor()
@@ -344,8 +371,9 @@ async def new_proc(
with trio.CancelScope(shield=True):
await actor_nursery._join_procs.wait()

cancel_on_complete = portal in actor_nursery._cancel_after_result_on_exit
async with trio.open_nursery() as nursery:
if portal in actor_nursery._cancel_after_result_on_exit:
if cancel_on_complete:
nursery.start_soon(
cancel_on_completion,
portal,
@@ -369,6 +397,11 @@ async def new_proc(
f"{subactor.uid}")
nursery.cancel_scope.cancel()

# if errors:
Review comment from the PR author:

This was the alternative approach mentioned in #287: cancelling the ria_nursery instead of raising any final error retrieved from the portal.

I think this approach is a bit harder to extend wrt supervisor strategies being overloaded by a user.
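
For context, a minimal sketch of what that alternative could look like, reusing names defined in `tractor/_spawn.py` above (`exhaust_portal`, `Portal`, `Actor`, `log`) and assuming the actor nursery exposes the `_ria_nursery` attribute referenced below; the function name is made up for illustration:

```python
from typing import Any, Dict, Tuple


async def reap_and_cancel_ria_nursery(
    portal: Portal,
    subactor: Actor,
    errors: Dict[Tuple[str, str], Exception],
    actor_nursery: Any,  # assumed to expose a ``_ria_nursery`` trio nursery

) -> Any:
    # wait for the child's final result (or captured error) via its portal
    result = await exhaust_portal(portal, subactor)

    if isinstance(result, Exception):
        # stash the error for the surrounding ``trio.MultiError`` instead
        # of raising it in this waiter task ...
        errors[subactor.uid] = result
        log.warning(f'Remote errors retrieved from child: {subactor.uid}')

        # ... and cancel the nursery running the ``run_in_actor()``
        # waiter tasks (presumably what ``_ria_nursery`` refers to) so the
        # parent tears down and re-raises the collected errors.
        actor_nursery._ria_nursery.cancel_scope.cancel()

    return result
```

Compared to `pack_and_report_errors` above, nothing is raised in the waiter task itself here; teardown is driven purely by cancellation, which seems to be why hooking or overloading a supervisor strategy around the error site gets harder.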

# log.warning(
# f'Remote errors retrieved from child: {subactor.uid}')
# actor_nursery._ria_nursery.cancel_scope.cancel()

finally:
# The "hard" reap since no actor zombies are allowed!
# XXX: do this **after** cancellation/teardown to avoid
@@ -398,11 +431,30 @@ async def new_proc(
else:
log.warning('Nursery cancelled before sub-proc started')

uid = subactor.uid
if not cancelled_during_spawn:
# pop child entry to indicate we are no longer managing this
# subactor
actor_nursery._children.pop(subactor.uid)

subactor, _, portal = actor_nursery._children.pop(uid)

# check for a late delivery of an error from
# the target remote task and overwrite any cancel
# that was captured as part of teardown.
if cancel_on_complete:
error = errors.get(uid)
if type(error) is trio.Cancelled:
# actor was cancelled before its final result was
# retrieved, so check now for any result and pack it as
# an error to be raised in the surrounding
# nursery's multierror handling.
errors.pop(uid)
with trio.move_on_after(0.001) as cs:
cs.shield = True
err = await pack_and_report_errors(
portal,
subactor,
errors,
)
if type(err) is trio.Cancelled:
errors.pop(uid)
else:
# `multiprocessing`
# async with trio.open_nursery() as nursery: