Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ctxvars stimulus ID #6068

Closed
wants to merge 31 commits into from
Closed

Ctxvars stimulus ID #6068

wants to merge 31 commits into from

Conversation

fjetter
Copy link
Member

@fjetter fjetter commented Apr 5, 2022

This builds on top of #6046 (or at least a version of it) and moves the set_stimulus_id (former Scheduler.stimulus_id) to the utils.py module and applies the contextvar to the worker as well.

I encountered some problems with the dataclasses we're using. I'm sure this can be ironed out but I didn't want to waste time.

sjperkins and others added 30 commits April 1, 2022 12:19
inspect.iscoroutinefunction doesn't recognise cythonised async functions
This reverts commit 1ce45f0.
@github-actions
Copy link
Contributor

github-actions bot commented Apr 5, 2022

Unit Test Results

       12 files   -        6         12 suites   - 6   6h 13m 29s ⏱️ - 3h 9m 57s
  2 724 tests ±       0    2 616 ✔️  -      25    99 💤 +  18    9 +  7 
16 260 runs   - 8 104  15 339 ✔️  - 7 804  877 💤  - 342  44 +42 

For more details on these failures, see this check.

Results for commit c780413. ± Comparison against base commit 64615ed.

@sjperkins sjperkins self-requested a review April 6, 2022 09:54
Copy link
Member

@sjperkins sjperkins left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been thinking about this PR. Here's an example with the stimulus_id on the right after the # comment.

  1. Client._send_to_scheduler({"op": "update-graph-hlg", "stimulus_id": "CHLG"}) # CHLG
  2. Scheduler.update_graph_hlg # CHLG
  3. Scheduler..update_graph # CHLG
  4. Scheduler.worker_send({"op": "compute-task"}) # CHLG
  5. Worker.handle_compute_task # CHLG
  6. Worker.execute # CHLG
  7. Worker._handle_instruction # "task-finished"
  8. Worker.batched_stream({"op": "task-finished"}) # "task-finished"
  9. Scheduler.handle_task_finished # "task-finished"

I've been thinking about this in a Cluster vs a LocalCluster approach.

  1. Within a Distributed Cluster, the STIMULUS_ID ContextVar will be unset and must derived from a message from an external entity (Client, Scheduler, Worker).
  2. Within a LocalCluster (Client, Scheduler, Worker in the same process), the STIMULUS_ID may already have been set.

I am considering how to write code to handle both cases and this is complicated by examples such as update_graph_hlg calling update_graph.

I also wonder if a Distributed ContextVar might be possible with the simple use of STIMULUS_ID.set(stimulus_id) at various handler boundaries and copy_context().run at thread boundaries.

I think there's a lot of complexity here (async, threads and inter-process communication) that I'm trying to get my head around. To improve my understanding I would like to model this with some minimal distributed.core.Server's in a test case.

A further thought on managing this complexity would be to add some kwargs to default_stimulus_id such as override (always override existing STIMULUS_ID) and require_empty (require STIMULUS_ID to not be present prior to setting it)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants