Rewrite test_reconnect to use subproc to kill scheduler reliably #6967

Merged 2 commits on Aug 30, 2022
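
The change below drops the in-process hard_stop() helper and instead launches the scheduler as a dask-scheduler subprocess, so the test can kill it from the outside at an exact point. The mechanism is the ExitStack: the popen(...) context manager is entered up front and stack.close() tears the scheduler process down mid-test. A minimal, self-contained sketch of that pattern follows; it uses only the standard library, and the child command, timings, and function name are illustrative stand-ins rather than anything from this PR:

import subprocess
import sys
from contextlib import ExitStack


def test_kill_child_midway():
    # Enter the child's context manager through an ExitStack so it can be torn
    # down at an arbitrary point inside the test body, not just at the end of
    # a `with` block.
    stack = ExitStack()
    proc = stack.enter_context(
        subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
    )

    # ... exercise the code under test while the child process is alive ...
    assert proc.poll() is None  # child still running

    # Kill the child at a precise point. Plain subprocess.Popen.__exit__ only
    # waits for the process, so terminate it first; distributed's popen() test
    # helper terminates the child itself when its context exits.
    proc.terminate()
    stack.close()
    assert proc.poll() is not None  # child has exited

Killing the process externally does not depend on the scheduler's own shutdown path cooperating, which is presumably what makes this approach more reliable than hard_stop().
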
94 changes: 41 additions & 53 deletions distributed/tests/test_client.py
@@ -21,7 +21,7 @@
 import zipfile
 from collections import deque
 from collections.abc import Generator
-from contextlib import contextmanager, nullcontext
+from contextlib import ExitStack, contextmanager, nullcontext
 from functools import partial
 from operator import add
 from threading import Semaphore
@@ -71,7 +71,7 @@
 from distributed.cluster_dump import load_cluster_dump
 from distributed.comm import CommClosedError
 from distributed.compatibility import LINUX, WINDOWS
-from distributed.core import Server, Status
+from distributed.core import Status
 from distributed.metrics import time
 from distributed.scheduler import CollectTaskMetaDataPlugin, KilledWorker, Scheduler
 from distributed.sizeof import sizeof
@@ -3592,65 +3592,53 @@ async def test_scatter_raises_if_no_workers(c, s):
 async def test_reconnect():
     port = open_port()

-    async def hard_stop(s):
-        for pc in s.periodic_callbacks.values():
-            pc.stop()
-
-        s.stop_services()
-        for comm in list(s.stream_comms.values()):
-            comm.abort()
-        for comm in list(s.client_comms.values()):
-            comm.abort()
-
-        await s.rpc.close()
-        s.stop()
-        await Server.close(s)
-
-    async with Scheduler(port=port) as s:
-        async with Client(f"127.0.0.1:{port}", asynchronous=True) as c:
-            async with Worker(f"127.0.0.1:{port}") as w:
-                await c.wait_for_workers(1, timeout=10)
-                x = c.submit(inc, 1)
-                assert (await x) == 2
-                await hard_stop(s)
-
-                while c.status != "connecting":
-                    await asyncio.sleep(0.01)
-
-                assert x.status == "cancelled"
-                with pytest.raises(CancelledError):
-                    await x
-
-                async with Scheduler(port=port) as s2:
-                    start = time()
-                    while c.status != "running":
-                        await asyncio.sleep(0.1)
-                        assert time() < start + 10
-
-                    await w.finished()
-                    async with Worker(f"127.0.0.1:{port}"):
-                        start = time()
-                        while len(await c.nthreads()) != 1:
-                            await asyncio.sleep(0.05)
-                            assert time() < start + 10
-
-                        x = c.submit(inc, 1)
-                        assert (await x) == 2
-                        await hard_stop(s2)
-
-                    start = time()
-                    while True:
-                        assert time() < start + 10
-                        try:
-                            await x
-                            assert False
-                        except CommClosedError:
-                            continue
-                        except CancelledError:
-                            break
-                    await c._close(fast=True)
+    stack = ExitStack()
+    proc = popen(["dask-scheduler", "--no-dashboard", f"--port={port}"])
+    stack.enter_context(proc)
+    async with Client(f"127.0.0.1:{port}", asynchronous=True) as c, Worker(
+        f"127.0.0.1:{port}"
+    ) as w:
+        await c.wait_for_workers(1, timeout=10)
+        x = c.submit(inc, 1)
+        assert (await x) == 2
+        stack.close()
+
+        start = time()
+        while c.status != "connecting":
+            assert time() < start + 10
+            await asyncio.sleep(0.01)
+
+        assert x.status == "cancelled"
+        with pytest.raises(CancelledError):
+            await x
+
+        with popen(["dask-scheduler", "--no-dashboard", f"--port={port}"]):
+            start = time()
+            while c.status != "running":
+                await asyncio.sleep(0.1)
+                assert time() < start + 10
+
+            await w.finished()
+            async with Worker(f"127.0.0.1:{port}"):
+                start = time()
+                while len(await c.nthreads()) != 1:
+                    await asyncio.sleep(0.05)
+                    assert time() < start + 10
+
+                x = c.submit(inc, 1)
+                assert (await x) == 2
+
+        start = time()
+        while True:
+            assert time() < start + 10
+            try:
+                await x
+                assert False
+            except CommClosedError:
+                continue
+            except CancelledError:
+                break
+        await c._close(fast=True)


class UnhandledExceptions(Exception):