Memory object streams deadlock on asyncio #771

Closed
2 tasks done
gschaffner opened this issue Aug 28, 2024 · 1 comment · Fixed by #772
Labels
bug Something isn't working

Comments

@gschaffner
Collaborator

gschaffner commented Aug 28, 2024

Things to check first

  • I have searched the existing issues and didn't find my bug already reported there

  • I have checked that my bug is still present in the latest release

AnyIO version

master (439951d)

Python version

3.12.5 (CPython)

What happened?

Some applications started deadlocking after updating AnyIO to 4.4.0. The regression bisects to #735.

(Aside: Apologies that I did not follow up in the discussions about #728 in May. I got unexpectedly busy at the time :/ )

How can we reproduce the bug?

(This reproducer could be made smaller but I think it would sacrifice readability)

from __future__ import annotations

from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from types import TracebackType
from typing import Final
from typing import TypeVar
from typing import cast

from typing_extensions import Self

import anyio
import anyio.lowlevel
from anyio import CancelScope
from anyio.abc import TaskStatus
from anyio.streams.memory import MemoryObjectSendStream

E = TypeVar("E", bound=BaseException)


class A:
    # `A`'s API is pretty similar to `ObjectSendStream[str]`, except it requires
    # `__aexit__` (it doesn't support `aclose`).

    def __init__(self) -> None:
        super().__init__()
        self._acm: Final = self._acm_impl()

    async def __aenter__(self) -> Self:
        # TODO: Mypy bug?
        return await self._acm.__aenter__()  # type: ignore[return-value]

    async def __aexit__(
        self,
        exc_type: type[E] | None,
        exc_value: E | None,
        traceback: TracebackType | None,
    ) -> bool | None:
        return await self._acm.__aexit__(exc_type, exc_value, traceback)

    @asynccontextmanager
    async def _acm_impl(self) -> AsyncGenerator[Self]:
        async with anyio.create_task_group() as task_group:
            outgoing_service_cancel_scope, self._outgoing_send_stream = cast(
                tuple[CancelScope, MemoryObjectSendStream[str]],
                await task_group.start(self._serve_outgoing),
            )
            with self._outgoing_send_stream:
                try:
                    yield self
                finally:
                    # The `async with` body has exited, so if there is a pending
                    # cancellation request it can be allowed to reach the service task
                    # now.
                    outgoing_service_cancel_scope.shield = False
                    # If the `async with A()` isn't already in a cancelled scope, we also
                    # need to request the service task to shut down.
                    task_group.cancel_scope.cancel()

    async def _serve_outgoing(
        self,
        *,
        task_status: TaskStatus[tuple[CancelScope, MemoryObjectSendStream[str]]],
    ) -> None:
        (
            outgoing_send_stream,
            outgoing_receive_stream,
        ) = anyio.create_memory_object_stream[str](0)
        # The cancel scope here is a service task group pattern
        # (https://github.com/python-trio/trio/issues/1521). This service task must not
        # receive cancellation until the body of the `async with A()` block has finished
        # exiting first, because this service task must remain available in any
        # `finally`/etc. blocks of the `async with A()` body.
        with outgoing_receive_stream, CancelScope(shield=True) as cancel_scope:
            task_status.started((cancel_scope, outgoing_send_stream))
            async for msg in outgoing_receive_stream:
                print(f"outgoing stream driver handling outgoing message: {msg!r}")
                await anyio.lowlevel.checkpoint()

    async def send(self, msg: str, /) -> None:
        # Note: If `_serve_outgoing` crashes due to external causes, this will raise
        # `BrokenResourceError`.
        await self._outgoing_send_stream.send(msg)


async def main() -> None:
    with CancelScope() as cancel_scope:
        async with A() as a:
            try:
                await a.send("message during normal operation!")
                ...
                # Suppose that at some point there's a cancellation request from above
                # (e.g. a signal handler requesting process shutdown):
                cancel_scope.cancel()
            finally:
                # A bit of time is needed here for the scheduling order that triggers
                # the deadlock to be hit. (In the real code, there are more awaits here
                # and the deadlock is hit (empirically) every time.)
                n = 0
                while not a._outgoing_send_stream._state.waiting_receivers:
                    n += 1
                    await anyio.lowlevel.cancel_shielded_checkpoint()
                print(f"{n} scheduling rounds")

                with CancelScope(shield=True):
                    await a.send("message during cleanup!")


# Works on Trio; deadlocks on asyncio.
anyio.run(main, backend="asyncio")

gschaffner added the bug (Something isn't working) label Aug 28, 2024
@gschaffner
Collaborator Author

The underlying bug is that TaskInfo.has_pending_cancellation is returning false positives with shields on asyncio. The false positive causes MemoryObjectSendStream.send to think that the receiver has a pending cancellation (even though the receiver is shielded), so it ignores the receiver:

while self._state.waiting_receivers:
    receive_event, receiver = self._state.waiting_receivers.popitem(last=False)
    if not receiver.task_info.has_pending_cancellation():
        receiver.item = item
        receive_event.set()
        return

4 participants
@gschaffner and others