Commit

Test stimulus flows
sjperkins committed Apr 13, 2022
1 parent 807add8 commit 0913001
Showing 2 changed files with 173 additions and 1 deletion.
54 changes: 54 additions & 0 deletions distributed/tests/test_client.py
@@ -82,6 +82,7 @@
from distributed.utils_test import (
    TaskStateMetadataPlugin,
    _UnhashableCallable,
    assert_stimulus_flow,
    async_wait_for,
    asyncinc,
    captured_logger,
@@ -7503,3 +7504,56 @@ async def test_wait_for_workers_updates_info(c, s):
    async with Worker(s.address):
        await c.wait_for_workers(1)
        assert c.scheduler_info()["workers"]


@gen_cluster(client=True)
async def test_stimulus_flow_simple(c, s, *workers):
    f = c.submit(inc, 1)
    assert await f == 2

    assert_stimulus_flow(
        f.key,
        s,
        *workers,
        flow={
            ("a", "scheduler", "update-graph-hlg"): ("a", "worker", "compute-task"),
            ("a", "worker", "compute-task"): ("a", "worker", "task-finished"),
            ("a", "worker", "task-finished"): ("a", "scheduler", "task-finished"),
            ("a", "scheduler", "task-finished"): None,
        },
    )


@gen_cluster(client=True)
async def test_stimulus_flow_retry(c, s, *workers):
    def task():
        assert dask.config.get("foo")

    with dask.config.set(foo=False):
        f = c.submit(task)
        with pytest.raises(AssertionError):
            await f

    with dask.config.set(foo=True):
        await f.retry()
        await f

    assert_stimulus_flow(
        f.key,
        s,
        *workers,
        flow={
            ("a", "scheduler", "update-graph-hlg"): ("a", "worker", "compute-task"),
            ("a", "worker", "compute-task"): ("a", "worker", "task-erred"),
            ("a", "worker", "task-erred"): ("a", "scheduler", "task-erred"),
            ("a", "scheduler", "task-erred"): ("a", "scheduler", "stimulus-retry"),
            ("a", "scheduler", "stimulus-retry"): [
                ("b", "worker", "compute-task"),
                ("c", "worker", "stimulus-retry"),
            ],
            ("b", "worker", "compute-task"): ("b", "worker", "task-finished"),
            ("b", "worker", "task-finished"): ("b", "scheduler", "task-finished"),
            ("b", "scheduler", "task-finished"): None,
            ("c", "worker", "stimulus-retry"): None,
        },
    )
120 changes: 119 additions & 1 deletion distributed/utils_test.py
@@ -20,7 +20,7 @@
import tempfile
import threading
import weakref
from collections import defaultdict
from collections import Counter, defaultdict
from collections.abc import Callable
from contextlib import contextmanager, nullcontext, suppress
from itertools import count
@@ -1875,6 +1875,124 @@ def xfail_ssl_issue5601():
        raise


class InvalidStimulusFlow(ValueError):
    """Raised when a stimulus flow does not match a cluster's transition story"""


def assert_stimulus_flow(key, scheduler, *workers, flow):
"""Tests that a flow of stimuli through a cluster's Scheduler and Workers
match their transition_log story
The following describes a linear flow where "x" is provided as part of a graph the scheduler,
is computed on a worker, successfully completes there and is marked as completed on the
scheduler, all as part of a single flow "a"
.. code-block:: python
assert_stimulus_flow("x", scheduler, *workers, flow={
("a", "scheduler", "update-graph-hlg"): ("a", "worker", "compute-task"),
("a", "worker", "compute-task"): ("a", "worker", "task-finished"),
("a", "worker", "task-finished"): ("a", "scheduler", "task-finished"),
("a", "scheduler", "task-finished"): None
})
The following describes a branching flow wherecomputation of x is attempted on a worker,
but an error occurs as part of flow "a". The scheduler then releases keys from this worker
as part of flow "c" and successfully retries computation of the task on another worker
as part of flow "b".
.. code-block:: python
assert_stimulus_flow("x", scheduler, *workers, flow={
("a" ,"scheduler", "update-graph-hlg"): ("a", 'worker', 'compute-task'),
("a", 'worker', 'compute-task'): ("a", 'worker', 'task-erred'),
("a", 'worker', 'task-erred'): ("a", 'scheduler', 'task-erred'),
("a", 'scheduler', 'task-erred'): ("a", 'scheduler', 'stimulus-retry'),
("a", 'scheduler', 'stimulus-retry'): [("b", 'worker', 'compute-task'), ("c", 'worker', 'stimulus-retry')],
("b", 'worker', 'compute-task'): ("b", 'worker', 'task-finished'),
("b", 'worker', 'task-finished'): ("b", 'scheduler', 'task-finished'),
("b", 'scheduler', 'task-finished'): None,
("c", 'worker', 'stimulus-retry'): None,
})
Parameters
----------
key : str
The key for the task we wish to validate the flow against
scheduler : Scheduler
Cluster scheduler
*workers : tuple of Workers
Cluster workers
flow : dict
A stimulus flow dictionary is of the form :code:`{(flow_id, entity, stimulus_prefix): value}`
where
1. ``flow_id`` is an identifier uniquely id'ing a flow
2. ``entity`` is "scheduler" or "worker"
3. ``stimulus_prefix`` is the prefix of the stimulus logged on the entity
4. ``value`` is
1. a :code:`{(flow_id, entity, stimulus_prefix)}` tuple
2. a list of the tuples above, indicating a flow branch.
The ``flow_id``'s must be unique.
3. None, indicated the terminus of a flow.
"""
    # Build a (entity, stimulus_id, timestamp) log from scheduler and worker
    # transition logs
    log = [("scheduler",) + e[-2:] for e in scheduler.transition_log if e[0] == key]

    for w in workers:
        log.extend(("worker",) + e[-2:] for e in w.log if e[0] == key)

    # Build a unique (entity, stimulus_id) story, sorted on timestamp
    story = ((e[0], e[-2]) for e in sorted(log, key=lambda e: e[-1]))
    story = list(dict.fromkeys(story))
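    # dict.fromkeys preserves insertion order, so each (entity, stimulus_id)
    # pair is kept once, at its earliest timestamp; every entry should be
    # consumed exactly once while walking the expected flow below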

    def dfs(flow_id, entity, stimulus):
        remove = None

        # TODO(sjperkins)
        # O(n^2). Improve this.
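        # Find the earliest story entry on this entity whose stimulus_id
        # starts with the expected prefix; logged stimulus ids typically
        # carry a unique suffix (e.g. a timestamp), hence the prefix match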
        for i, (s_entity, s_stimulus) in enumerate(story):
            if s_entity == entity and s_stimulus.startswith(stimulus):
                remove = i
                break

        if remove is None:
            raise InvalidStimulusFlow(
                f"{(entity, stimulus)} not found in cluster transition story"
            )

        story.pop(remove)

        try:
            v = flow[(flow_id, entity, stimulus)]
        except KeyError:
            raise InvalidStimulusFlow(
                f"Key {(flow_id, entity, stimulus)} not found in flow"
            )

        if isinstance(v, tuple):
            flow_id, entity, stimulus = v
            dfs(flow_id, entity, stimulus)
        elif isinstance(v, list):
            check = Counter([fid for fid, _, _ in v])
            if not all(fcts == 1 for fcts in check.values()):
                raise InvalidStimulusFlow(f"Repeated flow id's in branch {v}")

            for flow_id, entity, stimulus in v:
                dfs(flow_id, entity, stimulus)
        elif v is None:
            # Terminal flow node
            return
        else:
            raise TypeError(f"Invalid flow specification {type(v)}")

    flow_id, entity, stimulus = next(iter(flow))
    dfs(flow_id, entity, stimulus)
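    # The walk above should have consumed every logged story entry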
    assert not story


def assert_story(
    story: list[tuple], expect: list[tuple], *, strict: bool = False
) -> None:
