Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Macos in ci #342

Open
wants to merge 20 commits into
base: shm_apis
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
901353c
Initial module import from `piker.data._sharemem`
goodboy Oct 15, 2022
0e4b37d
Add `ShmList` wrapping the stdlib's `ShareableList`
goodboy Oct 16, 2022
c01c227
Add initial readers-writer shm list tests
goodboy Oct 16, 2022
971ac50
Add repetitive attach to existing segment test
goodboy Oct 17, 2022
ecea1e1
Don't require runtime (for now), type annot fixing
goodboy Oct 17, 2022
6d4d428
Fix uid2nursery lookup table type annot
goodboy Oct 17, 2022
60f8f11
Rename token type to `NDToken` in the style of `nptyping`
goodboy Oct 17, 2022
2ac19b2
Add `ShmList` slice support in `.__getitem__()`
goodboy Oct 18, 2022
3bdd04e
Parametrize rw test with variable frame sizes
goodboy Oct 18, 2022
1be3f41
Mod define `_USE_POSIX`, add a of of todos
goodboy Oct 18, 2022
54322f2
Allocate size-specced "empty" sequence from default values by type
goodboy Oct 19, 2022
6453195
Pass `str` dtype for `use_str` case
goodboy Oct 20, 2022
9c336ec
Add `numpy` for testing optional integrated shm API layer
goodboy Oct 26, 2022
7c42d25
Add back `pytest` full trace flag to debug CI hangs
goodboy Oct 11, 2022
a5e3cf4
Add macos run using only the `trio` spawner
goodboy Oct 26, 2022
64819b2
Skip debugger tests on OS X for now
goodboy Oct 26, 2022
e12def5
Always print any std streams to console in docs examples tests
goodboy Nov 6, 2022
b624ebb
Add logging and teardown prints to full-fledged-streaming example
goodboy Nov 6, 2022
45a9aaf
Always set the `parent_exit: trio.Event` on exit
goodboy Nov 10, 2022
a89799b
Handle broken mem chan on `Actor._push_result()`
goodboy Nov 10, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,39 @@ jobs:
]

steps:
- name: Checkout
uses: actions/checkout@v2

- name: Setup python
uses: actions/setup-python@v2
with:
python-version: '${{ matrix.python }}'

- name: Install dependencies
run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager

- name: List dependencies
run: pip list

- name: Run tests
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx


testing-macos:
name: '${{ matrix.os }} Python ${{ matrix.python }} - ${{ matrix.spawn_backend }}'
timeout-minutes: 10
runs-on: ${{ matrix.os }}

strategy:
fail-fast: false
matrix:
os: [macos-latest]
python: ['3.10']
spawn_backend: [
'trio',
]

steps:
- name: Checkout
uses: actions/checkout@v2

Expand All @@ -85,6 +117,7 @@ jobs:
- name: Run tests
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx


# We skip 3.10 on windows for now due to not having any collabs to
# debug the CI failures. Anyone wanting to hack and solve them is very
# welcome, but our primary user base is not using that OS.
Expand Down
12 changes: 9 additions & 3 deletions examples/full_fledged_streaming_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ async def stream_data(seed):

# this is the third actor; the aggregator
async def aggregate(seed):
"""Ensure that the two streams we receive match but only stream
'''
Ensure that the two streams we receive match but only stream
a single set of values to the parent.
"""

