Test retire workers deadlock (#6240)
Co-authored-by: crusaderky <crusaderky@gmail.com>
gjoseph92 and crusaderky authored Jun 16, 2022
1 parent 52b0b26 commit 33c5cb2
Showing 1 changed file with 104 additions and 2 deletions.
distributed/tests/test_active_memory_manager.py (104 additions, 2 deletions)
@@ -9,13 +9,20 @@

import pytest

-from distributed import Nanny, wait
+from distributed import Event, Nanny, Scheduler, Worker, wait
from distributed.active_memory_manager import (
ActiveMemoryManagerExtension,
ActiveMemoryManagerPolicy,
RetireWorker,
)
from distributed.core import Status
-from distributed.utils_test import captured_logger, gen_cluster, inc, slowinc
+from distributed.utils_test import (
+    assert_story,
+    captured_logger,
+    gen_cluster,
+    inc,
+    slowinc,
+)

NO_AMM_START = {"distributed.scheduler.active-memory-manager.start": False}

@@ -903,6 +910,101 @@ async def test_RetireWorker_all_recipients_are_paused(c, s, a, b):
assert await c.submit(inc, 1) == 2


@gen_cluster(
client=True,
config={
"distributed.scheduler.active-memory-manager.start": True, # to avoid one-off AMM instance
"distributed.scheduler.active-memory-manager.policies": [],
},
timeout=15,
)
async def test_RetireWorker_new_keys_arrive_after_all_keys_moved_away(
c, s: Scheduler, a: Worker, b: Worker
):
"""
If all keys have been moved off a worker, but then new keys arrive (due to task completion or `gather_dep`)
before the worker has actually closed, make sure we still retire it (instead of hanging forever).
This test is timing-sensitive. If it runs too slowly, it *should* `pytest.skip` itself.
See https://github.com/dask/distributed/issues/6223 for motivation.
"""
ws_a = s.workers[a.address]
ws_b = s.workers[b.address]
event = Event()

# Put 200 keys on the worker, so `_track_retire_worker` will sleep for 0.5s
xs = c.map(lambda x: x, range(200), workers=[a.address])
await wait(xs)

# Put an extra task on the worker, which we will allow to complete once the `xs`
# have been replicated.
extra = c.submit(
lambda: event.wait("2s"),
workers=[a.address],
allow_other_workers=True,
key="extra",
)

while (
extra.key not in a.state.tasks or a.state.tasks[extra.key].state != "executing"
):
await asyncio.sleep(0.01)

t = asyncio.create_task(c.retire_workers([a.address]))

# Wait for all `xs` to be replicated.
    while len(ws_b.has_what) != len(xs):
await asyncio.sleep(0)

# `_track_retire_worker` _should_ now be sleeping for 0.5s, because there were >=200 keys on A.
# In this test, everything from the beginning of the transfers needs to happen within 0.5s.

    # Simulate the policy running again. Because the default 2s AMM interval is longer
    # than the 0.5s wait, the situation we're about to trigger is unlikely in practice,
    # but the timings can still line up (especially with a custom AMM interval).
amm: ActiveMemoryManagerExtension = s.extensions["amm"]
assert len(amm.policies) == 1
policy = next(iter(amm.policies))
assert isinstance(policy, RetireWorker)

amm.run_once()

# The policy has removed itself, because all `xs` have been replicated.
assert not amm.policies
assert policy.done(), {ts.key: ts.who_has for ts in ws_a.has_what}

# But what if a new key arrives now while `_track_retire_worker` is still (maybe)
# sleeping? Let `extra` complete and wait for it to hit the scheduler.
await event.set()
await wait(extra)

if a.address not in s.workers:
# It took more than 0.5s to get here, and the scheduler closed our worker. Dang.
pytest.skip(
"Timing didn't work out: `_track_retire_worker` finished before `extra` completed."
)

# `retire_workers` doesn't hang
await t
assert a.address not in s.workers
assert not amm.policies

# `extra` was not transferred from `a` to `b`. Instead, it was recomputed on `b`.
story = b.state.story(extra.key)
assert_story(
story,
[
(extra.key, "compute-task", "released"),
(extra.key, "released", "waiting", "waiting", {"extra": "ready"}),
(extra.key, "waiting", "ready", "ready", {"extra": "executing"}),
],
)

# `extra` completes successfully and is fetched from the other worker.
await extra.result()


# FIXME can't drop runtime of this test below 10s; see distributed#5585
@pytest.mark.slow
@gen_cluster(
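As context for the assertions on `amm.policies` in the new test, here is a minimal sketch of the uncontended path that test complicates: retire a worker through the shared AMM instance and check that the `RetireWorker` policy has cleaned up after itself. It relies only on names that appear in the diff above (`gen_cluster`, `inc`, `ActiveMemoryManagerExtension`, `s.extensions["amm"]`, `c.retire_workers`, `amm.policies`); the test name and the assumption that `retire_workers()` drives the running AMM instance rather than a one-off one are illustrative, not part of this commit.

from distributed import Scheduler, Worker
from distributed.active_memory_manager import ActiveMemoryManagerExtension
from distributed.utils_test import gen_cluster, inc


@gen_cluster(
    client=True,
    config={
        # Same config as the new test: start the AMM extension with no policies, so
        # (assumed) retire_workers() uses the running AMM rather than a one-off instance.
        "distributed.scheduler.active-memory-manager.start": True,
        "distributed.scheduler.active-memory-manager.policies": [],
    },
)
async def test_retire_worker_amm_sketch(c, s: Scheduler, a: Worker, b: Worker):
    # Place one key on worker a; retiring a must replicate its data to b first.
    x = c.submit(inc, 1, workers=[a.address])
    assert await x == 2

    amm: ActiveMemoryManagerExtension = s.extensions["amm"]
    assert not amm.policies  # no policies configured up front

    await c.retire_workers([a.address])

    # The RetireWorker policy registered itself for the duration of the retirement
    # and removed itself once the worker was gone.
    assert not amm.policies
    assert a.address not in s.workers

    # x was replicated to b before a was retired, so its result is still available.
    assert await x == 2

The new test above then covers the harder case where an extra key lands on the retiring worker after this replication has already finished.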
