Skip to content

Commit

Permalink
No longer hold dependencies of erred tasks in memory #4918
Browse files Browse the repository at this point in the history
This is a follow-up to #4784 and significantly reduces the complexity of Worker.release_key.

There is one non-trivial behavioural change regarding erred tasks. The current main branch holds on to the dependencies of an erred task on a worker and releases them only once that erred task itself is released. I implemented this recently while trying to capture the status quo, but I'm no longer convinced that this is the correct behaviour. It treats the erred case specially, which introduces a lot of complexity. The only place where this might be of interest is when an erred task is to be recomputed locally: not forgetting the data keys until the erred task is released would speed up that process. However, we would potentially still need to compute some keys, so I'm inclined to strike this feature in favour of reduced complexity.
  • Loading branch information
fjetter authored Jun 29, 2021
1 parent b9d2e3b commit dbb13ec
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 76 deletions.
51 changes: 30 additions & 21 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
mul,
nodebug,
slowinc,
slowsum,
)
from distributed.worker import Worker, error_message, logger, parse_memory_limit

Expand Down Expand Up @@ -2087,11 +2088,6 @@ def raise_exc(*args):
await asyncio.sleep(0.01)

expected_states = {
# We currently don't have a good way to actually release this memory as
# long as the tasks still have a dependent. We'll need to live with this
# memory for now
f.key: "memory",
g.key: "memory",
res.key: "error",
}

Expand Down Expand Up @@ -2159,14 +2155,14 @@ def raise_exc(*args):

expected_states = {
f.key: "memory",
g.key: "memory",
}

assert_task_states_on_worker(expected_states, a)

f.release()
g.release()

# This is not happening
for server in [s, a, b]:
while server.tasks:
await asyncio.sleep(0.01)
Expand Down Expand Up @@ -2220,13 +2216,14 @@ def raise_exc(*args):
res.release()
# We no longer hold any refs to f or g and B didn't have any errors. It
# releases everything as expected
while a.tasks:
while len(a.tasks) > 1:
await asyncio.sleep(0.01)

expected_states = {
g.key: "memory",
}

assert_task_states_on_worker(expected_states, a)
assert_task_states_on_worker(expected_states, b)

g.release()
Expand Down Expand Up @@ -2283,7 +2280,6 @@ def raise_exc(*args):
assert_task_states_on_worker(expected_states_A, a)