'''
async with tractor.open_nursery() as nursery:
portals = []
for i in range(1, 3):
Expand Down Expand Up @@ -69,7 +71,8 @@ async def push_to_chan(portal, send_chan):
async def main():
# a nursery which spawns "actors"
async with tractor.open_nursery(
arbiter_addr=('127.0.0.1', 1616)
arbiter_addr=('127.0.0.1', 1616),
loglevel='cancel',
) as nursery:

seed = int(1e3)
Expand All @@ -92,6 +95,9 @@ async def main():
async for value in stream:
result_stream.append(value)

print("ROOT STREAM CONSUMER COMPLETE")

print("ROOT CANCELLING AGGREGATOR CHILD")
await portal.cancel_actor()

print(f"STREAM TIME = {time.time() - start}")
Expand Down
1 change: 1 addition & 0 deletions requirements-test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ mypy
trio_typing
pexpect
towncrier
numpy
9 changes: 5 additions & 4 deletions tests/test_debugger.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@
# - recurrent root errors


if platform.system() == 'Windows':
if osname := platform.system() in (
'Windows',
'Darwin',
):
pytest.skip(
'Debugger tests have no windows support (yet)',
'Debugger tests have no {osname} support (yet)',
allow_module_level=True,
)

Expand Down Expand Up @@ -783,8 +786,6 @@ def test_multi_nested_subactors_error_through_nurseries(

child = spawn('multi_nested_subactors_error_up_through_nurseries')

timed_out_early: bool = False

for send_char in itertools.cycle(['c', 'q']):
try:
child.expect(r"\(Pdb\+\+\)")
Expand Down
51 changes: 30 additions & 21 deletions tests/test_docs_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,16 @@ def run(script_code):
ids=lambda t: t[1],
)
def test_example(run_example_in_subproc, example_script):
"""Load and run scripts from this repo's ``examples/`` dir as a user
'''
Load and run scripts from this repo's ``examples/`` dir as a user
would copy and pasing them into their editor.

On windows a little more "finessing" is done to make
``multiprocessing`` play nice: we copy the ``__main__.py`` into the
test directory and invoke the script as a module with ``python -m
test_example``.
"""

'''
ex_file = os.path.join(*example_script)

if 'rpc_bidir_streaming' in ex_file and sys.version_info < (3, 9):
Expand All @@ -110,25 +112,32 @@ def test_example(run_example_in_subproc, example_script):
code = ex.read()

with run_example_in_subproc(code) as proc:
proc.wait()
err, _ = proc.stderr.read(), proc.stdout.read()
# print(f'STDERR: {err}')
# print(f'STDOUT: {out}')

# if we get some gnarly output let's aggregate and raise
if err:
try:
proc.wait(timeout=5)
finally:
err = proc.stderr.read()
errmsg = err.decode()
errlines = errmsg.splitlines()
last_error = errlines[-1]
if (
'Error' in last_error

# XXX: currently we print this to console, but maybe
# shouldn't eventually once we figure out what's
# a better way to be explicit about aio side
# cancels?
and 'asyncio.exceptions.CancelledError' not in last_error
):
raise Exception(errmsg)
out = proc.stdout.read()
outmsg = out.decode()

if out:
print(f'STDOUT: {out.decode()}')

# if we get some gnarly output let's aggregate and raise
if err:
print(f'STDERR:\n{errmsg}')
errmsg = err.decode()
errlines = errmsg.splitlines()
last_error = errlines[-1]
if (
'Error' in last_error

# XXX: currently we print this to console, but maybe
# shouldn't eventually once we figure out what's
# a better way to be explicit about aio side
# cancels?
and 'asyncio.exceptions.CancelledError' not in last_error
):
raise Exception(errmsg)

assert proc.returncode == 0
167 changes: 167 additions & 0 deletions tests/test_shm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
"""
Shared mem primitives and APIs.

