From 817ead3aed90bad8c6bd38063ec8bdfcacb4ff5f Mon Sep 17 00:00:00 2001 From: crusaderky Date: Tue, 30 Aug 2022 17:11:35 +0100 Subject: [PATCH] Document Scheduler and Worker state machine (#6948) --- distributed/worker_state_machine.py | 39 +- docs/source/images/run_dot.sh | 9 + docs/source/images/task-state.dot | 7 +- docs/source/images/task-state.svg | 150 +++--- docs/source/images/worker-cancel-state1.dot | 21 + docs/source/images/worker-cancel-state1.svg | 109 +++++ docs/source/images/worker-cancel-state2.dot | 21 + docs/source/images/worker-cancel-state2.svg | 109 +++++ docs/source/images/worker-dep-state.dot | 5 +- docs/source/images/worker-dep-state.svg | 75 +-- ...ask-state.dot => worker-execute-state.dot} | 6 +- docs/source/images/worker-execute-state.svg | 138 ++++++ docs/source/images/worker-forget-state.dot | 15 + docs/source/images/worker-forget-state.svg | 79 +++ docs/source/images/worker-scatter-state.dot | 7 + docs/source/images/worker-scatter-state.svg | 30 ++ docs/source/images/worker-state-machine.dot | 55 +++ docs/source/images/worker-state-machine.svg | 127 +++++ docs/source/images/worker-task-state.svg | 103 ---- docs/source/index.rst | 1 + docs/source/scheduling-state.rst | 159 ++++-- docs/source/worker-state.rst | 459 ++++++++++++++++++ docs/source/worker.rst | 75 +-- 23 files changed, 1446 insertions(+), 353 deletions(-) create mode 100755 docs/source/images/run_dot.sh create mode 100644 docs/source/images/worker-cancel-state1.dot create mode 100644 docs/source/images/worker-cancel-state1.svg create mode 100644 docs/source/images/worker-cancel-state2.dot create mode 100644 docs/source/images/worker-cancel-state2.svg rename docs/source/images/{worker-task-state.dot => worker-execute-state.dot} (64%) create mode 100644 docs/source/images/worker-execute-state.svg create mode 100644 docs/source/images/worker-forget-state.dot create mode 100644 docs/source/images/worker-forget-state.svg create mode 100644 docs/source/images/worker-scatter-state.dot create mode 100644 docs/source/images/worker-scatter-state.svg create mode 100644 docs/source/images/worker-state-machine.dot create mode 100644 docs/source/images/worker-state-machine.svg delete mode 100644 docs/source/images/worker-task-state.svg create mode 100644 docs/source/worker-state.rst diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 88d2e93692..431c8d9a8c 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -238,9 +238,10 @@ class TaskState: #: The current state of the task state: TaskStateState = "released" - #: The previous state of the task. It is not None iff state in (cancelled, resumed). + #: The previous state of the task. It is not None iff :attr:`state` in + #: (cancelled, resumed). previous: Literal["executing", "long-running", "flight", None] = None - #: The next state of the task. It is not None iff state == resumed. + #: The next state of the task. It is not None iff :attr:`state` == resumed. next: Literal["fetch", "waiting", None] = None #: Expected duration of the task @@ -278,7 +279,9 @@ class TaskState: nbytes: int | None = None #: Arbitrary task annotations annotations: dict | None = None - #: True if the task is in memory or erred; False otherwise + #: True if the :meth:`~WorkerBase.execute` or :meth:`~WorkerBase.gather_dep` + #: coroutine servicing this task completed; False otherwise. This flag changes + #: the behaviour of transitions out of the ``executing``, ``flight`` etc. states. 
done: bool = False _instances: ClassVar[weakref.WeakSet[TaskState]] = weakref.WeakSet() @@ -563,7 +566,10 @@ class StealResponseMsg(SendMessageToScheduler): @dataclass class StateMachineEvent: + """Base abstract class for all stimuli that can modify the worker state""" + __slots__ = ("stimulus_id", "handled") + #: Unique ID of the event stimulus_id: str #: timestamp of when the event was handled by the worker # TODO Switch to @dataclass(slots=True), uncomment the line below, and remove the @@ -572,6 +578,7 @@ class StateMachineEvent: _classes: ClassVar[dict[str, type[StateMachineEvent]]] = {} def __new__(cls, *args: Any, **kwargs: Any) -> StateMachineEvent: + """Hack to initialize the ``handled`` attribute in Python <3.10""" self = object.__new__(cls) self.handled = None return self @@ -1127,11 +1134,11 @@ class WorkerState: #: All and only tasks with ``TaskState.state == 'missing'``. missing_dep_flight: set[TaskState] - #: Which tasks that are coming to us in current peer-to-peer connections. - #: This set includes exclusively: - #: - tasks with :attr:`state` == 'flight' - #: - tasks with :attr:`state` in ('cancelled', 'resumed') and - #: :attr:`previous` == 'flight` + #: Tasks that are coming to us in current peer-to-peer connections. + #: + #: This set includes exclusively tasks with :attr:`~TaskState.state` == 'flight' as + #: well as tasks with :attr:`~TaskState.state` in ('cancelled', 'resumed') and + #: :attr:`~TaskState.previous` == 'flight`. #: #: See also :meth:`in_flight_tasks_count`. in_flight_tasks: set[TaskState] @@ -1176,10 +1183,10 @@ class WorkerState: available_resources: dict[str, float] #: Set of tasks that are currently running. - #: This set includes exclusively: - #: - tasks with :attr:`state` == 'executing' - #: - tasks with :attr:`state` in ('cancelled', 'resumed') and - #: :attr:`previous` == 'executing` + #: + #: This set includes exclusively tasks with :attr:`~TaskState.state` == 'executing' + #: as well as tasks with :attr:`~TaskState.state` in ('cancelled', 'resumed') and + #: :attr:`~TaskState.previous` == 'executing`. #: #: See also :meth:`executing_count` and :attr:`long_running`. executing: set[TaskState] @@ -1188,11 +1195,11 @@ class WorkerState: #: :func:`~distributed.secede`, so they no longer count towards the maximum number #: of concurrent tasks (nthreads). #: These tasks do not appear in the :attr:`executing` set. - #: This set includes exclusively: - #: - tasks with :attr:`state` == 'long-running' - #: - tasks with :attr:`state` in ('cancelled', 'resumed') and - #: :attr:`previous` == 'long-running` #: + #: This set includes exclusively tasks with + #: :attr:`~TaskState.state` == 'long-running' as well as tasks with + #: :attr:`~TaskState.state` in ('cancelled', 'resumed') and + #: :attr:`~TaskState.previous` == 'long-running`. 
long_running: set[TaskState] #: A number of tasks that this worker has run in its lifetime; this includes failed diff --git a/docs/source/images/run_dot.sh b/docs/source/images/run_dot.sh new file mode 100755 index 0000000000..9caf0c338f --- /dev/null +++ b/docs/source/images/run_dot.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash + +set -o errexit + +for in_fname in *.dot +do + out_fname=${in_fname%.dot}.svg + dot -Tsvg $in_fname > $out_fname +done diff --git a/docs/source/images/task-state.dot b/docs/source/images/task-state.dot index 52ec18ff57..fde7d8c62c 100644 --- a/docs/source/images/task-state.dot +++ b/docs/source/images/task-state.dot @@ -5,15 +5,12 @@ digraph{ ]; released1 [label=released]; released2 [label=released]; - new -> released1; released1 -> waiting; waiting -> processing; - waiting -> "no-worker"; - "no-worker" -> waiting; - "no-worker" -> processing; + waiting -> "no-worker" [dir=both]; processing -> memory; processing -> error; - error -> forgotten; + error -> released2; memory -> released2; released2 -> forgotten; } diff --git a/docs/source/images/task-state.svg b/docs/source/images/task-state.svg index 49e6e81c98..7b7d5c84ad 100644 --- a/docs/source/images/task-state.svg +++ b/docs/source/images/task-state.svg @@ -1,113 +1,109 @@ - - - - -%3 - + + + + -released1 - -released + +released1 + +released -waiting - -waiting + +waiting + +waiting -released1->waiting - - + +released1->waiting + + -released2 - -released + +released2 + +released -forgotten - -forgotten + +forgotten + +forgotten -released2->forgotten - - - - -new - -new - - -new->released1 - - + +released2->forgotten + + -processing - -processing + +processing + +processing -waiting->processing - - + +waiting->processing + + -no-worker - -no-worker + +no-worker + +no-worker -waiting->no-worker - - + +waiting->no-worker + + + -memory - -memory + +memory + +memory -processing->memory - - + +processing->memory + + -error - -error + +error + +error -processing->error - - - - -no-worker->waiting - - - - -no-worker->processing - - + +processing->error + + -memory->released2 - - - - -error->forgotten - - + +memory->released2 + + + + + +error->released2 + + diff --git a/docs/source/images/worker-cancel-state1.dot b/docs/source/images/worker-cancel-state1.dot new file mode 100644 index 0000000000..72dc1fb881 --- /dev/null +++ b/docs/source/images/worker-cancel-state1.dot @@ -0,0 +1,21 @@ +digraph{ + graph [ + bgcolor="#FFFFFFF00", + rankdir=LR, + ]; + + executing1 [label="executing"]; + executing2 [label="executing"]; + cancelled [label="cancelled(executing)"]; + resumed [label="resumed(fetch)"]; + + executing1 -> cancelled; + cancelled -> released; + cancelled -> executing2; + released -> forgotten; + + cancelled -> resumed [dir=both]; + resumed -> executing2; + resumed -> memory; + resumed -> fetch; +} diff --git a/docs/source/images/worker-cancel-state1.svg b/docs/source/images/worker-cancel-state1.svg new file mode 100644 index 0000000000..243cccc2c5 --- /dev/null +++ b/docs/source/images/worker-cancel-state1.svg @@ -0,0 +1,109 @@ + + + + + + + + + +executing1 + +executing + + + +cancelled + +cancelled(executing) + + + +executing1->cancelled + + + + + +executing2 + +executing + + + +cancelled->executing2 + + + + + +resumed + +resumed(fetch) + + + +cancelled->resumed + + + + + + +released + +released + + + +cancelled->released + + + + + +resumed->executing2 + + + + + +memory + +memory + + + +resumed->memory + + + + + +fetch + +fetch + + + +resumed->fetch + + + + + +forgotten + +forgotten + + + +released->forgotten + + + + 
+ diff --git a/docs/source/images/worker-cancel-state2.dot b/docs/source/images/worker-cancel-state2.dot new file mode 100644 index 0000000000..e5bdcfcad2 --- /dev/null +++ b/docs/source/images/worker-cancel-state2.dot @@ -0,0 +1,21 @@ +digraph{ + graph [ + bgcolor="#FFFFFFF00", + rankdir=LR, + ]; + + flight1 [label=flight]; + flight2 [label=flight]; + cancelled [label="cancelled(flight)"]; + resumed [label="resumed(waiting)"]; + + flight1 -> cancelled; + cancelled -> flight2; + cancelled -> released; + released -> forgotten; + + cancelled -> resumed [dir=both]; + resumed -> flight2; + resumed -> memory; + resumed -> waiting; +} diff --git a/docs/source/images/worker-cancel-state2.svg b/docs/source/images/worker-cancel-state2.svg new file mode 100644 index 0000000000..31c391f5a2 --- /dev/null +++ b/docs/source/images/worker-cancel-state2.svg @@ -0,0 +1,109 @@ + + + + + + + + + +flight1 + +flight + + + +cancelled + +cancelled(flight) + + + +flight1->cancelled + + + + + +flight2 + +flight + + + +cancelled->flight2 + + + + + +resumed + +resumed(waiting) + + + +cancelled->resumed + + + + + + +released + +released + + + +cancelled->released + + + + + +resumed->flight2 + + + + + +memory + +memory + + + +resumed->memory + + + + + +waiting + +waiting + + + +resumed->waiting + + + + + +forgotten + +forgotten + + + +released->forgotten + + + + + diff --git a/docs/source/images/worker-dep-state.dot b/docs/source/images/worker-dep-state.dot index 18a5e40cfa..44ba3a7266 100644 --- a/docs/source/images/worker-dep-state.dot +++ b/docs/source/images/worker-dep-state.dot @@ -3,8 +3,11 @@ digraph{ bgcolor="#FFFFFFF00", rankdir=LR, ]; - new -> fetch; + + released -> fetch; fetch -> flight; flight -> fetch; + fetch -> missing; + missing -> fetch; flight -> memory; } diff --git a/docs/source/images/worker-dep-state.svg b/docs/source/images/worker-dep-state.svg index 9b7b04e8d9..76e83eb4c9 100644 --- a/docs/source/images/worker-dep-state.svg +++ b/docs/source/images/worker-dep-state.svg @@ -1,61 +1,78 @@ - - - - -%3 - - + + + + + -new - -new +released + +released fetch - -fetch + +fetch - + -new->fetch - - +released->fetch + + flight - -flight + +flight fetch->flight - - + + + + + +missing + +missing + + + +fetch->missing + + flight->fetch - - + + - + memory - -memory + +memory - + flight->memory - - + + + + + +missing->fetch + + diff --git a/docs/source/images/worker-task-state.dot b/docs/source/images/worker-execute-state.dot similarity index 64% rename from docs/source/images/worker-task-state.dot rename to docs/source/images/worker-execute-state.dot index 4a6fc8cbac..769993c30d 100644 --- a/docs/source/images/worker-task-state.dot +++ b/docs/source/images/worker-execute-state.dot @@ -3,12 +3,16 @@ digraph{ bgcolor="#FFFFFFF00", rankdir=LR, ]; - new -> waiting; + released -> waiting; waiting -> ready; + waiting -> constrained; ready -> executing; + constrained -> executing; executing -> "long-running"; executing -> memory; executing -> error; + executing -> rescheduled; "long-running" -> memory; "long-running" -> error; + "long-running" -> rescheduled; } diff --git a/docs/source/images/worker-execute-state.svg b/docs/source/images/worker-execute-state.svg new file mode 100644 index 0000000000..f12b58f65d --- /dev/null +++ b/docs/source/images/worker-execute-state.svg @@ -0,0 +1,138 @@ + + + + + + + + + +released + +released + + + +waiting + +waiting + + + +released->waiting + + + + + +ready + +ready + + + +waiting->ready + + + + + +constrained + +constrained + + + +waiting->constrained + + + + + +executing + 
+executing + + + +ready->executing + + + + + +constrained->executing + + + + + +long-running + +long-running + + + +executing->long-running + + + + + +memory + +memory + + + +executing->memory + + + + + +error + +error + + + +executing->error + + + + + +rescheduled + +rescheduled + + + +executing->rescheduled + + + + + +long-running->memory + + + + + +long-running->error + + + + + +long-running->rescheduled + + + + + diff --git a/docs/source/images/worker-forget-state.dot b/docs/source/images/worker-forget-state.dot new file mode 100644 index 0000000000..8eee411fa3 --- /dev/null +++ b/docs/source/images/worker-forget-state.dot @@ -0,0 +1,15 @@ +digraph{ + graph [ + bgcolor="#FFFFFFF00", + rankdir=LR, + ]; + free_keys [ + label="Wait for free-keys\nfrom the scheduler", + shape=box,style=dashed, + ]; + memory -> free_keys; + error -> free_keys; + free_keys -> released; + rescheduled -> released; + released -> forgotten; +} diff --git a/docs/source/images/worker-forget-state.svg b/docs/source/images/worker-forget-state.svg new file mode 100644 index 0000000000..64cf54891e --- /dev/null +++ b/docs/source/images/worker-forget-state.svg @@ -0,0 +1,79 @@ + + + + + + + + + +free_keys + +Wait for free-keys +from the scheduler + + + +released + +released + + + +free_keys->released + + + + + +memory + +memory + + + +memory->free_keys + + + + + +error + +error + + + +error->free_keys + + + + + +forgotten + +forgotten + + + +released->forgotten + + + + + +rescheduled + +rescheduled + + + +rescheduled->released + + + + + diff --git a/docs/source/images/worker-scatter-state.dot b/docs/source/images/worker-scatter-state.dot new file mode 100644 index 0000000000..cc36ce3d71 --- /dev/null +++ b/docs/source/images/worker-scatter-state.dot @@ -0,0 +1,7 @@ +digraph{ + graph [ + bgcolor="#FFFFFFF00", + rankdir=LR, + ]; + released -> memory; +} diff --git a/docs/source/images/worker-scatter-state.svg b/docs/source/images/worker-scatter-state.svg new file mode 100644 index 0000000000..ee0654021e --- /dev/null +++ b/docs/source/images/worker-scatter-state.svg @@ -0,0 +1,30 @@ + + + + + + + + + +released + +released + + + +memory + +memory + + + +released->memory + + + + + diff --git a/docs/source/images/worker-state-machine.dot b/docs/source/images/worker-state-machine.dot new file mode 100644 index 0000000000..a3cd798ac8 --- /dev/null +++ b/docs/source/images/worker-state-machine.dot @@ -0,0 +1,55 @@ +digraph { + graph [ + bgcolor="#FFFFFFF00", + rankdir=TB, + ]; + + Scheduler [ + shape=rect, + fontsize = 20, + ]; + + stimuli [ + label="StateMachineEvent", + shape=rect, + style=filled, + color=grey, + ]; + + instructions [ + label="list[Instruction]", + shape=rect, + style=filled, + color=grey, + ]; + + subgraph cluster_0 { + label = "Worker"; + fontsize = 20; + + Worker_handle_stimulus [label="handle_stimulus()"]; + Worker_execute [label="execute()\ngather_dep()\nbatched_send()"]; + } + + subgraph cluster_1 { + label = "BaseWorker"; + fontsize = 20; + + BaseWorker_handle_stimulus [label="handle_stimulus()"]; + BaseWorker_execute [label="execute()\ngather_dep()\nbatched_send()"]; + } + + subgraph cluster_2 { + label = "WorkerState"; + fontsize = 20; + + WorkerState_handle_stimulus [label="handle_stimulus()"]; + } + + Scheduler -> stimuli; + stimuli -> Worker_handle_stimulus; + Worker_handle_stimulus -> BaseWorker_handle_stimulus; + BaseWorker_handle_stimulus -> WorkerState_handle_stimulus; + # trick to have arrows going from the bottom to the top of the graph + stimuli -> Worker_execute -> BaseWorker_execute -> 
instructions -> WorkerState_handle_stimulus [dir=back]; +} diff --git a/docs/source/images/worker-state-machine.svg b/docs/source/images/worker-state-machine.svg new file mode 100644 index 0000000000..ef0e4bf476 --- /dev/null +++ b/docs/source/images/worker-state-machine.svg @@ -0,0 +1,127 @@ + + + + + + + + +cluster_0 + +Worker + + +cluster_1 + +BaseWorker + + +cluster_2 + +WorkerState + + + +Scheduler + +Scheduler + + + +stimuli + +StateMachineEvent + + + +Scheduler->stimuli + + + + + +Worker_handle_stimulus + +handle_stimulus() + + + +stimuli->Worker_handle_stimulus + + + + + +Worker_execute + +execute() +gather_dep() +batched_send() + + + +stimuli->Worker_execute + + + + + +instructions + +list[Instruction] + + + +WorkerState_handle_stimulus + +handle_stimulus() + + + +instructions->WorkerState_handle_stimulus + + + + + +BaseWorker_handle_stimulus + +handle_stimulus() + + + +Worker_handle_stimulus->BaseWorker_handle_stimulus + + + + + +BaseWorker_execute + +execute() +gather_dep() +batched_send() + + + +Worker_execute->BaseWorker_execute + + + + + +BaseWorker_handle_stimulus->WorkerState_handle_stimulus + + + + + +BaseWorker_execute->instructions + + + + + diff --git a/docs/source/images/worker-task-state.svg b/docs/source/images/worker-task-state.svg deleted file mode 100644 index edc4b83404..0000000000 --- a/docs/source/images/worker-task-state.svg +++ /dev/null @@ -1,103 +0,0 @@ - - - - - - -%3 - - - -new - -new - - - -waiting - -waiting - - - -new->waiting - - - - - -ready - -ready - - - -waiting->ready - - - - - -executing - -executing - - - -ready->executing - - - - - -long-running - -long-running - - - -executing->long-running - - - - - -memory - -memory - - - -executing->memory - - - - - -error - -error - - - -executing->error - - - - - -long-running->memory - - - - - -long-running->error - - - - - diff --git a/docs/source/index.rst b/docs/source/index.rst index 6ae9744395..2c76cb5f68 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -100,6 +100,7 @@ Contents scheduling-policies scheduling-state worker + worker-state worker-memory work-stealing killed diff --git a/docs/source/scheduling-state.rst b/docs/source/scheduling-state.rst index 3e8dfb4e81..9354b8f6b7 100644 --- a/docs/source/scheduling-state.rst +++ b/docs/source/scheduling-state.rst @@ -1,5 +1,5 @@ -Scheduling State -================ +Scheduler State Machine +======================= .. currentmodule:: distributed.scheduler @@ -60,33 +60,42 @@ Tasks flow along the following states with the following allowed transitions: .. image:: images/task-state.svg :alt: Dask scheduler task states -* *Released*: Known but not actively computing or in memory -* *Waiting*: On track to be computed, waiting on dependencies to arrive in - memory -* *No-worker*: Ready to be computed, but no appropriate worker exists - (for example because of resource restrictions, or because no worker is - connected at all). -* *Processing*: All dependencies are available and the task is assigned to a - worker for compute (the scheduler doesn't know whether it's in a worker - queue or actively being computed). 
-* *Memory*: In memory on one or more workers
-* *Erred*: Task computation, or one of its dependencies, has encountered an error
-* *Forgotten* (not actually a state): Task is no longer needed by any client
-  or dependent task
+released
+    Known but not actively computing or in memory
+waiting
+    On track to be computed, waiting on dependencies to arrive in memory
+no-worker
+    Ready to be computed, but no appropriate worker exists (for example because of
+    resource restrictions, or because no worker is connected at all).
+processing
+    All dependencies are available and the task is assigned to a worker for compute (the
+    scheduler doesn't know whether it's in a worker queue or actively being computed).
+memory
+    In memory on one or more workers
+erred
+    Task computation, or one of its dependencies, has encountered an error
+forgotten
+    Task is no longer needed by any client or dependent task, so it disappears from the
+    scheduler as well. As soon as a task reaches this state, it is immediately
+    dereferenced from the scheduler.
+
+.. note::
+    There's no intermediate state between ``waiting`` / ``no-worker`` and
+    ``processing``: as soon as a task has all of its dependencies in memory somewhere on
+    the cluster, it is immediately assigned to a worker. This can lead to very long task
+    queues on the workers, which are then rebalanced dynamically through
+    :doc:`work-stealing`.

 In addition to the literal state, though, other information needs to be
 kept and updated about each task. Individual task state is stored in an
-object named :class:`TaskState` and consists of the following information:
-
-.. autoclass:: TaskState
-   :members:
+object named :class:`TaskState`; see full API through the link.

 The scheduler keeps track of all the :class:`TaskState` objects (those
 not in the "forgotten" state) using several containers:

 .. attribute:: tasks: {str: TaskState}

-   A dictionary mapping task keys (usually strings) to :class:`TaskState`
+   A dictionary mapping task keys (always strings) to :class:`TaskState`
    objects. Task keys are how information about tasks is communicated
    between the scheduler and clients, or the scheduler and workers; this
    dictionary is then used to find the corresponding :class:`TaskState`
@@ -99,17 +108,23 @@ not in the "forgotten" state) using several containers:
    (their :attr:`~TaskState.waiting_on` set is empty), and are waiting for
    an appropriate worker to join the network before computing.

+Once a task is queued up on a worker, it is also tracked on the worker side by the
+:doc:`worker-state`.
+
 Worker State
 ------------

-Each worker's current state is stored in a :class:`WorkerState` object.
+Each worker's current state is stored in a :class:`WorkerState` object; see full API
+through the link.
+
+This is a scheduler-side object, which holds information about what the scheduler
+knows about each worker on the cluster, and is not to be confused with
+:class:`distributed.worker_state_machine.WorkerState`.
+
 This information is involved in deciding
-:ref:`which worker to run a task on <decide-worker>`.
+:ref:`which worker to run a task on <decide-worker>`.

-.. autoclass:: WorkerState
-   :members:
-
 In addition to individual worker state, the scheduler maintains two
 containers to help with scheduling tasks:

@@ -136,13 +151,7 @@ Client State
 ------------

 Information about each individual client of the scheduler is kept
-in a :class:`ClientState` object:
-
-.. autoclass:: ClientState
-   :members:
-
-
-.. XXX list invariants somewhere?
+in a :class:`ClientState` object; see full API through the link.
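+
+As a quick way to see this state in practice, you can inspect the scheduler-side
+containers from a client. The sketch below is illustrative only: it assumes a running
+cluster at a hypothetical address and uses :meth:`~distributed.Client.run_on_scheduler`,
+which injects the scheduler instance as the ``dask_scheduler`` keyword argument:
+
+.. code-block:: python
+
+    from distributed import Client
+
+    client = Client("tcp://127.0.0.1:8786")  # hypothetical scheduler address
+
+    def count_task_states(dask_scheduler):
+        # dask_scheduler.tasks maps key -> scheduler-side TaskState
+        counts = {}
+        for ts in dask_scheduler.tasks.values():
+            counts[ts.state] = counts.get(ts.state, 0) + 1
+        return counts
+
+    client.run_on_scheduler(count_task_states)
+    # e.g. {'memory': 40, 'processing': 8, 'released': 2}
+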
Understanding a Task's Flow @@ -215,6 +224,8 @@ memory → forgotten nbytes :attr:`WorkerState.nbytes`. +.. _scheduling_state_implementation: + Implementation -------------- @@ -224,21 +235,21 @@ name of the start and finish task state like the following. .. code-block:: python - def transition_released_waiting(self, key): + def transition_released_waiting(self, key, stimulus_id): ... - def transition_processing_memory(self, key): + def transition_processing_memory(self, key, stimulus_id): ... - def transition_processing_erred(self, key): + def transition_processing_erred(self, key, stimulus_id): ... These functions each have three effects. 1. They perform the necessary transformations on the scheduler state (the 20 dicts/lists/sets) to move one key between states. 2. They return a dictionary of recommended ``{key: state}`` transitions to - enact directly afterwards on other keys. For example after we transition a - key into memory we may find that many waiting keys are now ready to + enact directly afterwards on other keys. For example, after we transition a + key into memory, we may find that many waiting keys are now ready to transition from waiting to a ready state. -3. Optionally they include a set of validation checks that can be turned on +3. Optionally, they include a set of validation checks that can be turned on for testing. Rather than call these functions directly we call the central function @@ -246,8 +257,7 @@ Rather than call these functions directly we call the central function .. code-block:: python - def transition(self, key, final_state): - """ Transition key to the suggested state """ + def transition(self, key, final_state, stimulus_id): ... This transition function finds the appropriate path from the current to the final state. It also serves as a central point for logging and diagnostics. @@ -258,7 +268,7 @@ steady state. For that we use the ``transitions`` function (note the plural ``s .. code-block:: python - def transitions(self, recommendations): + def transitions(self, recommendations, stimulus_id): recommendations = recommendations.copy() while recommendations: key, finish = recommendations.popitem() @@ -276,17 +286,55 @@ Transitions occur from stimuli, which are state-changing messages to the scheduler from workers or clients. The scheduler responds to the following stimuli: -* **Workers** - * Task finished: A task has completed on a worker and is now in memory - * Task erred: A task ran and erred on a worker - * Task missing data: A task tried to run but was unable to find necessary - data on other workers - * Worker added: A new worker was added to the network - * Worker removed: An existing worker left the network - -* **Clients** - * Update graph: The client sends more tasks to the scheduler - * Release keys: The client no longer desires the result of certain keys +**Workers** + +task-finished + A task has completed on a worker and is now in memory +task-erred + A task ran and erred on a worker +reschedule + A task has completed on a worker by raising :class:`~distributed.Reschedule` +long-running + A task is still running on the worker, but it called :func:`~distributed.secede` +add-keys + Replication finished. One or more tasks, which were previously in memory on other + workers, are now in memory on one additional worker. Also used to inform the + scheduler of a successful :func:`~distributed.Client.scatter` operation. 
+request-refresh-who-has
+    All peers that hold a replica of a task in memory that a worker knows of are
+    unavailable (temporarily or permanently), so the worker can't fetch it and is asking
+    the scheduler if it knows of any additional replicas. This call is repeated
+    periodically until a new replica appears.
+release-worker-data
+    A worker informs the scheduler that it no longer holds the task in memory
+worker-status-change
+    The global status of a worker has just changed, e.g. between ``running`` and
+    ``paused``.
+log-event
+    A generic event happened on the worker, which should be logged centrally.
+    Note that this is in addition to the worker's log, which the client can fetch on
+    request (up to a certain length).
+keep-alive
+    A worker informs the scheduler that it's still online and responsive. This uses the
+    batched stream channel, as opposed to :meth:`distributed.worker.Worker.heartbeat` and
+    :meth:`Scheduler.heartbeat_worker` which use dedicated RPC comms, and is needed to
+    prevent firewalls from closing down the batched stream.
+register-worker
+    A new worker was added to the network
+unregister
+    An existing worker left the network
+
+
+**Clients**
+
+update-graph
+    The client sends more tasks to the scheduler
+client-releases-keys
+    The client no longer desires the result of certain keys.
+
+Note that there are many more client API endpoints (e.g. to serve
+:func:`~distributed.Client.scatter` etc.), which are not listed here for the sake of
+brevity.

 Stimuli functions are prepended with the text ``stimulus``, and take a
 variety of keyword arguments from the message as in the following examples:
@@ -315,6 +363,15 @@ API
    :members:
    :inherited-members:

+.. autoclass:: TaskState
+   :members:
+
+.. autoclass:: WorkerState
+   :members:
+
+.. autoclass:: ClientState
+   :members:
+
 .. autofunction:: decide_worker

 .. autoclass:: MemoryState
diff --git a/docs/source/worker-state.rst b/docs/source/worker-state.rst
new file mode 100644
index 0000000000..a9dce7c26b
--- /dev/null
+++ b/docs/source/worker-state.rst
@@ -0,0 +1,459 @@
+Worker State Machine
+====================
+
+.. currentmodule:: distributed.worker_state_machine
+
+
+Task states
+-----------
+
+When the Scheduler asks a Worker to compute a task, it is tracked by the Worker through
+a :class:`distributed.worker_state_machine.TaskState` object - not to be confused with
+the matching scheduler-side class :class:`distributed.scheduler.TaskState`.
+
+The class has a key attribute, :attr:`TaskState.state`, which can assume the following
+values:
+
+released
+    Known but not actively computing or in memory. A task can stay in this state when
+    the scheduler asked to forget it, but it has dependent tasks on the same worker.
+waiting
+    The scheduler has added the task to the worker queue. All of its dependencies are
+    in memory somewhere on the cluster, but not all of them are in memory on the current
+    worker, so they need to be fetched.
+fetch
+    This task is in memory on one or more peer workers, but not on this worker. Its data
+    is queued to be transferred over the network, either because it's a dependency of a
+    task in ``waiting`` state, or because the :doc:`active_memory_manager` requested it
+    to be replicated here.
+    The task can be found in the :attr:`WorkerState.data_needed` heap.
+missing
+    Like ``fetch``, but all peer workers that were listed by the scheduler are either
+    unreachable or have responded they don't actually have the task data.
+    The worker will periodically ask the scheduler if it knows of additional replicas;
+    when it does, the task will transition again to ``fetch``.
+    The task can be found in the :attr:`WorkerState.missing_dep_flight` set.
+flight
+    The task data is currently being transferred over the network from another worker.
+    The task can be found in the :attr:`WorkerState.in_flight_tasks` and
+    :attr:`WorkerState.in_flight_workers` collections.
+ready
+    The task is ready to be computed; all of its dependencies are in memory on the
+    current worker and it's waiting for an available thread.
+    The task can be found in the :attr:`WorkerState.ready` heap.
+constrained
+    Like ``ready``, but the user specified :doc:`resource constraints <resources>` for
+    this task.
+    The task can be found in the :attr:`WorkerState.constrained` queue.
+executing
+    The task is currently being computed on a thread.
+    It can be found in the :attr:`WorkerState.executing` set and in the
+    :attr:`distributed.worker.Worker.active_threads` dict.
+long-running
+    Like ``executing``, but the user code called :func:`distributed.secede` so the task
+    no longer counts towards the maximum number of concurrent tasks.
+    It can be found in the :attr:`WorkerState.long_running` set and in the
+    :attr:`distributed.worker.Worker.active_threads` dict.
+rescheduled
+    The task just raised the :class:`~distributed.Reschedule` exception. This is a
+    transitory state, which is not stored permanently.
+cancelled
+    The scheduler asked to forget about this task, but it's technically impossible at
+    the moment. See :ref:`cancelled-tasks`. The task can be found in whatever
+    collections it belonged to in its :attr:`~TaskState.previous` state.
+resumed
+    The task was recovered from ``cancelled`` state. See :ref:`cancelled-tasks`.
+    The task can be found in whatever collections it belonged to in its
+    :attr:`~TaskState.previous` state.
+memory
+    Task execution completed, or the task was successfully transferred from another
+    worker, and is now held in either :class:`WorkerState.data` or
+    :class:`WorkerState.actors`.
+error
+    Task execution failed. Alternatively, task execution completed successfully, or the
+    task data transferred successfully over the network, but it failed to serialize or
+    deserialize. The full exception and traceback are stored in the task itself, so that
+    they can be re-raised on the client.
+forgotten
+    The scheduler asked this worker to forget about the task, and there are neither
+    dependents nor dependencies on the same worker. As soon as a task reaches this
+    state, it is immediately dereferenced from the :class:`WorkerState` and will soon
+    be garbage-collected. This is the only case where two instances of a
+    :class:`TaskState` object with the same :attr:`~TaskState.key` can (transitorily)
+    exist in the same interpreter at the same time.
+
+
+Fetching dependencies
+---------------------
+
+.. image:: images/worker-dep-state.svg
+    :alt: Worker states for dependencies
+
+As tasks that need to be computed arrive on the Worker, any dependencies that are not
+already in memory on the same worker are wrapped by :class:`TaskState` objects, each
+containing a listing of workers (:attr:`TaskState.who_has`) to collect their result
+from.
+
+These :class:`TaskState` objects have their state set to ``fetch``, are put in the
+:attr:`~WorkerState.data_needed` heap, and are progressively transferred over the
+network. For each dependency we select a worker at random that has that data and collect
+the dependency from that worker.
+To improve bandwidth, we opportunistically gather other
+dependencies of other tasks that are known to be on that worker, up to a maximum of 50MB
+of data (:attr:`~WorkerState.transfer_message_target_bytes`) - too little data and
+bandwidth suffers, too much data and responsiveness suffers. We use a fixed number of 50
+connections (:attr:`~WorkerState.transfer_incoming_count_limit`, which is in turn
+acquired from the configuration key ``distributed.worker.connections.outgoing``) so as
+to avoid overly-fragmenting our network bandwidth.
+
+In the event that the network comms between two workers are saturated, a dependency task
+may cycle between ``fetch`` and ``flight`` until it is successfully collected. It may
+also happen that a peer worker responds that it doesn't have a replica of the requested
+data anymore; finally, the peer worker may be unreachable or unresponsive. When that
+happens, the peer is removed from :attr:`~TaskState.who_has` and the task is
+transitioned back to ``fetch``, so that the Worker will try gathering the same key from
+a different peer. If :attr:`~TaskState.who_has` becomes empty due to this process, the
+task transitions to ``missing`` and the Worker starts periodically asking the Scheduler
+if additional peers are available.
+
+The same system used for fetching dependencies is also used by
+:doc:`active_memory_manager` replication.
+
+.. note::
+   There is at most one :meth:`~BaseWorker.gather_dep` asyncio task running at any
+   given time for any given peer worker. If all workers holding a replica of a task
+   in ``fetch`` state are already in flight, the task will remain in ``fetch`` state
+   until a worker becomes available again.
+
+
+Computing tasks
+---------------
+A :class:`TaskState` that needs to be computed proceeds on the Worker through the
+following pipeline. It has its :attr:`~TaskState.run_spec` defined, which instructs the
+worker how to execute it.
+
+.. image:: images/worker-execute-state.svg
+    :alt: Worker states for computing tasks
+
+After all dependencies for a task are in memory, the task transitions from ``waiting``
+to ``ready`` or ``constrained`` and is added to the :attr:`~WorkerState.ready` heap.
+
+As soon as a thread is available, we pop a task from the top of the heap and put the
+task into a thread from a local thread pool to execute.
+
+Optionally, while it's running, this task may identify itself as a long-running task
+(see :doc:`Tasks launching tasks <task-launch>`), at which point it secedes from the
+thread pool and changes state to ``long-running``. ``executing`` and ``long-running``
+are almost identical states, the only difference being that the latter doesn't count
+towards the maximum number of tasks running in parallel at the same time.
+
+A task can terminate in three ways:
+
+- Complete successfully; its return value is stored in either :attr:`~WorkerState.data`
+  or :attr:`~WorkerState.actors`
+- Raise an exception; the exception and traceback are stored on the :class:`TaskState`
+  object
+- Raise :class:`~distributed.Reschedule`; it is immediately forgotten.
+
+In all cases, the outcome is sent back to the scheduler.
+
+
+Scattered data
+--------------
+:meth:`Scattered data <distributed.Client.scatter>` follows an even simpler path,
+landing directly in ``memory``:
+
+.. image:: images/worker-scatter-state.svg
+    :alt: Worker states for scattered data
+
+
+Forgetting tasks
+----------------
+Once a task is in ``memory`` or ``error``, the Worker will hold onto it indefinitely,
+until the Scheduler explicitly asks the Worker to forget it.
+This happens when there are no more Clients holding a reference to the key and there are
+no more waiter tasks (that is, dependents that have not been computed). Additionally,
+the :doc:`active_memory_manager` may ask to drop excess replicas of a task.
+
+In the case of ``rescheduled``, the task will instead immediately transition to
+``released`` and then ``forgotten`` without waiting for the scheduler.
+
+.. image:: images/worker-forget-state.svg
+    :alt: Worker states for forgetting tasks
+
+
+Irregular flow
+--------------
+There are a few important exceptions to the flow diagrams above:
+
+- A task is :doc:`stolen <work-stealing>`, in which case it transitions from
+  ``waiting``, ``ready``, or ``constrained`` directly to ``released``. Note that steal
+  requests for tasks that are currently executing are rejected.
+- Scheduler intercession, in which the scheduler reassigns a task that was
+  previously assigned to a separate worker to a new worker. This most commonly
+  occurs when a :doc:`worker dies <killed>` during computation.
+- Client intercession, where a client either explicitly releases a Future or descopes
+  it; alternatively the whole client may shut down or become unresponsive. When there
+  are no more clients holding references to a key or one of its dependents, the
+  Scheduler will release it.
+
+In short:
+
+.. important::
+   A task can transition to ``released`` from *any* state, not just those in the
+   diagrams above.
+
+If there are no dependents, the task immediately transitions to ``forgotten`` and is
+descoped. However, there is an important exception, :ref:`cancelled-tasks`.
+
+
+.. _cancelled-tasks:
+
+Task cancellation
+-----------------
+The Worker may receive a request to release a key while it is currently in ``flight``,
+``executing``, or ``long-running``. Due to technical limitations around cancelling
+Python threads, and the way data fetching from peer workers is currently implemented,
+such an event cannot cause the related asyncio task (and, in the case of ``executing`` /
+``long-running``, the thread running the user code) to be immediately aborted. Instead,
+tasks in these three states are transitioned to another state, ``cancelled``, which
+means that the asyncio task will proceed to completion (outcome is irrelevant) and only
+*then* will the Dask task be released.
+
+The ``cancelled`` state has a substate, :attr:`~TaskState.previous`, which is set to one
+of the above three states. The common notation for this is ``cancelled(<previous>)``,
+e.g. ``cancelled(flight)``.
+
+While a task is cancelled, one of three things will happen:
+
+- Nothing happens before the asyncio task completes; e.g. the Scheduler does not change
+  its mind and still wants the Worker to forget about the task until the very end.
+  When that happens, the task transitions from ``cancelled`` to ``released`` and,
+  typically, ``forgotten``.
+- The scheduler switches back to its original request:
+
+  - The scheduler asks the Worker to fetch a task that is currently
+    ``cancelled(flight)``; at which point the task will immediately revert to
+    ``flight``, forget that cancellation ever happened, and continue waiting on the data
+    fetch that's already running;
+  - The scheduler asks the Worker to compute a task that is currently
+    ``cancelled(executing)`` or ``cancelled(long-running)``. The Worker will completely
+    disregard the new :attr:`~TaskState.run_spec` (if it changed), switch back to the
+    :attr:`~TaskState.previous` state, and wait for the already executing thread to
+    finish.
+
+- The scheduler flips to the opposite request, from fetch to computation or the other
+  way around.
+
+To serve this last use case there is another special state, ``resumed``. A task can
+enter ``resumed`` state exclusively from ``cancelled``. ``resumed`` retains the
+:attr:`~TaskState.previous` attribute from the ``cancelled`` state and adds another
+attribute, :attr:`~TaskState.next`, which is always:
+
+- ``fetch``, if :attr:`~TaskState.previous` is ``executing`` or ``long-running``
+- ``waiting``, if :attr:`~TaskState.previous` is ``flight``
+
+To recap, these are all possible permutations of states and substates to handle
+cancelled tasks:
+
+========= ============ =======
+state     previous     next
+========= ============ =======
+cancelled flight       None
+cancelled executing    None
+cancelled long-running None
+resumed   flight       waiting
+resumed   executing    fetch
+resumed   long-running fetch
+========= ============ =======
+
+If a ``resumed`` task completes successfully, it will transition to ``memory`` (as
+opposed to a ``cancelled`` task, where the output is disregarded) and the Scheduler
+will be informed with a spoofed termination message, that is the expected end message
+for ``flight`` if the task is ``resumed(executing->fetch)`` or
+``resumed(long-running->fetch)``, and the expected end message for ``execute`` if
+the task is ``resumed(flight->waiting)``.
+
+If the task fails or raises :class:`~distributed.Reschedule`, the Worker will instead
+silently ignore the exception and switch to its intended course, so
+``resumed(executing->fetch)`` or ``resumed(long-running->fetch)`` will transition to
+``fetch`` and ``resumed(flight->waiting)`` will transition to ``waiting``.
+
+Finally, the scheduler can change its mind multiple times over the lifetime of the task,
+so a ``resumed(executing->fetch)`` or ``resumed(long-running->fetch)`` task may be
+requested to transition to ``waiting`` again, at which point it will just revert to its
+:attr:`~TaskState.previous` state and forget the whole incident; likewise a
+``resumed(flight->waiting)`` task could be requested to transition to ``fetch`` again,
+so it will just transition to ``flight`` instead.
+
+.. image:: images/worker-cancel-state1.svg
+    :alt: Worker states for cancel/resume
+
+.. image:: images/worker-cancel-state2.svg
+    :alt: Worker states for cancel/resume
+
+
+**A common real-life use case**
+
+1. There are at least two workers on the cluster, A and B.
+2. Task x is computed successfully on worker A.
+3. When task x transitions to memory on worker A, the scheduler asks worker B to compute
+   task y, which depends on task x.
+4. B starts acquiring the key x from A, which sends the task into ``flight`` mode.
+5. Worker A crashes, and for whatever reason the scheduler notices before worker B does.
+6. The scheduler will release task y (because it's waiting on dependencies that are
+   nowhere to be found in memory anymore) and reschedule task x somewhere else on the
+   cluster. Task x will transition to ``cancelled(flight)`` on worker B.
+7. If the scheduler randomly chooses worker B to compute task x, the task will
+   transition to ``resumed(flight->waiting)``.
+8. When, *and only when*, the TCP socket from B to A collapses (e.g. due to timeout),
+   the task will transition to ``waiting`` and will be eventually recomputed on B.
+
+.. important::
+
+   You always have *at most* one :meth:`~BaseWorker.execute` or
+   :meth:`~BaseWorker.gather_dep` asyncio task running for any one given key; you
+   never have both.
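+
+The permutation table above is small enough to encode directly. The snippet below is
+purely illustrative - it is not code from ``distributed`` - and simply restates the
+table as a lookup from the ``state`` / ``previous`` pair to the ``next`` substate:
+
+.. code-block:: python
+
+    # Illustrative only: the valid (state, previous) -> next permutations,
+    # transcribed from the table above.
+    PERMUTATIONS = {
+        ("cancelled", "flight"): None,
+        ("cancelled", "executing"): None,
+        ("cancelled", "long-running"): None,
+        ("resumed", "flight"): "waiting",
+        ("resumed", "executing"): "fetch",
+        ("resumed", "long-running"): "fetch",
+    }
+
+    def next_substate(state, previous):
+        """TaskState.next for a cancelled or resumed task, per the table above"""
+        return PERMUTATIONS[state, previous]
+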
+
+
+Task state mapping between Scheduler and Worker
+-----------------------------------------------
+
+The task states on the scheduler and the worker are different, and their mapping is
+somewhat nuanced:
+
++------------------+-----------------------+-------------------------+
+| Scheduler states | Typical worker states | Edge case worker states |
++==================+=======================+=========================+
+| - released       | - (unknown)           | - released              |
+| - waiting        |                       | - cancelled             |
+| - no-worker      |                       |                         |
++------------------+-----------------------+-------------------------+
+| - processing     | - waiting             | - resumed(waiting)      |
+|                  | - ready               |                         |
+|                  | - constrained         |                         |
+|                  | - executing           |                         |
+|                  | - long-running        |                         |
++------------------+-----------------------+-------------------------+
+| - memory         | - memory              | - error                 |
+|                  | - fetch               | - missing               |
+|                  | - flight              | - resumed(fetch)        |
++------------------+-----------------------+-------------------------+
+| - erred          | - error               |                         |
++------------------+-----------------------+-------------------------+
+
+In addition to the above states, a worker may not know about a specific task at all.
+The opposite, where the worker knows about a task but it is nowhere to be found on the
+scheduler, happens exclusively in the case of :ref:`cancelled-tasks`.
+
+There are also *race conditions* to be considered, where a worker (or some workers) know
+something before the scheduler does, or the other way around. For example,
+
+- A task will always transition from ``executing`` to ``memory`` on the worker before
+  it can transition from ``processing`` to ``memory`` on the scheduler
+- A task will always transition to ``released`` or ``forgotten`` on the scheduler
+  first; only when the message reaches the worker will it be released there too.
+
+
+Flow control
+------------
+.. image:: images/worker-state-machine.svg
+    :alt: Worker state machine control flow
+
+There are several classes involved in the worker state machine:
+
+:class:`TaskState` includes all the information related to a single task; it also
+includes references to dependent and dependency tasks. This is just a data holder, with
+no mutating methods. Note that this is a distinct class from
+:class:`distributed.scheduler.TaskState`.
+
+:class:`WorkerState` encapsulates the state of the worker as a whole. It holds
+references to :class:`TaskState` in its :attr:`~WorkerState.tasks` dictionary and in
+several other secondary collections. Crucially, this class has no knowledge or
+visibility whatsoever on asyncio, networking, disk I/O, threads, etc.
+Note that this is a distinct class from :class:`distributed.scheduler.WorkerState`.
+
+:class:`WorkerState` offers a single method to mutate the state:
+:meth:`~WorkerState.handle_stimulus`. The state must not be altered in any other way.
+The method accepts a :class:`StateMachineEvent`, a.k.a. *stimulus*, which is a data
+class describing something that happened which may cause the worker state to mutate.
+A stimulus can arrive from either the scheduler (e.g. a request to compute a task) or
+from the worker itself (e.g. a task has finished computing).
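+
+The toy sketch below is deliberately simplified and is *not* the actual
+``distributed`` API; it only illustrates the shape of this design: a pure state
+container whose single mutating method consumes an event and returns a list of
+instructions, which are described next:
+
+.. code-block:: python
+
+    from dataclasses import dataclass
+
+
+    @dataclass
+    class ToyEvent:
+        """Something that happened, e.g. 'please compute task x'"""
+        stimulus_id: str
+
+
+    @dataclass
+    class ToyInstruction:
+        """A side effect the caller must perform (I/O, compute, messaging)"""
+        stimulus_id: str
+
+
+    class ToyWorkerState:
+        """Pure state machine: no asyncio, networking, threads, or disk I/O"""
+
+        def __init__(self):
+            self.tasks = {}  # key -> state
+
+        def handle_stimulus(self, event: ToyEvent) -> list:
+            # Mutate the internal state here, then tell the caller which side
+            # effects to perform; never perform them directly.
+            return [ToyInstruction(stimulus_id=event.stimulus_id)]
+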
+
+:meth:`WorkerState.handle_stimulus` alters the internal state (e.g., it could transition
+a task from ``executing`` to ``memory``) and returns a list of :class:`Instruction`
+objects, which are actions that the worker needs to take but are external to the state
+itself:
+
+- send a message to the scheduler
+- compute a task
+- gather a task from a peer worker
+
+:meth:`WorkerState.handle_stimulus` is wrapped by :meth:`BaseWorker.handle_stimulus`,
+which consumes the :class:`Instruction` objects. :class:`BaseWorker` deals with asyncio
+task creation, tracking, and cleanup, but does not implement the actual task execution
+or gathering; instead it exposes the abstract async methods
+:meth:`~BaseWorker.execute` and :meth:`~BaseWorker.gather_dep`, which are then
+overridden by its subclass :class:`~distributed.Worker`, which actually runs tasks and
+performs network I/O. When the implemented methods finish, they must return a
+:class:`StateMachineEvent`, which is fed back into :meth:`BaseWorker.handle_stimulus`.
+
+.. note::
+   This can create a (potentially very long) chain of events internal to the worker;
+   e.g. if there are more tasks in the :attr:`~WorkerState.ready` queue than there are
+   threads, then the termination :class:`StateMachineEvent` of one task will trigger the
+   :class:`Instruction` to execute the next one.
+
+To summarize:
+
+- :class:`WorkerState` is agnostic to asyncio, networking, threading, and disk I/O; it
+  includes collections of :class:`TaskState` objects.
+- :class:`BaseWorker` encapsulates :class:`WorkerState` and adds awareness of asyncio.
+- :class:`~distributed.Worker` subclasses :class:`BaseWorker` and adds awareness of
+  networking, threading, and disk I/O.
+
+
+Internal state permutation
+--------------------------
+Internally, :meth:`WorkerState.handle_stimulus` works very similarly to
+:ref:`the same process on the scheduler side <scheduling_state_implementation>`:
+
+#. :meth:`WorkerState.handle_stimulus` calls ``WorkerState._handle_<event name>()``,
+#. which returns a tuple of
+
+   - *recommendations* to transition tasks: {:class:`TaskState`: <new state>}
+   - list of :class:`Instruction` objects
+
+#. :meth:`WorkerState.handle_stimulus` then passes the recommendations to
+   :meth:`WorkerState._transitions`
+#. For each recommendation, :meth:`WorkerState._transitions` calls
+   :meth:`WorkerState._transition`,
+#. which in turn calls ``WorkerState._transition_<start state>_<end state>()``,
+#. which in turn returns an additional tuple of (recommendations, instructions)
+#. the new recommendations are consumed by :meth:`WorkerState._transitions`, until no
+   more recommendations are returned.
+#. :meth:`WorkerState.handle_stimulus` finally returns the list of instructions, which
+   has been progressively extended by the transitions.
+
+
+API Documentation
+-----------------
+
+.. autoclass:: TaskState
+   :members:
+
+.. autoclass:: WorkerState
+   :members:
+
+.. autoclass:: BaseWorker
+   :members:
+
+.. autoclass:: StateMachineEvent
+   :members:
+
+.. autoclass:: Instruction
+   :members:
+
+.. note::
+   :class:`StateMachineEvent` and :class:`Instruction` are abstract classes, with many
+   subclasses which are not listed here for the sake of brevity.
+   Refer to the implementation module :mod:`distributed.worker_state_machine` for the
+   full list.
diff --git a/docs/source/worker.rst b/docs/source/worker.rst index b3d9f8c0cb..9042e24191 100644 --- a/docs/source/worker.rst +++ b/docs/source/worker.rst @@ -93,58 +93,13 @@ more details on the command line options, please have a look at the Internal Scheduling ------------------- +See dedicated page: :doc:`worker-state` -Internally tasks that come to the scheduler proceed through the following pipeline as -:class:`distributed.worker_state_machine.TaskState` objects. Tasks which follow this -path have a :attr:`~distributed.worker_state_machine.TaskState.runspec` defined which -instructs the worker how to execute them. -.. image:: images/worker-task-state.svg - :alt: Dask worker task states - -Data dependencies are also represented as -:class:`~distributed.worker_state_machine.TaskState` objects and follow a simpler path -through the execution pipeline. These tasks do not have a -:attr:`~distributed.worker_state_machine.TaskState.runspec` defined and instead contain -a listing of workers to collect their result from. - - -.. image:: images/worker-dep-state.svg - :alt: Dask worker dependency states - -As tasks arrive they are prioritized and put into a heap. They are then taken -from this heap in turn to have any remote dependencies collected. For each -dependency we select a worker at random that has that data and collect the -dependency from that worker. To improve bandwidth we opportunistically gather -other dependencies of other tasks that are known to be on that worker, up to a -maximum of 200MB of data (too little data and bandwidth suffers, too much data -and responsiveness suffers). We use a fixed number of connections (around -10-50) so as to avoid overly-fragmenting our network bandwidth. In the event -that the network comms between two workers are saturated, a dependency task may -cycle between ``fetch`` and ``flight`` until it is successfully collected. - -After all dependencies for a task are in memory we transition the task to the -ready state and put the task again into a heap of tasks that are ready to run. - -We collect from this heap and put the task into a thread from a local thread -pool to execute. - -Optionally, this task may identify itself as a long-running task (see -:doc:`Tasks launching tasks `), at which point it secedes from the -thread pool. - -A task either errs or its result is put into memory. In either case a response -is sent back to the scheduler. - -Tasks slated for execution and tasks marked for collection from other workers -must follow their respective transition paths as defined above. The only -exceptions to this are when: - -* A task is `stolen `_, in which case a task which might have - been collected will instead be executed on the thieving worker -* Scheduler intercession, in which the scheduler reassigns a task that was - previously assigned to a separate worker to a new worker. This most commonly - occurs when a `worker dies `_ during computation. +API Documentation +----------------- +.. autoclass:: distributed.worker.Worker + :members: .. _nanny: @@ -157,23 +112,3 @@ process. .. autoclass:: distributed.nanny.Nanny :members: - - -API Documentation ------------------ - -.. currentmodule:: distributed.worker_state_machine - -.. autoclass:: distributed.worker_state_machine.TaskState - :members: - -.. autoclass:: distributed.worker_state_machine.WorkerState - :members: - -.. autoclass:: distributed.worker_state_machine.BaseWorker - :members: - -.. currentmodule:: distributed.worker - -.. autoclass:: distributed.worker.Worker - :members: