Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use greenlets rather than threads for sync loop #137

Merged
merged 6 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ towncrier
trio >= 0.15.0
outcome
attrs
greenlet
5 changes: 3 additions & 2 deletions docs/source/principles.rst
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,11 @@ this gap by providing two event loop implementations.
installed a custom event loop policy, calling :func:`asyncio.new_event_loop`
(including the implicit call made by the first :func:`asyncio.get_event_loop`
in the main thread) will give you an event loop that transparently runs
in a separate thread in order to support multiple
in a separate greenlet in order to support multiple
calls to :meth:`~asyncio.loop.run_until_complete`,
:meth:`~asyncio.loop.run_forever`, and :meth:`~asyncio.loop.stop`.
Sync loops are intended to allow trio-asyncio to run the existing
test suites of large asyncio libraries, which often call
:meth:`~asyncio.loop.run_until_complete` on the same loop multiple times.
Using them for other purposes is deprecated.
Using them for other purposes is not recommended (it is better to refactor
so you can use an async loop) but will probably work.
37 changes: 13 additions & 24 deletions docs/source/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ Asyncio main loop
+++++++++++++++++

Sometimes you instead start with asyncio code which you wish to extend
with some Trio portions. By far the best-supported approach here is to
with some Trio portions. The best-supported approach here is to
wrap your entire asyncio program in a Trio event loop. In other words,
you should transform this code::

Expand All @@ -123,27 +123,18 @@ to this::
trio_asyncio.run(trio_asyncio.aio_as_trio(async_main))

If your program makes multiple calls to ``run_until_complete()`` and/or
``run_forever()``, this may be a somewhat challenging transformation.
In theory, you can instead keep the old approach (``get_event_loop()`` +
``run_forever()``, or if the call to :func:`asyncio.run` is hidden inside
a library you're using, then this may be a somewhat challenging transformation.
In such cases, you can instead keep the old approach (``get_event_loop()`` +
``run_until_complete()``) unchanged, and if you've imported ``trio_asyncio``
(and not changed the asyncio event loop policy) you'll still be able to use
:func:`~trio_asyncio.trio_as_aio` to run Trio code from within your
asyncio-flavored functions. In practice, this is not recommended, because:

* It's implemented by running the contents of the loop in an
additional thread, so anything that expects to run on the main
thread (such as a signal handler) won't be happy.

* The implementation is kind of a terrible hack.

For these reasons, obtaining a new Trio-enabled asyncio event loop
using the standard asyncio functions (:func:`asyncio.get_event_loop`,
etc), rather than :func:`trio_asyncio.open_loop`, will raise a
deprecation warning. (Except when running under pytest, because
support for ``run_until_complete()`` is often needed to test asyncio
libraries' test suites against trio-asyncio.) asyncio is transitioning
towards the model of using a single top-level :func:`asyncio.run` call
anyway, so the effort you spend on conversion won't be wasted.
asyncio-flavored functions. This is referred to internally as a "sync loop"
(``SyncTripEventLoop``), as contrasted with the "async loop" that you use
when you start from an existing Trio run. The sync loop is implemented using
the ``greenlet`` library to switch out of a Trio run that has not yet completed,
so it is less well-supported than the approach where you start in Trio.
But as of trio-asyncio 0.14.0, we do think it should generally work.

Compatibility issues
++++++++++++++++++++
Expand All @@ -166,7 +157,7 @@ Interrupting the asyncio loop

A trio-asyncio event loop created with :func:`open_loop` does not support
``run_until_complete`` or ``run_forever``. If you need these features,
you might be able to get away with using a (deprecated) "sync loop" as
you might be able to get away with using a "sync loop" as
explained :ref:`above <asyncio-loop>`, but it's better to refactor
your program so all of its async code runs within a single event loop
invocation. For example, you might replace::
Expand All @@ -180,7 +171,7 @@ invocation. For example, you might replace::
loop = asyncio.get_event_loop()
loop.run_until_complete(setup)
loop.run_forever()

with::

stopped_event = trio.Event()
Expand All @@ -202,9 +193,7 @@ Detecting the current function's flavor

:func:`sniffio.current_async_library` correctly reports "asyncio" or
"trio" when called from a trio-asyncio program, based on the flavor of
function that's calling it. (Some corner cases
might not work on Pythons below 3.7 where asyncio doesn't support
context variables.)
function that's calling it.

However, this feature should generally not be necessary, because you
should know whether each function in your program is asyncio-flavored
Expand Down
10 changes: 10 additions & 0 deletions newsfragments/137.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
trio-asyncio now implements its :ref:`synchronous event loop <asyncio-loop>`
(which is used when the top-level of your program is an asyncio call such as
:func:`asyncio.run`, rather than a Trio call such as :func:`trio.run`)
using the ``greenlet`` library rather than a separate thread. This provides
some better theoretical grounding and fixes various edge cases around signal
handling and other integrations; in particular, recent versions of IPython
will no longer crash when importing trio-asyncio. Synchronous event loops have
been un-deprecated with this change, though we still recommend using an
async loop (``async with trio_asyncio.open_loop():`` from inside a Trio run)
where possible.
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
"outcome",
"sniffio >= 1.3.0",
"exceptiongroup >= 1.0.0; python_version < '3.11'",
"greenlet",
],
# This means, just install *everything* you see under trio/, even if it
# doesn't look like a source file, so long as it appears in MANIFEST.in:
Expand Down
20 changes: 0 additions & 20 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,6 @@ def xfail(rel_id):
def skip(rel_id):
mark(pytest.mark.skip, rel_id)

