Stimulus id contextvars explicit handlers #6083

Closed
41 commits
c2a3ead
test_scheduler.py succeeds
sjperkins Apr 1, 2022
5d10407
Working test_worker.py and test_client.py
sjperkins Apr 1, 2022
e30be1b
Support transition_log in http output
sjperkins Apr 1, 2022
308568e
Rename assert_worker_story assert_story
sjperkins Apr 1, 2022
b017a12
If possible, defer to STIMULUS_ID when sending messages
sjperkins Apr 1, 2022
dd6efcb
Support passing stimulus_id in Scheduler handlers
sjperkins Apr 1, 2022
40f87f7
Transmit stimulus_id's from client
sjperkins Apr 1, 2022
52697de
Generate new stimulus_id on completion/failure of Worker.execute
sjperkins Apr 1, 2022
17f069a
Use decorator to manage stimulus injection
sjperkins Apr 4, 2022
36e9ef7
Merge branch 'main' into stimulus-ids-contextvars
sjperkins Apr 4, 2022
ef4e29c
Enable github tmate
sjperkins Apr 4, 2022
08a6812
Target specific test case
sjperkins Apr 4, 2022
1ce45f0
bump
sjperkins Apr 4, 2022
222b24d
Explicitly specify sync/async stimulus_handler
sjperkins Apr 4, 2022
55e5216
Assert with is_coroutine_function
sjperkins Apr 4, 2022
22e3300
Document sync parameter
sjperkins Apr 4, 2022
9c42c3f
Revert "bump"
sjperkins Apr 4, 2022
a517fcb
Revert "Target specific test case"
sjperkins Apr 4, 2022
e29ad73
Revert "Enable github tmate"
sjperkins Apr 4, 2022
8458969
comments
sjperkins Apr 4, 2022
caf9a1d
Template stimulus_id var in dashboard
sjperkins Apr 4, 2022
a57d9c1
Pass stimulus_id to Client._decref
sjperkins Apr 4, 2022
b311bef
stimulus_handler changes
sjperkins Apr 4, 2022
1cf4032
worker changes
sjperkins Apr 4, 2022
9b789f6
Merge branch 'main' into stimulus-ids-contextvars
sjperkins Apr 5, 2022
4433754
Use a contextmanager instead of a decorator
sjperkins Apr 5, 2022
046b49a
Remove default stimulus_id's throughout the scheduler
sjperkins Apr 5, 2022
9241c72
RuntimeError -> AssertionError
sjperkins Apr 5, 2022
c37f743
Cast appropriately to access the _validate member in a cython build
sjperkins Apr 5, 2022
c37ab61
Use a single ContextVar to represent STIMULUS_ID's
sjperkins Apr 5, 2022
8575d7a
Reintroduce default_stimulus_id in Scheduler.handler*
sjperkins Apr 6, 2022
3457a6e
Merge branch 'main' into stimulus-ids-contextvars
sjperkins Apr 6, 2022
d0aa434
Copy context in generic loop callbacks
sjperkins Apr 6, 2022
0527665
Merge branch 'main' into stimulus-ids-contextvars
sjperkins Apr 7, 2022
2cff5cc
Support ContextVar stimulus_id's received via explicitly defined hand…
sjperkins Apr 7, 2022
1ddffb0
Address review comments
sjperkins Apr 7, 2022
51c2c63
Remove debugging assert
sjperkins Apr 7, 2022
78575c5
Remove final handle_remove_worker
sjperkins Apr 7, 2022
2c0c9a1
Call handlers in diagnostic test cases
sjperkins Apr 7, 2022
0639cbf
Removed unused test fixture
sjperkins Apr 7, 2022
bfcbdb0
Merge branch 'main' into stimulus-id-contextvars-explicit-handlers
sjperkins Apr 8, 2022
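
The commits above converge on one pattern: store the stimulus ID in a single `ContextVar` (c37ab61) and bind it with a context manager rather than a decorator (4433754), so handlers can read the ambient ID instead of threading it through every signature. A minimal sketch of that pattern follows; `STIMULUS_ID`, `set_stimulus_id`, and `handle_release` are illustrative names, not distributed's actual identifiers:

```python
from contextlib import contextmanager
from contextvars import ContextVar
from time import time

# One ContextVar carries the stimulus_id for the current logical operation.
STIMULUS_ID: ContextVar[str] = ContextVar("stimulus_id")

@contextmanager
def set_stimulus_id(stimulus_id: str):
    """Bind a stimulus_id for the enclosed block, restoring the prior value after."""
    token = STIMULUS_ID.set(stimulus_id)
    try:
        yield stimulus_id
    finally:
        STIMULUS_ID.reset(token)

def handle_release(key: str) -> None:
    # Handlers read the ambient ID, minting a fresh one only as a fallback.
    sid = STIMULUS_ID.get(f"handle-release-{time()}")
    print(f"releasing {key} (stimulus_id={sid})")

with set_stimulus_id(f"client-release-key-{time()}"):
    handle_release("x-123")  # prints the client-release-key-... ID
```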
75 changes: 62 additions & 13 deletions distributed/client.py
@@ -209,6 +209,7 @@ def __init__(self, key, client=None, inform=True, state=None):
"op": "client-desires-keys",
"keys": [stringify(key)],
"client": self.client.id,
"stimulus_id": f"client-desires-keys-{time()}",
}
)

@@ -472,6 +473,7 @@ def __setstate__(self, state):
"tasks": {},
"keys": [stringify(self.key)],
"client": c.id,
"stimulus_id": f"stimulus-id-{time()}",
}
)

@@ -1265,6 +1267,7 @@ async def _ensure_connected(self, timeout=None):
"client": self.id,
"reply": False,
"versions": version_module.get_versions(),
"stimulus_id": f"client-ensure-connected-{time()}",
}
)
except Exception:
@@ -1371,17 +1374,22 @@ def _dec_ref(self, key):
self.refcount[key] -= 1
if self.refcount[key] == 0:
del self.refcount[key]
self._release_key(key)
self._release_key(key, f"client-release-key-{time()}")

def _release_key(self, key):
def _release_key(self, key, stimulus_id: str):
"""Release key from distributed memory"""
logger.debug("Release key %s", key)
st = self.futures.pop(key, None)
if st is not None:
st.cancel()
if self.status != "closed":
self._send_to_scheduler(
{"op": "client-releases-keys", "keys": [key], "client": self.id}
{
"op": "client-releases-keys",
"keys": [key],
"client": self.id,
"stimulus_id": stimulus_id,
}
)

async def _handle_report(self):
@@ -1506,7 +1514,9 @@ async def _close(self, fast=False):
and self.scheduler_comm.comm
and not self.scheduler_comm.comm.closed()
):
self._send_to_scheduler({"op": "close-client"})
self._send_to_scheduler(
{"op": "close-client", "stimulus_id": f"client-close-{time()}"}
)
self._send_to_scheduler({"op": "close-stream"})

current_task = asyncio.current_task()
@@ -1527,8 +1537,10 @@
):
await self.scheduler_comm.close()

stimulus_id = f"client-close-{time()}"

for key in list(self.futures):
self._release_key(key=key)
self._release_key(key=key, stimulus_id=stimulus_id)

if self._start_arg is None:
with suppress(AttributeError):
@@ -2110,12 +2122,20 @@ async def _gather_remote(self, direct, local_worker):
response = {"status": "OK", "data": data2}
if missing_keys:
keys2 = [key for key in keys if key not in data2]
response = await retry_operation(self.scheduler.gather, keys=keys2)
response = await retry_operation(
self.scheduler.gather,
keys=keys2,
stimulus_id=f"client-gather-remote-{time()}",
)
if response["status"] == "OK":
response["data"].update(data2)

else: # ask scheduler to gather data for us
response = await retry_operation(self.scheduler.gather, keys=keys)
response = await retry_operation(
self.scheduler.gather,
keys=keys,
stimulus_id=f"client-gather-remote-{time()}",
)

return response

@@ -2201,6 +2221,8 @@ async def _scatter(
d = await self._scatter(keymap(stringify, data), workers, broadcast)
return {k: d[stringify(k)] for k in data}

stimulus_id = f"client-scatter-{time()}"

if isinstance(data, type(range(0))):
data = list(data)
input_type = type(data)
@@ -2242,6 +2264,7 @@ async def _scatter(
who_has={key: [local_worker.address] for key in data},
nbytes=valmap(sizeof, data),
client=self.id,
stimulus_id=stimulus_id,
)

else:
@@ -2264,7 +2287,10 @@
)

await self.scheduler.update_data(
who_has=who_has, nbytes=nbytes, client=self.id
who_has=who_has,
nbytes=nbytes,
client=self.id,
stimulus_id=stimulus_id,
)
else:
await self.scheduler.scatter(
@@ -2273,6 +2299,7 @@
client=self.id,
broadcast=broadcast,
timeout=timeout,
stimulus_id=stimulus_id,
)

out = {k: Future(k, self, inform=False) for k in data}
@@ -2396,7 +2423,12 @@ def scatter(

async def _cancel(self, futures, force=False):
keys = list({stringify(f.key) for f in futures_of(futures)})
await self.scheduler.cancel(keys=keys, client=self.id, force=force)
await self.scheduler.cancel(
keys=keys,
client=self.id,
force=force,
stimulus_id=f"client-cancel-{time()}",
)
for k in keys:
st = self.futures.pop(k, None)
if st is not None:
@@ -2423,7 +2455,9 @@ def cancel(self, futures, asynchronous=None, force=False):

async def _retry(self, futures):
keys = list({stringify(f.key) for f in futures_of(futures)})
response = await self.scheduler.retry(keys=keys, client=self.id)
response = await self.scheduler.retry(
keys=keys, client=self.id, stimulus_id=f"client-retry-{time()}"
)
for key in response:
st = self.futures[key]
st.retry()
@@ -2922,6 +2956,7 @@ def _graph_to_futures(
"fifo_timeout": fifo_timeout,
"actors": actors,
"code": self._get_computation_code(),
"stimulus_id": f"client-update-graph-hlg-{time()}",
}
)
return futures
@@ -3347,7 +3382,13 @@ async def _restart(self, timeout=no_default):
if timeout is not None:
timeout = parse_timedelta(timeout, "s")

self._send_to_scheduler({"op": "restart", "timeout": timeout})
self._send_to_scheduler(
{
"op": "restart",
"timeout": timeout,
"stimulus_id": f"client-restart-{time()}",
}
)
self._restart_event = asyncio.Event()
try:
await asyncio.wait_for(self._restart_event.wait(), timeout)
@@ -3424,7 +3465,9 @@ async def _rebalance(self, futures=None, workers=None):
keys = list({stringify(f.key) for f in self.futures_of(futures)})
else:
keys = None
result = await self.scheduler.rebalance(keys=keys, workers=workers)
result = await self.scheduler.rebalance(
keys=keys, workers=workers, stimulus_id=f"client-rebalance-{time()}"
)
if result["status"] == "partial-fail":
raise KeyError(f"Could not rebalance keys: {result['keys']}")
assert result["status"] == "OK", result
@@ -3459,7 +3502,11 @@ async def _replicate(self, futures, n=None, workers=None, branching_factor=2):
await _wait(futures)
keys = {stringify(f.key) for f in futures}
await self.scheduler.replicate(
keys=list(keys), n=n, workers=workers, branching_factor=branching_factor
keys=list(keys),
n=n,
workers=workers,
branching_factor=branching_factor,
stimulus_id=f"client-replicate-{time()}",
)

def replicate(self, futures, n=None, workers=None, branching_factor=2, **kwargs):
@@ -4177,6 +4224,7 @@ def retire_workers(
self.scheduler.retire_workers,
workers=workers,
close_workers=close_workers,
stimulus_id=f"client-retire-workers-{time()}",
**kwargs,
)

@@ -5138,6 +5186,7 @@ def fire_and_forget(obj):
"op": "client-desires-keys",
"keys": [stringify(future.key)],
"client": "fire-and-forget",
"stimulus_id": f"client-fire-and-forget-{time()}",
}
)

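Every `client.py` call site above mints its ID inline as `f"client-<operation>-{time()}"`, where `time` comes from `distributed.metrics`. A hypothetical helper making the convention explicit (the PR inlines the f-string rather than defining one; stdlib `time` stands in here):

```python
from time import time

def new_stimulus_id(operation: str, origin: str = "client") -> str:
    """Return an ID like 'client-scatter-1649312345.123'."""
    return f"{origin}-{operation}-{time()}"

# For example, the message Client._release_key sends in the hunk above:
msg = {
    "op": "client-releases-keys",
    "keys": ["x-123"],          # placeholder key
    "client": "client-abcdef",  # placeholder client ID
    "stimulus_id": new_stimulus_id("release-key"),
}
print(msg["stimulus_id"])
```

Embedding a timestamp keeps the IDs unique across repeated operations while staying human-readable in worker stories and the transition log.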
7 changes: 5 additions & 2 deletions distributed/core.py
@@ -13,6 +13,7 @@
from collections import defaultdict
from collections.abc import Container
from contextlib import suppress
from contextvars import copy_context
from enum import Enum
from functools import partial
from typing import Callable, ClassVar, TypedDict, TypeVar
@@ -619,7 +620,9 @@ async def handle_stream(self, comm, extra=None, every_cycle=()):
break
handler = self.stream_handlers[op]
if is_coroutine_function(handler):
self.loop.add_callback(handler, **merge(extra, msg))
self.loop.add_callback(
copy_context().run, handler, **merge(extra, msg)
)
await gen.sleep(0)
else:
handler(**merge(extra, msg))
@@ -629,7 +632,7 @@ async def handle_stream(self, comm, extra=None, every_cycle=()):

for func in every_cycle:
if is_coroutine_function(func):
self.loop.add_callback(func)
self.loop.add_callback(copy_context().run, func)
else:
func()

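The `handle_stream` changes wrap coroutine handlers and `every_cycle` callbacks in `copy_context().run`, so ContextVars set at the call site are still visible when the loop later runs the callback. A self-contained sketch of the mechanism, using asyncio directly rather than Tornado's `IOLoop.add_callback` (the variable name is illustrative):

```python
import asyncio
from contextvars import ContextVar, copy_context

STIMULUS_ID: ContextVar[str] = ContextVar("stimulus_id", default="<unset>")

def handler() -> None:
    # Runs later on the event loop; it sees the caller's stimulus_id because
    # the caller captured a snapshot of its context when scheduling.
    print("handler sees:", STIMULUS_ID.get())

async def main() -> None:
    loop = asyncio.get_running_loop()
    STIMULUS_ID.set("handle-stream-1649312345.1")
    # copy_context() snapshots the current context; .run executes the handler
    # inside that snapshot, making propagation explicit rather than dependent
    # on the loop's scheduling behaviour.
    loop.call_soon(copy_context().run, handler)
    await asyncio.sleep(0)  # let the scheduled callback run

asyncio.run(main())
```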
2 changes: 2 additions & 0 deletions distributed/deploy/adaptive.py
@@ -7,6 +7,7 @@
from dask.utils import parse_timedelta

from distributed.deploy.adaptive_core import AdaptiveCore
from distributed.metrics import time
from distributed.protocol import pickle
from distributed.utils import log_errors

@@ -193,6 +194,7 @@ async def scale_down(self, workers):
names=workers,
remove=True,
close_workers=True,
stimulus_id=f"scale-down-{time()}",
)

# close workers more forcefully
6 changes: 5 additions & 1 deletion distributed/deploy/spec.py
@@ -19,6 +19,7 @@
from distributed.core import CommClosedError, Status, rpc
from distributed.deploy.adaptive import Adaptive
from distributed.deploy.cluster import Cluster
from distributed.metrics import time
from distributed.scheduler import Scheduler
from distributed.security import Security
from distributed.utils import NoOpAwaitable, TimeoutError, import_term, silence_logging
@@ -318,7 +319,10 @@ async def _correct_state_internal(self):
to_close = set(self.workers) - set(self.worker_spec)
if to_close:
if self.scheduler.status == Status.running:
await self.scheduler_comm.retire_workers(workers=list(to_close))
await self.scheduler_comm.retire_workers(
workers=list(to_close),
stimulus_id=f"spec-cluster-correct-internal-state-{time()}",
)
tasks = [
asyncio.create_task(self.workers[w].close())
for w in to_close
5 changes: 4 additions & 1 deletion distributed/http/templates/task.html
@@ -118,16 +118,18 @@ <h3 class="title is-5"> Transition Log </h3>
<th> Key </th>
<th> Start </th>
<th> Finish </th>
<th> Stimulus ID </th>
<th> Recommended Key </th>
<th> Recommended Action </th>
</thead>

{% for key, start, finish, recommendations, transition_time in scheduler.story(Task) %}
{% for key, start, finish, recommendations, stimulus_id, transition_time in scheduler.story(Task) %}
<tr>
<td> {{ fromtimestamp(transition_time) }} </td>
<td> <a href="{{ url_escape(key) }}.html">{{key}}</a> </td>
<td> {{ start }} </td>
<td> {{ finish }} </td>
<td> {{ stimulus_id }} </td>
<td> </td>
<td> </td>
</tr>
@@ -137,6 +139,7 @@ <h3 class="title is-5"> Transition Log </h3>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> <a href="{{ url_escape(key2) }}.html">{{key2}}</a> </td>
<td> {{ rec }} </td>
</tr>
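The template now unpacks a six-field transition-log row, with `stimulus_id` inserted between the recommendations and the transition time. A tiny sketch with made-up data showing the row shape the template assumes:

```python
# Illustrative rows only, not actual scheduler.story() output.
story = [
    # (key, start, finish, recommendations, stimulus_id, transition_time)
    ("x-123", "processing", "memory", {}, "compute-task-1649312345.1", 1649312345.2),
]
for key, start, finish, recommendations, stimulus_id, transition_time in story:
    print(f"{key}: {start} -> {finish} via {stimulus_id}")
```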
5 changes: 4 additions & 1 deletion distributed/nanny.py
@@ -290,7 +290,10 @@ async def _unregister(self, timeout=10):
allowed_errors = (TimeoutError, CommClosedError, EnvironmentError, RPCClosed)
with suppress(allowed_errors):
await asyncio.wait_for(
self.scheduler.unregister(address=self.worker_address), timeout
self.scheduler.unregister(
address=self.worker_address, stimulus_id=f"close-nanny-{time()}"
),
timeout,
)

@property