diff --git a/distributed/tests/test_stress.py b/distributed/tests/test_stress.py
index 755f5f97ef..55fb653e1b 100644
--- a/distributed/tests/test_stress.py
+++ b/distributed/tests/test_stress.py
@@ -228,6 +228,12 @@ async def test_close_connections(c, s, *workers):
         x = x.rechunk((1000, 1))
 
     future = c.compute(x.sum())
+
+    # need to wait to ensure the scheduler actually registered anything
+    # processing. Computation itself should take a few seconds so this should be
+    # a safe period to wait.
+    await asyncio.sleep(0.05)
+
     while any(s.processing.values()):
         await asyncio.sleep(0.5)
     worker = random.choice(list(workers))
diff --git a/distributed/worker.py b/distributed/worker.py
index fc38d435a5..f8d95cc2be 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -454,6 +454,8 @@ def __init__(
             ("ready", "executing"): self.transition_ready_executing,
             ("ready", "memory"): self.transition_ready_memory,
             ("ready", "error"): self.transition_ready_error,
+            ("ready", "waiting"): self.transition_ready_waiting,
+            ("constrained", "waiting"): self.transition_ready_waiting,
             ("constrained", "executing"): self.transition_constrained_executing,
             ("executing", "memory"): self.transition_executing_done,
             ("executing", "error"): self.transition_executing_done,
@@ -1720,8 +1722,26 @@ def transition_ready_error(self, ts):
         self.send_task_state_to_scheduler(ts)
 
     def transition_ready_memory(self, ts, value=None):
+        if value:
+            self.put_key_in_memory(ts, value=value)
         self.send_task_state_to_scheduler(ts)
 
+    def transition_ready_waiting(self, ts):
+        """
+        This transition is common for work stealing
+        """
+        ts.state = "waiting"
+        ts.runspec = None
+        # FIXME GH4413 Replace with recommended transition to forgotten
+        # If there are no dependents anymore, this has been a mere dependency
+        # and it was not intended to be executed on this worker. In this case we
+        # need to release the key ourselves since the scheduler will no longer
+        # propagate the deletion to this worker
+        if not ts.dependents:
+            self.release_key(ts.key)
+            return
+        return ts.state
+
     def transition_constrained_executing(self, ts):
         self.transition_ready_executing(ts)
         for resource, quantity in ts.resource_restrictions.items():
@@ -2259,8 +2279,9 @@ def steal_request(self, key):
         # There may be a race condition between stealing and releasing a task.
         # In this case the self.tasks is already cleared. The `None` will be
         # registered as `already-computing` on the other end
-        if key in self.tasks:
-            state = self.tasks[key].state
+        ts = self.tasks.get(key)
+        if ts:
+            state = ts.state
         else:
             state = None
 
@@ -2268,7 +2289,11 @@
         self.batched_stream.send(response)
 
         if state in ("ready", "waiting", "constrained"):
-            self.release_key(key, reason="steal_request")
+            # FIXME GH4413 We no longer want to execute this task but there is
+            # no state to identify this. We still need to track it,
+            # otherwise this worker may never fetch it in case it is still
+            # required
+            self.transition(ts, "waiting")
 
     def release_key(self, key, reason=None, cause=None, report=True):
         try:
@@ -2582,7 +2607,17 @@ async def execute(self, key, report=False):
                 executor_error = e
                 raise
 
-            if ts.state not in ("executing", "long-running"):
+            # We'll need to check again for the task state since it may have
+            # changed since the execution was kicked off. In particular, it may
+            # have been canceled and released already in which case we'll have
+            # to drop the result immediately
+            key = ts.key
+            ts = self.tasks.get(key)
+
+            if ts is None:
+                logger.debug(
+                    "Dropping result for %s since task has already been released." % key
+                )
                 return
 
             result["key"] = ts.key
@@ -2877,7 +2912,7 @@ def _notify_plugins(self, method_name, *args, **kwargs):
 
     def validate_task_memory(self, ts):
         assert ts.key in self.data or ts.key in self.actors
-        assert ts.nbytes is not None
+        assert isinstance(ts.nbytes, int)
         assert not ts.waiting_for_data
         assert ts.key not in self.ready
         assert ts.state == "memory"
@@ -2904,7 +2939,7 @@ def validate_task_ready(self, ts):
     def validate_task_waiting(self, ts):
         assert ts.key not in self.data
         assert ts.state == "waiting"
-        if ts.dependencies:
+        if ts.dependencies and ts.runspec:
             assert not all(dep.key in self.data for dep in ts.dependencies)
 
     def validate_task_flight(self, ts):
         assert ts.key not in self.data
@@ -2940,6 +2975,9 @@ def validate_state(self):
         try:
             assert len(self.data_needed) == len(set(self.data_needed))
             waiting_keys = set()
+
+            assert self.executing_count >= 0
+
             for ts in self.tasks.values():
                 assert ts.state is not None
                 # check that worker has task
@@ -2958,10 +2996,6 @@
                     ts_wait = self.tasks[key]
                     assert ts_wait.state in PROCESSING, ts_wait
                     waiting_keys.add(key)
-                if ts.state == "memory":
-                    assert isinstance(ts.nbytes, int)
-                    assert not ts.waiting_for_data
-                    assert ts.key in self.data or ts.key in self.actors
 
             # FIXME: Tracking of waiting_for_data_count is broken. Most likely
            # since tasks may be required/waited on by multiple tasks but the
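
For context, the heart of the change is that a steal request now moves a stolen but still-needed task back to "waiting" instead of releasing it, so the worker keeps tracking it as a dependency. Below is a minimal, self-contained sketch of that idea using the same dict-based transition dispatch; the Task and MiniWorker classes are hypothetical stand-ins for illustration only, not the distributed.worker.Worker API.

# Hypothetical sketch (not the distributed.worker API): a steal request
# transitions a still-needed task back to "waiting" instead of releasing it.


class Task:
    def __init__(self, key):
        self.key = key
        self.state = "ready"
        self.runspec = object()  # stand-in for the serialized run specification
        self.dependents = set()


class MiniWorker:
    def __init__(self):
        self.tasks = {}
        # (start state, finish state) -> handler, mirroring the table style
        self._transitions = {
            ("ready", "waiting"): self.transition_ready_waiting,
            ("constrained", "waiting"): self.transition_ready_waiting,
        }

    def transition(self, ts, finish):
        return self._transitions[(ts.state, finish)](ts)

    def transition_ready_waiting(self, ts):
        # Drop the runspec so the task is never executed locally, but keep
        # the task state so its result can still be fetched from the thief.
        ts.state = "waiting"
        ts.runspec = None
        if not ts.dependents:
            # Nothing on this worker needs the key: forget it entirely.
            self.release_key(ts.key)
            return
        return ts.state

    def release_key(self, key):
        self.tasks.pop(key, None)

    def steal_request(self, key):
        ts = self.tasks.get(key)
        state = ts.state if ts else None
        if state in ("ready", "waiting", "constrained"):
            self.transition(ts, "waiting")
        return state


if __name__ == "__main__":
    w = MiniWorker()
    dep = w.tasks["x"] = Task("x")
    consumer = w.tasks["y"] = Task("y")
    dep.dependents.add(consumer)
    w.steal_request("x")  # "x" has a local dependent, so it stays as "waiting"
    w.steal_request("y")  # "y" has no dependents here and is released
    print(w.tasks["x"].state, "y" in w.tasks)  # -> waiting False

The same reasoning explains the new validate_task_waiting assertion in the diff: a waiting task without a runspec is merely a tracked dependency, so its inputs need not be locally incomplete.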