"""
import uuid

# import numpy
import pytest
import trio
import tractor
from tractor._shm import (
open_shm_list,
attach_shm_list,
)


@tractor.context
async def child_attach_shml_alot(
ctx: tractor.Context,
shm_key: str,
) -> None:

await ctx.started(shm_key)

# now try to attach a boatload of times in a loop..
for _ in range(1000):
shml = attach_shm_list(
key=shm_key,
readonly=False,
)
assert shml.shm.name == shm_key
await trio.sleep(0.001)


def test_child_attaches_alot():
async def main():
async with tractor.open_nursery() as an:

# allocate writeable list in parent
key = f'shml_{uuid.uuid4()}'
shml = open_shm_list(
key=key,
)

portal = await an.start_actor(
'shm_attacher',
enable_modules=[__name__],
)

async with (
portal.open_context(
child_attach_shml_alot,
shm_key=shml.key,
) as (ctx, start_val),
):
assert start_val == key
await ctx.result()

await portal.cancel_actor()

trio.run(main)


@tractor.context
async def child_read_shm_list(
ctx: tractor.Context,
shm_key: str,
use_str: bool,
frame_size: int,
) -> None:

# attach in child
shml = attach_shm_list(
key=shm_key,
# dtype=str if use_str else float,
)
await ctx.started(shml.key)

async with ctx.open_stream() as stream:
async for i in stream:
print(f'(child): reading shm list index: {i}')

if use_str:
expect = str(float(i))
else:
expect = float(i)

if frame_size == 1:
val = shml[i]
assert expect == val
print(f'(child): reading value: {val}')
else:
frame = shml[i - frame_size:i]
print(f'(child): reading frame: {frame}')


@pytest.mark.parametrize(
'use_str',
[False, True],
ids=lambda i: f'use_str_values={i}',
)
@pytest.mark.parametrize(
'frame_size',
[1, 2**6, 2**10],
ids=lambda i: f'frame_size={i}',
)
def test_parent_writer_child_reader(
use_str: bool,
frame_size: int,
):

async def main():
async with tractor.open_nursery(
# debug_mode=True,
) as an:

portal = await an.start_actor(
'shm_reader',
enable_modules=[__name__],
debug_mode=True,
)

# allocate writeable list in parent
key = 'shm_list'
seq_size = int(2 * 2 ** 10)
shml = open_shm_list(
key=key,
size=seq_size,
dtype=str if use_str else float,
readonly=False,
)

async with (
portal.open_context(
child_read_shm_list,
shm_key=key,
use_str=use_str,
frame_size=frame_size,
) as (ctx, sent),

ctx.open_stream() as stream,
):

assert sent == key

for i in range(seq_size):

val = float(i)
if use_str:
val = str(val)

# print(f'(parent): writing {val}')
shml[i] = val

# only on frame fills do we
# signal to the child that a frame's
# worth is ready.
if (i % frame_size) == 0:
print(f'(parent): signalling frame full on {val}')
await stream.send(i)
else:
print(f'(parent): signalling final frame on {val}')
await stream.send(i)

await portal.cancel_actor()

trio.run(main)
17 changes: 13 additions & 4 deletions tractor/_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,10 @@ def __init__(
self._parent_chan: Optional[Channel] = None
self._forkserver_info: Optional[
tuple[Any, Any, Any, Any, Any]] = None
self._actoruid2nursery: dict[Optional[tuple[str, str]], 'ActorNursery'] = {} # type: ignore # noqa
self._actoruid2nursery: dict[
tuple[str, str],
ActorNursery | None,
] = {} # type: ignore # noqa

async def wait_for_peer(
self, uid: tuple[str, str]
Expand Down Expand Up @@ -826,7 +829,12 @@ async def _push_result(

if ctx._backpressure:
log.warning(text)
await send_chan.send(msg)
try:
await send_chan.send(msg)
except trio.BrokenResourceError:
# XXX: local consumer has closed their side
# so cancel the far end streaming task
log.warning(f"{chan} is already closed")
else:
try:
raise StreamOverrun(text) from None
Expand Down Expand Up @@ -1371,8 +1379,9 @@ async def async_main(
actor.lifetime_stack.close()

# Unregister actor from the arbiter
if registered_with_arbiter and (
actor._arb_addr is not None
if (
registered_with_arbiter
and not actor.is_arbiter
):
failed = False
with trio.move_on_after(0.5) as cs:
Expand Down
Loading