Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stimulus ids scheduler #6161

Merged
merged 33 commits into from
Apr 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
6ee466f
test_scheduler.py and test_worker.py working
sjperkins Apr 8, 2022
5e01cd8
Transplant http changes
sjperkins Apr 8, 2022
d4dd229
Merge branch 'main' into stimulus-id-kwargs
sjperkins Apr 8, 2022
9b110aa
test suite passing
sjperkins Apr 8, 2022
6a4e0ed
Rename assert_worker_story to assert_story
sjperkins Apr 8, 2022
1bf0a0f
Add scheduler stimulus test
sjperkins Apr 8, 2022
9b08f7a
Fix some failing --runslow tests
sjperkins Apr 8, 2022
d95183f
Merge branch 'main' into stimulus-id-kwargs
sjperkins Apr 9, 2022
1e1d969
Remove debugging exception and test case remove_worker stimuli
sjperkins Apr 11, 2022
d5a013a
Remove stimulus_id from long running messages
sjperkins Apr 11, 2022
0a0f4bc
Remove stray comment from merge
sjperkins Apr 11, 2022
807add8
Add a test_stimulus_retry test case
sjperkins Apr 12, 2022
0913001
Test stimulus flows
sjperkins Apr 13, 2022
0d43985
Client Story rebased on #6095
sjperkins Apr 4, 2022
f952583
Introduce an ordered_timestamps keyword to assert_story
sjperkins Apr 4, 2022
bb870f0
Update test_client_story test case
sjperkins Apr 11, 2022
3d481a8
Incorporate stimulus_id checks into test_client_story
sjperkins Apr 12, 2022
aab88b4
Use standardised default keyword, rather than sentinel
sjperkins Apr 12, 2022
746b569
Merge pull request #1 from sjperkins/client-story-6095
sjperkins Apr 13, 2022
3a1b642
Fix imports
sjperkins Apr 13, 2022
e30d11b
Merge branch 'main' into stimulus-id-kwargs
sjperkins Apr 13, 2022
f794d7e
Merge branch 'main' into stimulus-id-kwargs
sjperkins Apr 13, 2022
3bb333d
Remove stimulus flow tests and simplify stimulus tests
sjperkins Apr 13, 2022
8a441b9
Merge branch 'main' into stimulus-id-kwargs
sjperkins Apr 13, 2022
56b0371
Merge branch 'main' into stimulus-id-kwargs
sjperkins Apr 14, 2022
7f11ff9
Address review comments
sjperkins Apr 14, 2022
428dcb9
Revert to using CommClosedError
sjperkins Apr 14, 2022
1658418
Simplify tests and ensure Client.story carries source
fjetter Apr 20, 2022
31927f0
Correct a few stimulus_id links
fjetter Apr 20, 2022
34f4392
Merge remote-tracking branch 'origin/main' into stimulus_ids_scheduler
fjetter Apr 20, 2022
53acdf2
Allow assert_story to use real story output as expected
fjetter Apr 21, 2022
201035a
Use the same stimulus_id for worker status change
fjetter Apr 21, 2022
fe988af
Merge remote-tracking branch 'origin/main' into stimulus_ids_scheduler
fjetter Apr 25, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
25 changes: 25 additions & 0 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4276,6 +4276,31 @@ def collections_to_dsk(collections, *args, **kwargs):
"""Convert many collections into a single dask graph, after optimization"""
return collections_to_dsk(collections, *args, **kwargs)

async def _story(self, keys=(), on_error="raise"):
assert on_error in ("raise", "ignore")

try:
flat_stories = await self.scheduler.get_story(keys=keys)
flat_stories = [("scheduler", *msg) for msg in flat_stories]
except Exception:
if on_error == "raise":
raise
elif on_error == "ignore":
flat_stories = []
else:
raise ValueError(f"on_error not in {'raise', 'ignore'}")

responses = await self.scheduler.broadcast(
msg={"op": "get_story", "keys": keys}, on_error=on_error
)
for worker, stories in responses.items():
flat_stories.extend((worker, *msg) for msg in stories)
return flat_stories

def story(self, *keys_or_stimulus_ids, on_error="raise"):
"""Returns a cluster-wide story for the given keys or simtulus_id's"""
return self.sync(self._story, keys=keys_or_stimulus_ids, on_error=on_error)

def get_task_stream(
self,
start=None,
Expand Down
4 changes: 2 additions & 2 deletions distributed/cluster_dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
import fsspec
import msgpack

from distributed._stories import scheduler_story as _scheduler_story
from distributed._stories import worker_story as _worker_story
from distributed.compatibility import to_thread
from distributed.stories import scheduler_story as _scheduler_story
from distributed.stories import worker_story as _worker_story

DEFAULT_CLUSTER_DUMP_FORMAT: Literal["msgpack" | "yaml"] = "msgpack"
DEFAULT_CLUSTER_DUMP_EXCLUDE: Collection[str] = ("run_spec",)
Expand Down
5 changes: 4 additions & 1 deletion distributed/http/templates/task.html
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,18 @@ <h3 class="title is-5"> Transition Log </h3>
<th> Key </th>
<th> Start </th>
<th> Finish </th>
<th> Stimulus ID </th>
<th> Recommended Key </th>
<th> Recommended Action </th>
</thead>

{% for key, start, finish, recommendations, transition_time in scheduler.story(Task) %}
{% for key, start, finish, recommendations, stimulus_id, transition_time in scheduler.story(Task) %}
<tr>
<td> {{ fromtimestamp(transition_time) }} </td>
<td> <a href="{{ url_escape(key) }}.html">{{key}}</a> </td>
<td> {{ start }} </td>
<td> {{ finish }} </td>
<td> {{ stimulus_id }} </td>
<td> </td>
<td> </td>
</tr>
Expand All @@ -137,6 +139,7 @@ <h3 class="title is-5"> Transition Log </h3>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> <a href="{{ url_escape(key2) }}.html">{{key2}}</a> </td>
<td> {{ rec }} </td>
</tr>
Expand Down
5 changes: 4 additions & 1 deletion distributed/nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,10 @@ async def _unregister(self, timeout=10):
allowed_errors = (TimeoutError, CommClosedError, EnvironmentError, RPCClosed)
with suppress(allowed_errors):
await asyncio.wait_for(
self.scheduler.unregister(address=self.worker_address), timeout
self.scheduler.unregister(
address=self.worker_address, stimulus_id=f"nanny-close-{time()}"
),
timeout,
)

@property
Expand Down
Loading