expected_states_B = {
f.key: "memory",
g.key: "memory",
h.key: "memory",
res.key: "error",
Expand All @@ -2301,15 +2297,6 @@ def raise_exc(*args):

# B must not forget a task since all have a still valid dependent
expected_states_B = {
f.key: "memory",
# We actually cannot hold on to G even though the graph would suggest
# otherwise. This is because H was only introduced as a dependency and
# the scheduler never told the worker how H fits into the big picture.
# Therefore, it thinks that G does not have any dependents anymore and
# releases it. Too bad. Once we have speculative task assignments this
# should be more exact since we should always tell the worker what's
# going on
# g.key: released,
h.key: "memory",
res.key: "error",
}
Expand All @@ -2320,10 +2307,6 @@ def raise_exc(*args):
expected_states_A = {}
assert_task_states_on_worker(expected_states_A, a)
expected_states_B = {
f.key: "memory",
# See above
# g.key: released,
h.key: "memory",
res.key: "error",
}

Expand All @@ -2334,3 +2317,29 @@ def raise_exc(*args):
for server in [s, a, b]:
while server.tasks:
await asyncio.sleep(0.01)


@gen_cluster(
    client=True,
    nthreads=[("127.0.0.1", x) for x in range(4)],
    timeout=None,
)
async def test_hold_on_to_replicas(c, s, *workers):
    """Check replica bookkeeping while a chain of two sums is computed.

    ``f1`` and ``f2`` are pinned to the first two workers, ``sum`` (which
    needs both) to the third, and ``sum_2`` (which needs ``f1`` and ``sum``)
    to the fourth. The client drops its references to ``f1`` and ``f2`` right
    after submitting. Until ``sum_2`` is in memory on the fourth worker, the
    scheduler must report at least two holders of ``f1`` and must report
    ``f2`` as released. Finally the third worker has to shrink to at most
    one task.
    """
    first_sum_worker = workers[2]
    second_sum_worker = workers[3]

    inc_one = c.submit(inc, 1, workers=[workers[0].address], key="f1")
    inc_two = c.submit(inc, 2, workers=[workers[1].address], key="f2")

    first_sum = c.submit(
        slowsum,
        [inc_one, inc_two],
        delay=0.1,
        workers=[first_sum_worker.address],
        key="sum",
    )
    second_sum = c.submit(
        slowsum,
        [inc_one, first_sum],
        delay=0.2,
        workers=[second_sum_worker.address],
        key="sum_2",
    )
    inc_one.release()
    inc_two.release()

    # Wait until the last worker has been told about the final task.
    while second_sum.key not in second_sum_worker.tasks:
        await asyncio.sleep(0.01)

    # The invariants below are re-checked on every poll until the final
    # result has landed in memory.
    while second_sum_worker.tasks[second_sum.key].state != "memory":
        assert len(s.tasks[inc_one.key].who_has) >= 2
        assert s.tasks[inc_two.key].state == "released"
        await asyncio.sleep(0.01)

    # Wait for the worker that computed ``sum`` to hold at most one task.
    while len(first_sum_worker.tasks) > 1:
        await asyncio.sleep(0.01)
66 changes: 11 additions & 55 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1658,8 +1658,7 @@ def add_task(
ts.dependencies.add(dep_ts)
dep_ts.dependents.add(ts)

if dep_ts.state in ("fetch", "flight"):
# if we _need_ to grab data or are in the process
if dep_ts.state not in ("memory",):
ts.waiting_for_data.add(dep_ts.key)

self.update_who_has(who_has=who_has)
Expand Down Expand Up @@ -1762,9 +1761,6 @@ def transition_fetch_waiting(self, ts, runspec):
# clear `who_has` of stale info
ts.who_has.clear()

# remove entry from dependents to avoid a spurious `gather_dep` call``
for dependent in ts.dependents:
dependent.waiting_for_data.discard(ts.key)
except Exception as e:
logger.exception(e)
if LOG_PDB:
Expand Down Expand Up @@ -1794,9 +1790,6 @@ def transition_flight_waiting(self, ts, runspec):
# clear `who_has` of stale info
ts.who_has.clear()

# remove entry from dependents to avoid a spurious `gather_dep` call``
for dependent in ts.dependents:
dependent.waiting_for_data.discard(ts.key)
except Exception as e:
logger.exception(e)
if LOG_PDB:
Expand Down Expand Up @@ -1991,6 +1984,8 @@ def transition_executing_done(self, ts, value=no_value, report=True):
ts.traceback = msg["traceback"]
ts.state = "error"
out = "error"
for d in ts.dependents:
d.waiting_for_data.add(ts.key)

# Don't release the dependency keys, but do remove them from `dependents`
for dependency in ts.dependencies:
Expand Down Expand Up @@ -2621,12 +2616,12 @@ def release_key(

if self.validate:
assert isinstance(key, str)
ts = self.tasks.get(key, TaskState(key=key))
ts = self.tasks.get(key, None)
# If the scheduler holds a reference, which is usually the
# case when it instructed the task to be computed here or if
# data was scattered, we must not release it unless the
# scheduler allows us to. See also handle_delete_data.
if ts and ts.scheduler_holds_ref:
if ts is None or ts.scheduler_holds_ref:
return
logger.debug(
"Release key %s",
Expand All @@ -2640,28 +2635,14 @@ def release_key(
self.log.append((key, "release-key", {"cause": cause}, reason))
else:
self.log.append((key, "release-key", reason))
if key in self.data and not ts.dependents:
if key in self.data:
try:
del self.data[key]
except FileNotFoundError:
logger.error("Tried to delete %s but no file found", exc_info=True)
if key in self.actors and not ts.dependents:
if key in self.actors:
del self.actors[key]

# for any dependencies of key we are releasing remove task as dependent
for dependency in ts.dependencies:
dependency.dependents.discard(ts)

if not dependency.dependents and dependency.state not in (
# don't boot keys that are in flight
# we don't know if they're already queued up for transit
# in a gather_dep callback
"flight",
# The same is true for already executing keys.
"executing",
):
self.release_key(dependency.key, reason=f"Dependent {ts} released")

for worker in ts.who_has:
self.has_what[worker].discard(ts.key)
ts.who_has.clear()
Expand All @@ -2681,8 +2662,10 @@ def release_key(
# Inform the scheduler of keys which will have gone missing
# We are releasing them before they have completed
if ts.state in PROCESSING:
# This path is only hit with work stealing
msg = {"op": "release", "key": key, "cause": cause}
else:
# This path is only hit when calling release_key manually
msg = {
"op": "release-worker-data",
"keys": [key],
Expand All @@ -2691,9 +2674,8 @@ def release_key(
self.batched_stream.send(msg)

self._notify_plugins("release_key", key, ts.state, cause, reason, report)
if key in self.tasks and not ts.dependents:
self.tasks.pop(key)
del ts
del self.tasks[key]

except CommClosedError:
pass
except Exception as e:
Expand All @@ -2704,32 +2686,6 @@ def release_key(
pdb.set_trace()
raise

def rescind_key(self, key):
    """Forget a still-pending task and release dependencies left without use.

    Nothing happens unless the task's state is one of ``PENDING``. Otherwise
    the task is popped from ``self.tasks`` and removed from the ``dependents``
    set of each of its dependencies. Any dependency whose ``dependents`` set
    becomes empty is handed to ``release_key``.

    Parameters
    ----------
    key : str
        Key of the task to rescind. It must be present in ``self.tasks``.

    Raises
    ------
    Exception
        Any exception raised while rescinding (including ``KeyError`` for an
        unknown ``key``) is logged and re-raised.
    """
    try:
        if self.tasks[key].state not in PENDING:
            return

        ts = self.tasks.pop(key)

        # The task has been rescinded: unlink it from everything it required.
        for dependency in ts.dependencies:
            # ``dependents`` is populated with TaskState objects (see
            # ``add_task``), so the task object itself has to be dropped.
            # Removing the string key would raise ``KeyError`` on the first
            # dependency.
            dependency.dependents.discard(ts)
            # If the dependency is now without purpose (no dependents left),
            # release it.
            if not dependency.dependents:
                self.release_key(
                    dependency.key, reason="All dependent keys rescinded"
                )

    except Exception as e:
        logger.exception(e)
        if LOG_PDB:
            import pdb

            pdb.set_trace()
        raise

################
# Execute Task #
################
Expand Down

0 comments on commit dbb13ec

Please sign in to comment.