From d5a013afa7802d5547a852cb2f25f7e572426f8e Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Mon, 11 Apr 2022 10:52:41 +0200 Subject: [PATCH] Remove stimulus_id from long running messages --- distributed/scheduler.py | 7 +------ distributed/worker.py | 8 +------- distributed/worker_state_machine.py | 3 +-- 3 files changed, 3 insertions(+), 15 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 2514fbefee..630e41dec0 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5713,18 +5713,13 @@ def release_worker_data(self, key, worker, stimulus_id): if recommendations: self.transitions(recommendations, stimulus_id) - def handle_long_running( - self, key=None, worker=None, compute_duration=None, stimulus_id=None - ): + def handle_long_running(self, key=None, worker=None, compute_duration=None): """A task has seceded from the thread pool We stop the task from being stolen in the future, and change task duration accounting as if the task has stopped. """ parent: SchedulerState = cast(SchedulerState, self) - # TODO(sjperkins): This stimulus isn't passed anywhere, so - # could probably be removed from the signature before the PR is merged - assert stimulus_id if key not in parent._tasks: logger.debug("Skipping long_running since key %s was already released", key) return diff --git a/distributed/worker.py b/distributed/worker.py index e0a16f960e..4e405101d6 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -2451,13 +2451,7 @@ def transition_executing_long_running( return merge_recs_instructions( ( {}, - [ - LongRunningMsg( - key=ts.key, - compute_duration=compute_duration, - stimulus_id=stimulus_id, - ) - ], + [LongRunningMsg(key=ts.key, compute_duration=compute_duration)], ), self._ensure_computing(), ) diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 347f003325..4b7ac5302f 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -342,10 +342,9 @@ class RescheduleMsg(SendMessageToScheduler): class LongRunningMsg(SendMessageToScheduler): op = "long-running" - __slots__ = ("key", "compute_duration", "stimulus_id") + __slots__ = ("key", "compute_duration") key: str compute_duration: float - stimulus_id: str @dataclass