Skip to content

Commit

Permalink
[Refactor] Move worker notification in SimpleScheduler under Workers
Browse files Browse the repository at this point in the history
Moves the logic on when the matching enginge trigger gets run to
under the workers struct where easy. This splits the logic of
when a task is changed and matching engine needs to run and when
a task gets run and the matching engine needs to be run.

towards: #359
  • Loading branch information
allada committed Jun 29, 2024
1 parent b9d9702 commit 34d93b7
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 27 deletions.
16 changes: 14 additions & 2 deletions nativelink-scheduler/src/scheduler_state/workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use lru::LruCache;
use nativelink_config::schedulers::WorkerAllocationStrategy;
use nativelink_error::{error_if, make_input_err, Error, ResultExt};
use nativelink_util::action_messages::WorkerId;
use nativelink_util::platform_properties::PlatformProperties;
use tokio::sync::Notify;
use tracing::{event, Level};

use crate::worker::{Worker, WorkerTimestamp};
Expand All @@ -27,13 +30,19 @@ pub struct Workers {
pub(crate) workers: LruCache<WorkerId, Worker>,
/// The allocation strategy for workers.
pub(crate) allocation_strategy: WorkerAllocationStrategy,
/// A channel to notify the matching engine that the worker pool has changed.
pub(crate) worker_change_notify: Arc<Notify>,
}

impl Workers {
pub(crate) fn new(allocation_strategy: WorkerAllocationStrategy) -> Self {
pub(crate) fn new(
allocation_strategy: WorkerAllocationStrategy,
worker_change_notify: Arc<Notify>,
) -> Self {
Self {
workers: LruCache::unbounded(),
allocation_strategy,
worker_change_notify,
}
}

Expand Down Expand Up @@ -80,14 +89,17 @@ impl Workers {
"Worker connection appears to have been closed while adding to pool"
);
}
self.worker_change_notify.notify_one();
res
}

/// Removes worker from pool.
/// Note: The caller is responsible for any rescheduling of any tasks that might be
/// running.
pub(crate) fn remove_worker(&mut self, worker_id: &WorkerId) -> Option<Worker> {
self.workers.pop(worker_id)
let result = self.workers.pop(worker_id);
self.worker_change_notify.notify_one();
result
}

// Attempts to find a worker that is capable of running this action.
Expand Down
51 changes: 26 additions & 25 deletions nativelink-scheduler/src/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,8 @@ impl SimpleSchedulerImpl {
}
}
// Note: Calling this many time is very cheap, it'll only trigger `do_try_match` once.
inner_state.tasks_change_notify.notify_one();
// TODO(allada) This should be moved to inside the Workers struct.
workers.worker_change_notify.notify_one();
}

/// Sets if the worker is draining or not.
Expand All @@ -260,12 +261,8 @@ impl SimpleSchedulerImpl {
.err_tip(|| format!("Worker {worker_id} doesn't exist in the pool"))?;
self.metrics.workers_drained.inc();
worker.is_draining = is_draining;
self.state_manager
.inner
.lock()
.await
.tasks_change_notify
.notify_one();
// TODO(allada) This should move to inside the Workers struct.
self.workers.worker_change_notify.notify_one();
Ok(())
}

Expand Down Expand Up @@ -550,6 +547,9 @@ impl SimpleSchedulerImpl {
}
}

// TODO(allada) This should move to inside the Workers struct.
self.workers.worker_change_notify.notify_one();

Ok(())
}
}
Expand Down Expand Up @@ -610,14 +610,14 @@ impl SimpleScheduler {
max_job_retries = DEFAULT_MAX_JOB_RETRIES;
}

let tasks_change_notify = Arc::new(Notify::new());
let tasks_or_worker_change_notify = Arc::new(Notify::new());
let state_manager = Arc::new(StateManager::new(
HashSet::new(),
BTreeMap::new(),
HashMap::new(),
HashSet::new(),
Arc::new(SchedulerMetrics::default()),
tasks_change_notify.clone(),
tasks_or_worker_change_notify.clone(),
));
let metrics = Arc::new(Metrics::default());
let metrics_for_do_try_match = metrics.clone();
Expand All @@ -626,8 +626,11 @@ impl SimpleScheduler {
retain_completed_for: Duration::new(retain_completed_for_s, 0),
worker_timeout_s,
max_job_retries,
workers: Workers::new(
scheduler_cfg.allocation_strategy,
tasks_or_worker_change_notify.clone(),
),
metrics: metrics.clone(),
workers: Workers::new(scheduler_cfg.allocation_strategy),
}));
let weak_inner = Arc::downgrade(&inner);
Self {
Expand All @@ -638,7 +641,7 @@ impl SimpleScheduler {
async move {
// Break out of the loop only when the inner is dropped.
loop {
tasks_change_notify.notified().await;
tasks_or_worker_change_notify.notified().await;
match weak_inner.upgrade() {
// Note: According to `parking_lot` documentation, the default
// `Mutex` implementation is eventual fairness, so we don't
Expand Down Expand Up @@ -774,22 +777,20 @@ impl WorkerScheduler for SimpleScheduler {
let state_manager = inner_scheduler.state_manager.clone();
let mut inner_state = state_manager.inner.lock().await;
self.metrics.add_worker.wrap(move || {
let res = inner_scheduler
inner_scheduler
.workers
.add_worker(worker)
.err_tip(|| "Error while adding worker, removing from pool");
if let Err(err) = &res {
SimpleSchedulerImpl::immediate_evict_worker(
&mut inner_state,
&mut inner_scheduler.workers,
max_job_retries,
&self.metrics,
&worker_id,
err.clone(),
);
}
inner_state.tasks_change_notify.notify_one();
res
.err_tip(|| "Error while adding worker, removing from pool")
.inspect_err(|err| {
SimpleSchedulerImpl::immediate_evict_worker(
&mut inner_state,
&mut inner_scheduler.workers,
max_job_retries,
&self.metrics,
&worker_id,
err.clone(),
);
})
})
}

Expand Down

0 comments on commit 34d93b7

Please sign in to comment.