-
-
Notifications
You must be signed in to change notification settings - Fork 718
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ctxvars stimulus ID #6068
Ctxvars stimulus ID #6068
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've been thinking about this PR. Here's an example with the stimulus_id on the right after the # comment.
- Client._send_to_scheduler({"op": "update-graph-hlg", "stimulus_id": "CHLG"}) # CHLG
- Scheduler.update_graph_hlg # CHLG
- Scheduler..update_graph # CHLG
- Scheduler.worker_send({"op": "compute-task"}) # CHLG
- Worker.handle_compute_task # CHLG
- Worker.execute # CHLG
- Worker._handle_instruction # "task-finished"
- Worker.batched_stream({"op": "task-finished"}) # "task-finished"
- Scheduler.handle_task_finished # "task-finished"
I've been thinking about this in a Cluster vs a LocalCluster approach.
- Within a Distributed Cluster, the STIMULUS_ID ContextVar will be unset and must derived from a message from an external entity (Client, Scheduler, Worker).
- Within a LocalCluster (Client, Scheduler, Worker in the same process), the STIMULUS_ID may already have been set.
I am considering how to write code to handle both cases and this is complicated by examples such as update_graph_hlg calling update_graph.
I also wonder if a Distributed ContextVar might be possible with the simple use of STIMULUS_ID.set(stimulus_id)
at various handler boundaries and copy_context().run
at thread boundaries.
I think there's a lot of complexity here (async, threads and inter-process communication) that I'm trying to get my head around. To improve my understanding I would like to model this with some minimal distributed.core.Server's in a test case.
A further thought on managing this complexity would be to add some kwargs to default_stimulus_id
such as override
(always override existing STIMULUS_ID) and require_empty
(require STIMULUS_ID to not be present prior to setting it)
This builds on top of #6046 (or at least a version of it) and moves the set_stimulus_id (former Scheduler.stimulus_id) to the utils.py module and applies the contextvar to the worker as well.
I encountered some problems with the dataclasses we're using. I'm sure this can be ironed out but I didn't want to waste time.