# This hangs, probably due to the thread shenanigans (it works
# fine with a greenlet-based sync loop)
skip("test_base_events.py::RunningLoopTests::test_running_loop_within_a_loop")

# Remainder of these have unclear issues
if sys.version_info < (3, 8):
xfail(
Expand Down Expand Up @@ -201,23 +197,7 @@ def skip(rel_id):
may_be_absent=True,
)

if sys.version_info >= (3, 9):
# This tries to create a new loop from within an existing one,
# which we don't support.
xfail("test_locks.py::ConditionTests::test_ambiguous_loops")

if sys.version_info >= (3, 12):
# This test sets signal handlers from within a coroutine,
# which doesn't work for us because SyncTrioEventLoop runs on
# a non-main thread.
xfail("test_unix_events.py::TestFork::test_fork_signal_handling")

# This test explicitly uses asyncio.tasks._c_current_task,
# bypassing our monkeypatch.
xfail(
"test_tasks.py::CCurrentLoopTests::test_current_task_with_implicit_loop"
)

# These tests assume asyncio.sleep(0) is sufficient to run all pending tasks
xfail(
"test_futures2.py::PyFutureTests::test_task_exc_handler_correct_context"
Expand Down
85 changes: 44 additions & 41 deletions trio_asyncio/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,41 +492,57 @@ def add_reader(self, fd, callback, *args):
self._ensure_fd_no_transport(fd)
return self._add_reader(fd, callback, *args)

def _add_reader(self, fd, callback, *args):
# Local helper to factor out common logic between _add_reader/_add_writer
def _add_io_handler(self, set_handle, wait_ready, fd, callback, args):
self._check_closed()
handle = ScopedHandle(callback, args, self)
reader = self._set_read_handle(fd, handle)
if reader is not None:
reader.cancel()
if self._token is None:
return
self._nursery.start_soon(self._reader_loop, fd, handle)
old_handle = set_handle(fd, handle)

def _set_read_handle(self, fd, handle):
try:
key = self._selector.get_key(fd)
except KeyError:
self._selector.register(fd, EVENT_READ, (handle, None))
if old_handle is not None:
old_handle.cancel()
if self._token is None:
return None
else:
mask, (reader, writer) = key.events, key.data
self._selector.modify(fd, mask | EVENT_READ, (handle, writer))
return reader
self._nursery.start_soon(self._io_task, fd, handle, wait_ready)
return handle

async def _reader_loop(self, fd, handle):
async def _io_task(self, fd, handle, wait_ready):
with handle._scope:
try:
while True:
if handle._cancelled:
break
await _wait_readable(fd)
try:
await wait_ready(fd)
except OSError:
# maybe someone did
# h = add_reader(sock); h.cancel(); sock.close()
# without yielding to the event loop
if handle._cancelled:
break
raise
if handle._cancelled:
break
handle._run()
await self.synchronize()
except Exception as exc:
handle._raise(exc)

def _add_reader(self, fd, callback, *args):
return self._add_io_handler(
self._set_read_handle, _wait_readable, fd, callback, args
)

def _set_read_handle(self, fd, handle):
try:
key = self._selector.get_key(fd)
except KeyError:
self._selector.register(fd, EVENT_READ, (handle, None))
return None
else:
mask, (reader, writer) = key.events, key.data
self._selector.modify(fd, mask | EVENT_READ, (handle, writer))
return reader

# writing to a file descriptor

def add_writer(self, fd, callback, *args):
Expand All @@ -546,15 +562,10 @@ def add_writer(self, fd, callback, *args):

# remove_writer: unchanged from asyncio

def _add_writer(self, fd, callback, *args):
self._check_closed()
handle = ScopedHandle(callback, args, self)
writer = self._set_write_handle(fd, handle)
if writer is not None:
writer.cancel()
if self._token is None:
return
self._nursery.start_soon(self._writer_loop, fd, handle)
def _add_writer(self, fd, callback, *args, _defer_start=False):
return self._add_io_handler(
self._set_write_handle, _wait_writable, fd, callback, args
)

def _set_write_handle(self, fd, handle):
try:
Expand All @@ -566,20 +577,6 @@ def _set_write_handle(self, fd, handle):
self._selector.modify(fd, mask | EVENT_WRITE, (reader, handle))
return writer

async def _writer_loop(self, fd, handle):
with handle._scope:
try:
while True:
if handle._cancelled:
break
await _wait_writable(fd)
if handle._cancelled:
break
handle._run()
await self.synchronize()
except Exception as exc:
handle._raise(exc)

def autoclose(self, fd):
"""
Mark a file descriptor so that it's auto-closed along with this loop.
Expand Down Expand Up @@ -752,6 +749,7 @@ async def _main_loop_exit(self):
# clean core fields
self._nursery = None
self._task = None
self._token = None

def is_running(self):
if self._stopped is None:
Expand All @@ -778,6 +776,11 @@ async def wait_stopped(self):
"""
await self._stopped.wait()

def _trio_io_cancel(self, cancel_scope):
"""Called when a ScopedHandle representing an I/O reader or writer
has its cancel() method called."""
cancel_scope.cancel()

def stop(self):
"""Halt the main loop.
Expand Down
2 changes: 1 addition & 1 deletion trio_asyncio/_handles.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def __init__(self, *args, **kw):

def cancel(self):
super().cancel()
self._scope.cancel()
self._loop._trio_io_cancel(self._scope)

def _repr_info(self):
return super()._repr_info() + ["scope={!r}".format(self._scope)]
Expand Down
22 changes: 12 additions & 10 deletions trio_asyncio/_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

from ._async import TrioEventLoop
from ._util import run_aio_future
from ._deprecate import warn_deprecated

try:
from trio.lowlevel import wait_for_child
Expand Down Expand Up @@ -106,26 +105,29 @@ def _in_trio_context():
return True


_sync_loop_task_name = "trio_asyncio sync loop task"


def _in_trio_context_other_than_sync_loop():
try:
return trio.lowlevel.current_task().name != _sync_loop_task_name
except RuntimeError:
return False


class _TrioPolicy(asyncio.events.BaseDefaultEventLoopPolicy):
@staticmethod
def _loop_factory():
raise RuntimeError("Event loop creations shouldn't get here")

def new_event_loop(self):
if _in_trio_context():
if _in_trio_context_other_than_sync_loop():
raise RuntimeError(
"You're within a Trio environment.\n"
"Use 'async with open_loop()' instead."
)
if _faked_policy.policy is not None:
return _faked_policy.policy.new_event_loop()
if "pytest" not in sys.modules:
warn_deprecated(
"Using trio-asyncio outside of a Trio event loop",
"0.10.0",
issue=None,
instead=None,
)

from ._sync import SyncTrioEventLoop

Expand Down Expand Up @@ -220,7 +222,7 @@ def _new_policy_get():
def _new_policy_set(new_policy):
if isinstance(new_policy, TrioPolicy):
raise RuntimeError("You can't set the Trio loop policy manually")
if _in_trio_context():
if _in_trio_context_other_than_sync_loop():
raise RuntimeError("You can't change the event loop policy in Trio context")
if new_policy is not None and not isinstance(
new_policy, asyncio.AbstractEventLoopPolicy
Expand Down
Loading
Loading