From 34d93b7d1cda7c0f4a430364b7588002ab612494 Mon Sep 17 00:00:00 2001
From: Blaise Bruer
Date: Fri, 28 Jun 2024 19:56:37 -0500
Subject: [PATCH] [Refactor] Move worker notification in SimpleScheduler under
 Workers

Moves the logic for when the matching engine trigger gets run to under the
Workers struct where it is easy to do so. This splits the logic of when a
task changes and the matching engine needs to run from when the worker pool
changes and the matching engine needs to run.

towards: #359
---
 .../src/scheduler_state/workers.rs           | 16 +++++-
 nativelink-scheduler/src/simple_scheduler.rs | 51 ++++++++++---------
 2 files changed, 40 insertions(+), 27 deletions(-)

diff --git a/nativelink-scheduler/src/scheduler_state/workers.rs b/nativelink-scheduler/src/scheduler_state/workers.rs
index 25e78e2bb..b32f24c09 100644
--- a/nativelink-scheduler/src/scheduler_state/workers.rs
+++ b/nativelink-scheduler/src/scheduler_state/workers.rs
@@ -12,11 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::sync::Arc;
+
 use lru::LruCache;
 use nativelink_config::schedulers::WorkerAllocationStrategy;
 use nativelink_error::{error_if, make_input_err, Error, ResultExt};
 use nativelink_util::action_messages::WorkerId;
 use nativelink_util::platform_properties::PlatformProperties;
+use tokio::sync::Notify;
 use tracing::{event, Level};
 
 use crate::worker::{Worker, WorkerTimestamp};
@@ -27,13 +30,19 @@ pub struct Workers {
     pub(crate) workers: LruCache<WorkerId, Worker>,
     /// The allocation strategy for workers.
     pub(crate) allocation_strategy: WorkerAllocationStrategy,
+    /// A channel to notify the matching engine that the worker pool has changed.
+    pub(crate) worker_change_notify: Arc<Notify>,
 }
 
 impl Workers {
-    pub(crate) fn new(allocation_strategy: WorkerAllocationStrategy) -> Self {
+    pub(crate) fn new(
+        allocation_strategy: WorkerAllocationStrategy,
+        worker_change_notify: Arc<Notify>,
+    ) -> Self {
         Self {
             workers: LruCache::unbounded(),
             allocation_strategy,
+            worker_change_notify,
         }
     }
 
@@ -80,6 +89,7 @@ impl Workers {
                 "Worker connection appears to have been closed while adding to pool"
             );
         }
+        self.worker_change_notify.notify_one();
         res
     }
 
@@ -87,7 +97,9 @@
     /// Note: The caller is responsible for any rescheduling of any tasks that might be
     /// running.
     pub(crate) fn remove_worker(&mut self, worker_id: &WorkerId) -> Option<Worker> {
-        self.workers.pop(worker_id)
+        let result = self.workers.pop(worker_id);
+        self.worker_change_notify.notify_one();
+        result
     }
 
     // Attempts to find a worker that is capable of running this action.
diff --git a/nativelink-scheduler/src/simple_scheduler.rs b/nativelink-scheduler/src/simple_scheduler.rs
index dd4f261e1..91da284ac 100644
--- a/nativelink-scheduler/src/simple_scheduler.rs
+++ b/nativelink-scheduler/src/simple_scheduler.rs
@@ -244,7 +244,8 @@ impl SimpleSchedulerImpl {
             }
         }
         // Note: Calling this many times is very cheap, it'll only trigger `do_try_match` once.
-        inner_state.tasks_change_notify.notify_one();
+        // TODO(allada) This should be moved to inside the Workers struct.
+        workers.worker_change_notify.notify_one();
     }
 
     /// Sets if the worker is draining or not.
@@ -260,12 +261,8 @@ impl SimpleSchedulerImpl {
             .err_tip(|| format!("Worker {worker_id} doesn't exist in the pool"))?;
         self.metrics.workers_drained.inc();
         worker.is_draining = is_draining;
-        self.state_manager
-            .inner
-            .lock()
-            .await
-            .tasks_change_notify
-            .notify_one();
+        // TODO(allada) This should move to inside the Workers struct.
+        self.workers.worker_change_notify.notify_one();
         Ok(())
     }
 
@@ -550,6 +547,9 @@
             }
         }
 
+        // TODO(allada) This should move to inside the Workers struct.
+        self.workers.worker_change_notify.notify_one();
+
         Ok(())
     }
 }
@@ -610,14 +610,14 @@
             max_job_retries = DEFAULT_MAX_JOB_RETRIES;
         }
 
-        let tasks_change_notify = Arc::new(Notify::new());
+        let tasks_or_worker_change_notify = Arc::new(Notify::new());
         let state_manager = Arc::new(StateManager::new(
             HashSet::new(),
             BTreeMap::new(),
             HashMap::new(),
             HashSet::new(),
             Arc::new(SchedulerMetrics::default()),
-            tasks_change_notify.clone(),
+            tasks_or_worker_change_notify.clone(),
         ));
         let metrics = Arc::new(Metrics::default());
         let metrics_for_do_try_match = metrics.clone();
@@ -626,8 +626,11 @@
             retain_completed_for: Duration::new(retain_completed_for_s, 0),
             worker_timeout_s,
             max_job_retries,
+            workers: Workers::new(
+                scheduler_cfg.allocation_strategy,
+                tasks_or_worker_change_notify.clone(),
+            ),
             metrics: metrics.clone(),
-            workers: Workers::new(scheduler_cfg.allocation_strategy),
         }));
         let weak_inner = Arc::downgrade(&inner);
         Self {
@@ -638,7 +641,7 @@
             async move {
                 // Break out of the loop only when the inner is dropped.
                 loop {
-                    tasks_change_notify.notified().await;
+                    tasks_or_worker_change_notify.notified().await;
                     match weak_inner.upgrade() {
                         // Note: According to `parking_lot` documentation, the default
                         // `Mutex` implementation is eventual fairness, so we don't
@@ -774,22 +777,20 @@ impl WorkerScheduler for SimpleScheduler {
         let state_manager = inner_scheduler.state_manager.clone();
         let mut inner_state = state_manager.inner.lock().await;
         self.metrics.add_worker.wrap(move || {
-            let res = inner_scheduler
+            inner_scheduler
                 .workers
                 .add_worker(worker)
-                .err_tip(|| "Error while adding worker, removing from pool");
-            if let Err(err) = &res {
-                SimpleSchedulerImpl::immediate_evict_worker(
-                    &mut inner_state,
-                    &mut inner_scheduler.workers,
-                    max_job_retries,
-                    &self.metrics,
-                    &worker_id,
-                    err.clone(),
-                );
-            }
-            inner_state.tasks_change_notify.notify_one();
-            res
+                .err_tip(|| "Error while adding worker, removing from pool")
+                .inspect_err(|err| {
+                    SimpleSchedulerImpl::immediate_evict_worker(
+                        &mut inner_state,
+                        &mut inner_scheduler.workers,
+                        max_job_retries,
+                        &self.metrics,
+                        &worker_id,
+                        err.clone(),
+                    );
+                })
         })
     }
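
Reviewer note (not part of the patch): the comment kept in the first simple_scheduler.rs hunk says repeated notifications are cheap because they only trigger `do_try_match` once. Below is a minimal, self-contained sketch of why that holds, using only `tokio::sync::Notify`; the names are illustrative and not from this repository.

```rust
use std::time::Duration;

use tokio::sync::Notify;
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    let change_notify = Notify::new();

    // Several task/worker state changes arrive before the matching engine
    // polls. With no waiter present, `notify_one` stores at most one
    // permit, so these three calls coalesce.
    change_notify.notify_one();
    change_notify.notify_one();
    change_notify.notify_one();

    // The engine loop wakes once; in the scheduler this is where a single
    // `do_try_match` pass would run.
    change_notify.notified().await;
    println!("ran one matching pass");

    // No second permit was stored, so another wait pends until a new change.
    let second_wakeup = timeout(Duration::from_millis(50), change_notify.notified()).await;
    assert!(second_wakeup.is_err(), "no wakeup without a new notification");
}
```

This coalescing is what makes it safe for `add_worker`, `remove_worker`, and the drain/eviction paths to each call `notify_one` unconditionally.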
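A second note on the final hunk: it replaces the manual `let res = ...; if let Err(err) = &res { ... }; res` dance with `Result::inspect_err` (stable since Rust 1.76), which runs a side effect only on the error path and hands the original `Result` back. A rough sketch with hypothetical stand-ins for the scheduler's types:

```rust
// Hypothetical stand-ins for `Workers::add_worker` and
// `SimpleSchedulerImpl::immediate_evict_worker`, just to show the shape.
fn add_worker(healthy: bool) -> Result<(), String> {
    if healthy {
        Ok(())
    } else {
        Err("connection closed while adding to pool".to_string())
    }
}

fn immediate_evict_worker(err: &str) {
    println!("evicting worker: {err}");
}

fn register_worker(healthy: bool) -> Result<(), String> {
    // `inspect_err` fires only on the `Err` branch and passes the
    // `Result` through untouched, so the caller still sees the error.
    add_worker(healthy).inspect_err(|err| immediate_evict_worker(err))
}

fn main() {
    assert!(register_worker(true).is_ok());
    assert!(register_worker(false).is_err());
}
```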