Transition task when victimized by stealing
fjetter committed Jan 12, 2021
1 parent 99ad636 commit 0c59e63
Showing 2 changed files with 50 additions and 10 deletions.
6 changes: 6 additions & 0 deletions distributed/tests/test_stress.py
@@ -228,6 +228,12 @@ async def test_close_connections(c, s, *workers):
    x = x.rechunk((1000, 1))

    future = c.compute(x.sum())

    # Need to wait to ensure the scheduler has actually registered anything
    # as processing. The computation itself should take a few seconds, so
    # this is a safe period to wait.
    await asyncio.sleep(0.05)

    while any(s.processing.values()):
        await asyncio.sleep(0.5)
    worker = random.choice(list(workers))
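As an aside, the fixed sleep plus the existing polling loop amount to a wait-then-drain pattern. A standalone sketch of that pattern (the helper name, the polling in the first phase, and the intervals are illustrative assumptions, not part of this commit):

import asyncio


async def wait_until_drained(scheduler, poll_interval=0.05):
    # Hypothetical helper: first wait until the scheduler has registered
    # at least one task as processing, then wait until all processing has
    # drained. Polling both phases avoids the race described in the test
    # comment, at the cost of never returning if no work ever arrives.
    while not any(scheduler.processing.values()):
        await asyncio.sleep(poll_interval)
    while any(scheduler.processing.values()):
        await asyncio.sleep(poll_interval)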
54 changes: 44 additions & 10 deletions distributed/worker.py
@@ -454,6 +454,8 @@ def __init__(
("ready", "executing"): self.transition_ready_executing,
("ready", "memory"): self.transition_ready_memory,
("ready", "error"): self.transition_ready_error,
("ready", "waiting"): self.transition_ready_waiting,
("constrained", "waiting"): self.transition_ready_waiting,
("constrained", "executing"): self.transition_constrained_executing,
("executing", "memory"): self.transition_executing_done,
("executing", "error"): self.transition_executing_done,
@@ -1720,8 +1722,26 @@ def transition_ready_error(self, ts):
    self.send_task_state_to_scheduler(ts)

def transition_ready_memory(self, ts, value=None):
    if value:
        self.put_key_in_memory(ts, value=value)
    self.send_task_state_to_scheduler(ts)

def transition_ready_waiting(self, ts):
    """
    This transition is commonly triggered by work stealing.
    """
    ts.state = "waiting"
    ts.runspec = None
    # FIXME GH4413 Replace with the recommended transition to forgotten.
    # If there are no dependents anymore, this task was a mere dependency
    # and was never intended to be executed on this worker. In that case
    # we need to release the key ourselves, since the scheduler will no
    # longer propagate the deletion to this worker.
    if not ts.dependents:
        self.release_key(ts.key)
        return
    return ts.state

def transition_constrained_executing(self, ts):
    self.transition_ready_executing(ts)
    for resource, quantity in ts.resource_restrictions.items():
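
In other words, the new handler has two outcomes: a victim that other local tasks still depend on is kept around in waiting (so its result can be fetched later), while one without dependents is released outright. A toy illustration with a stand-in task class (not the real TaskState):

class ToyTask:
    # Stand-in for distributed's TaskState, for illustration only.
    def __init__(self, key, dependents=()):
        self.key = key
        self.state = "ready"
        self.runspec = "run-instruction"
        self.dependents = list(dependents)


def demote_to_waiting(tasks, ts):
    # Mirrors the branch above: drop the run instruction, then either
    # forget the task entirely or keep tracking it as waiting.
    ts.state = "waiting"
    ts.runspec = None
    if not ts.dependents:
        del tasks[ts.key]  # analogue of release_key
        return
    return ts.state


tasks = {"a": ToyTask("a"), "b": ToyTask("b", dependents=["c"])}
demote_to_waiting(tasks, tasks["a"])  # no dependents -> forgotten
assert "a" not in tasks
demote_to_waiting(tasks, tasks["b"])  # feeds "c" -> kept as waiting
assert tasks["b"].state == "waiting" and tasks["b"].runspec is None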
@@ -2259,16 +2279,21 @@ def steal_request(self, key):
    # There may be a race condition between stealing and releasing a task.
    # In that case the entry in self.tasks has already been cleared. The
    # `None` will be registered as `already-computing` on the other end.
    if key in self.tasks:
        state = self.tasks[key].state
    ts = self.tasks.get(key)
    if ts:
        state = ts.state
    else:
        state = None

    response = {"op": "steal-response", "key": key, "state": state}
    self.batched_stream.send(response)

    if state in ("ready", "waiting", "constrained"):
        self.release_key(key, reason="steal_request")
        # FIXME GH4413 We no longer want to execute this task, but there
        # is no dedicated state to identify this. We still need to track
        # the task, otherwise this worker may never fetch it in case it
        # is still required.
        self.transition(ts, "waiting")

def release_key(self, key, reason=None, cause=None, report=True):
    try:
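
Putting the old and new lines together, the method after this commit reads roughly as follows (a reconstruction from the hunk above, with comments abbreviated, not an authoritative copy of the source):

def steal_request(self, key):
    # The task may have been released concurrently; a state of None is
    # registered as `already-computing` on the scheduler side.
    ts = self.tasks.get(key)
    state = ts.state if ts else None

    self.batched_stream.send(
        {"op": "steal-response", "key": key, "state": state}
    )

    if state in ("ready", "waiting", "constrained"):
        # The task will run elsewhere, but keep tracking it as waiting so
        # this worker can still fetch the result if it turns out to be
        # required here.
        self.transition(ts, "waiting")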
@@ -2582,7 +2607,17 @@ async def execute(self, key, report=False):
        executor_error = e
        raise

    if ts.state not in ("executing", "long-running"):
    # We need to check the task state again, since it may have changed
    # after execution was kicked off. In particular, the task may have
    # been cancelled and released already, in which case we have to drop
    # the result immediately.
    key = ts.key
    ts = self.tasks.get(key)

    if ts is None:
        logger.debug(
            "Dropping result for %s since task has already been released." % key
        )
        return

    result["key"] = ts.key
@@ -2877,7 +2912,7 @@ def _notify_plugins(self, method_name, *args, **kwargs):

def validate_task_memory(self, ts):
    assert ts.key in self.data or ts.key in self.actors
    assert ts.nbytes is not None
    assert isinstance(ts.nbytes, int)
    assert not ts.waiting_for_data
    assert ts.key not in self.ready
    assert ts.state == "memory"
@@ -2904,7 +2939,7 @@ def validate_task_ready(self, ts):
def validate_task_waiting(self, ts):
    assert ts.key not in self.data
    assert ts.state == "waiting"
    if ts.dependencies:
    if ts.dependencies and ts.runspec:
        assert not all(dep.key in self.data for dep in ts.dependencies)

def validate_task_flight(self, ts):
@@ -2940,6 +2975,9 @@ def validate_state(self):
    try:
        assert len(self.data_needed) == len(set(self.data_needed))
        waiting_keys = set()

        assert self.executing_count >= 0

        for ts in self.tasks.values():
            assert ts.state is not None
            # check that worker has task
@@ -2958,10 +2996,6 @@ def validate_state(self):
                ts_wait = self.tasks[key]
                assert ts_wait.state in PROCESSING, ts_wait
                waiting_keys.add(key)
            if ts.state == "memory":
                assert isinstance(ts.nbytes, int)
                assert not ts.waiting_for_data
                assert ts.key in self.data or ts.key in self.actors

        # FIXME: Tracking of waiting_for_data_count is broken. Most likely
        # since tasks may be required/waited on by multiple tasks but the
