Skip to content

Commit

Permalink
Document Scheduler and Worker state machine (#6948)
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky authored Aug 30, 2022
1 parent 34b18a9 commit 817ead3
Show file tree
Hide file tree
Showing 23 changed files with 1,446 additions and 353 deletions.
39 changes: 23 additions & 16 deletions distributed/worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,10 @@ class TaskState:

#: The current state of the task
state: TaskStateState = "released"
#: The previous state of the task. It is not None iff state in (cancelled, resumed).
#: The previous state of the task. It is not None iff :attr:`state` in
#: (cancelled, resumed).
previous: Literal["executing", "long-running", "flight", None] = None
#: The next state of the task. It is not None iff state == resumed.
#: The next state of the task. It is not None iff :attr:`state` == resumed.
next: Literal["fetch", "waiting", None] = None

#: Expected duration of the task
Expand Down Expand Up @@ -278,7 +279,9 @@ class TaskState:
nbytes: int | None = None
#: Arbitrary task annotations
annotations: dict | None = None
#: True if the task is in memory or erred; False otherwise
#: True if the :meth:`~WorkerBase.execute` or :meth:`~WorkerBase.gather_dep`
#: coroutine servicing this task completed; False otherwise. This flag changes
#: the behaviour of transitions out of the ``executing``, ``flight`` etc. states.
done: bool = False

_instances: ClassVar[weakref.WeakSet[TaskState]] = weakref.WeakSet()
Expand Down Expand Up @@ -563,7 +566,10 @@ class StealResponseMsg(SendMessageToScheduler):

@dataclass
class StateMachineEvent:
"""Base abstract class for all stimuli that can modify the worker state"""

__slots__ = ("stimulus_id", "handled")
#: Unique ID of the event
stimulus_id: str
#: timestamp of when the event was handled by the worker
# TODO Switch to @dataclass(slots=True), uncomment the line below, and remove the
Expand All @@ -572,6 +578,7 @@ class StateMachineEvent:
_classes: ClassVar[dict[str, type[StateMachineEvent]]] = {}

def __new__(cls, *args: Any, **kwargs: Any) -> StateMachineEvent:
"""Hack to initialize the ``handled`` attribute in Python <3.10"""
self = object.__new__(cls)
self.handled = None
return self
Expand Down Expand Up @@ -1127,11 +1134,11 @@ class WorkerState:
#: All and only tasks with ``TaskState.state == 'missing'``.
missing_dep_flight: set[TaskState]

#: Which tasks that are coming to us in current peer-to-peer connections.
#: This set includes exclusively:
#: - tasks with :attr:`state` == 'flight'
#: - tasks with :attr:`state` in ('cancelled', 'resumed') and
#: :attr:`previous` == 'flight`
#: Tasks that are coming to us in current peer-to-peer connections.
#:
#: This set includes exclusively tasks with :attr:`~TaskState.state` == 'flight' as
#: well as tasks with :attr:`~TaskState.state` in ('cancelled', 'resumed') and
#: :attr:`~TaskState.previous` == 'flight`.
#:
#: See also :meth:`in_flight_tasks_count`.
in_flight_tasks: set[TaskState]
Expand Down Expand Up @@ -1176,10 +1183,10 @@ class WorkerState:
available_resources: dict[str, float]

#: Set of tasks that are currently running.
#: This set includes exclusively:
#: - tasks with :attr:`state` == 'executing'
#: - tasks with :attr:`state` in ('cancelled', 'resumed') and
#: :attr:`previous` == 'executing`
#:
#: This set includes exclusively tasks with :attr:`~TaskState.state` == 'executing'
#: as well as tasks with :attr:`~TaskState.state` in ('cancelled', 'resumed') and
#: :attr:`~TaskState.previous` == 'executing`.
#:
#: See also :meth:`executing_count` and :attr:`long_running`.
executing: set[TaskState]
Expand All @@ -1188,11 +1195,11 @@ class WorkerState:
#: :func:`~distributed.secede`, so they no longer count towards the maximum number
#: of concurrent tasks (nthreads).
#: These tasks do not appear in the :attr:`executing` set.
#: This set includes exclusively:
#: - tasks with :attr:`state` == 'long-running'
#: - tasks with :attr:`state` in ('cancelled', 'resumed') and
#: :attr:`previous` == 'long-running`
#:
#: This set includes exclusively tasks with
#: :attr:`~TaskState.state` == 'long-running' as well as tasks with
#: :attr:`~TaskState.state` in ('cancelled', 'resumed') and
#: :attr:`~TaskState.previous` == 'long-running`.
long_running: set[TaskState]

#: A number of tasks that this worker has run in its lifetime; this includes failed
Expand Down
9 changes: 9 additions & 0 deletions docs/source/images/run_dot.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/usr/bin/env bash

set -o errexit

for in_fname in *.dot
do
out_fname=${in_fname%.dot}.svg
dot -Tsvg $in_fname > $out_fname
done
7 changes: 2 additions & 5 deletions docs/source/images/task-state.dot
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@ digraph{
];
released1 [label=released];
released2 [label=released];
new -> released1;
released1 -> waiting;
waiting -> processing;
waiting -> "no-worker";
"no-worker" -> waiting;
"no-worker" -> processing;
waiting -> "no-worker" [dir=both];
processing -> memory;
processing -> error;
error -> forgotten;
error -> released2;
memory -> released2;
released2 -> forgotten;
}
150 changes: 73 additions & 77 deletions docs/source/images/task-state.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
21 changes: 21 additions & 0 deletions docs/source/images/worker-cancel-state1.dot
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
digraph{
graph [
bgcolor="#FFFFFFF00",
rankdir=LR,
];

executing1 [label="executing"];
executing2 [label="executing"];
cancelled [label="cancelled(executing)"];
resumed [label="resumed(fetch)"];

executing1 -> cancelled;
cancelled -> released;
cancelled -> executing2;
released -> forgotten;

cancelled -> resumed [dir=both];
resumed -> executing2;
resumed -> memory;
resumed -> fetch;
}
Loading

0 comments on commit 817ead3

Please sign in to comment.