From f6c87b57b28ba4fb7a01a9fdc45e7bfaeaae2ff1 Mon Sep 17 00:00:00 2001 From: Blaise Bruer Date: Mon, 15 Jul 2024 20:56:53 -0500 Subject: [PATCH] Add versioning to AwaitedAction --- .../src/awaited_action_db/awaited_action.rs | 18 ++ .../src/memory_awaited_action_db.rs | 26 ++- .../src/simple_scheduler_state_manager.rs | 199 ++++++++++-------- 3 files changed, 155 insertions(+), 88 deletions(-) diff --git a/nativelink-scheduler/src/awaited_action_db/awaited_action.rs b/nativelink-scheduler/src/awaited_action_db/awaited_action.rs index 6f96c797b..e05fd3475 100644 --- a/nativelink-scheduler/src/awaited_action_db/awaited_action.rs +++ b/nativelink-scheduler/src/awaited_action_db/awaited_action.rs @@ -23,9 +23,18 @@ use nativelink_util::action_messages::{ use nativelink_util::evicting_map::InstantWrapper; use static_assertions::{assert_eq_size, const_assert, const_assert_eq}; +/// The version of the awaited action. +/// This number will always increment by one each time +/// the action is updated. +#[derive(Debug, Clone, Copy)] +struct AwaitedActionVersion(u64); + /// An action that is being awaited on and last known state. #[derive(Debug, Clone)] pub struct AwaitedAction { + /// The current version of the action. + version: AwaitedActionVersion, + /// The action that is being awaited on. action_info: Arc, @@ -65,6 +74,7 @@ impl AwaitedAction { id: operation_id.clone(), }); Self { + version: AwaitedActionVersion(0), action_info, operation_id, sort_key, @@ -75,6 +85,14 @@ impl AwaitedAction { } } + pub fn version(&self) -> u64 { + self.version.0 + } + + pub fn increment_version(&mut self) { + self.version = AwaitedActionVersion(self.version.0 + 1); + } + pub fn action_info(&self) -> &Arc { &self.action_info } diff --git a/nativelink-scheduler/src/memory_awaited_action_db.rs b/nativelink-scheduler/src/memory_awaited_action_db.rs index 971890798..b758d302c 100644 --- a/nativelink-scheduler/src/memory_awaited_action_db.rs +++ b/nativelink-scheduler/src/memory_awaited_action_db.rs @@ -505,7 +505,6 @@ impl AwaitedActionDbImpl { .map(|tx| MemoryAwaitedActionSubscriber::new(tx.subscribe())) } - // TODO!(rename) fn get_range_of_actions<'a, 'b>( &'a self, state: SortedAwaitedActionState, @@ -595,6 +594,26 @@ impl AwaitedActionDbImpl { // Note: It's important to drop old_awaited_action before we call // send_replace or we will have a deadlock. let old_awaited_action = tx.borrow(); + + // Do not process changes if the action version is not in sync with + // what the sender based the update on. + if old_awaited_action.version() + 1 != new_awaited_action.version() { + return Err(make_err!( + // From: https://grpc.github.io/grpc/core/md_doc_statuscodes.html + // Use ABORTED if the client should retry at a higher level + // (e.g., when a client-specified test-and-set fails, + // indicating the client should restart a read-modify-write + // sequence) + Code::Aborted, + "{} Expected {:?} but got {:?} for operation_id {:?} - {:?}", + "Tried to update an awaited action with an incorrect version.", + old_awaited_action.version() + 1, + new_awaited_action.version(), + old_awaited_action, + new_awaited_action, + )); + } + error_if!( old_awaited_action.action_info().unique_qualifier != new_awaited_action.action_info().unique_qualifier, @@ -608,7 +627,6 @@ impl AwaitedActionDbImpl { .stage .is_same_stage(&new_awaited_action.state().stage); - // TODO!(Handle priority changes here). if !is_same_stage { self.sorted_action_info_hash_keys .process_state_changes(&old_awaited_action, &new_awaited_action)?; @@ -725,7 +743,9 @@ impl AwaitedActionDbImpl { &mut self, client_operation_id: &ClientOperationId, unique_qualifier: &ActionUniqueQualifier, - // TODO!() + // TODO(allada) To simplify the scheduler 2024 refactor, we + // removed the ability to upgrade priorities of actions. + // we should add priority upgrades back in. _priority: i32, ) -> Result, Error> { let unique_key = match unique_qualifier { diff --git a/nativelink-scheduler/src/simple_scheduler_state_manager.rs b/nativelink-scheduler/src/simple_scheduler_state_manager.rs index 119e63939..5c637406e 100644 --- a/nativelink-scheduler/src/simple_scheduler_state_manager.rs +++ b/nativelink-scheduler/src/simple_scheduler_state_manager.rs @@ -34,6 +34,10 @@ use super::awaited_action_db::{ }; use crate::memory_awaited_action_db::{ClientActionStateResult, MatchingEngineActionStateResult}; +/// Maximum number of times an update to the database +/// can fail before giving up. +const MAX_UPDATE_RETRIES: usize = 5; + /// Simple struct that implements the ActionStateResult trait and always returns an error. struct ErrorActionStateResult(Error); @@ -149,101 +153,126 @@ impl SimpleSchedulerStateManager { maybe_worker_id: Option<&WorkerId>, action_stage_result: Result, ) -> Result<(), Error> { - let maybe_awaited_action_subscriber = self - .action_db - .get_by_operation_id(operation_id) - .await - .err_tip(|| "In MemorySchedulerStateManager::update_operation")?; - let awaited_action_subscriber = match maybe_awaited_action_subscriber { - Some(sub) => sub, - // No action found. It is ok if the action was not found. It probably - // means that the action was dropped, but worker was still processing - // it. - None => return Ok(()), - }; + let mut last_err = None; + for _ in 0..MAX_UPDATE_RETRIES { + let maybe_awaited_action_subscriber = self + .action_db + .get_by_operation_id(operation_id) + .await + .err_tip(|| "In MemorySchedulerStateManager::update_operation")?; + let awaited_action_subscriber = match maybe_awaited_action_subscriber { + Some(sub) => sub, + // No action found. It is ok if the action was not found. It probably + // means that the action was dropped, but worker was still processing + // it. + None => return Ok(()), + }; - let mut awaited_action = awaited_action_subscriber.borrow(); + let mut awaited_action = awaited_action_subscriber.borrow(); - // Make sure we don't update an action that is already completed. - if awaited_action.state().stage.is_finished() { - return Err(make_err!( - Code::Internal, - "Action {operation_id:?} is already completed with state {:?} - maybe_worker_id: {:?}", - awaited_action.state().stage, - maybe_worker_id, - )); - } + // Make sure we don't update an action that is already completed. + if awaited_action.state().stage.is_finished() { + return Err(make_err!( + Code::Internal, + "Action {operation_id:?} is already completed with state {:?} - maybe_worker_id: {:?}", + awaited_action.state().stage, + maybe_worker_id, + )); + } - // Make sure the worker id matches the awaited action worker id. - // This might happen if the worker sending the update is not the - // worker that was assigned. - if awaited_action.worker_id().is_some() - && maybe_worker_id.is_some() - && maybe_worker_id != awaited_action.worker_id().as_ref() - { - let err = make_err!( - Code::Internal, - "Worker ids do not match - {:?} != {:?} for {:?}", - maybe_worker_id, - awaited_action.worker_id(), - awaited_action, - ); - event!( - Level::ERROR, - ?operation_id, - ?maybe_worker_id, - ?awaited_action, - "{}", - err.to_string(), - ); - return Err(err); - } + // Make sure the worker id matches the awaited action worker id. + // This might happen if the worker sending the update is not the + // worker that was assigned. + if awaited_action.worker_id().is_some() + && maybe_worker_id.is_some() + && maybe_worker_id != awaited_action.worker_id().as_ref() + { + let err = make_err!( + Code::Internal, + "Worker ids do not match - {:?} != {:?} for {:?}", + maybe_worker_id, + awaited_action.worker_id(), + awaited_action, + ); + event!( + Level::ERROR, + ?operation_id, + ?maybe_worker_id, + ?awaited_action, + "{}", + err.to_string(), + ); + return Err(err); + } - let stage = match action_stage_result { - Ok(stage) => stage, - Err(err) => { - // Don't count a backpressure failure as an attempt for an action. - let due_to_backpressure = err.code == Code::ResourceExhausted; - if !due_to_backpressure { - awaited_action.attempts += 1; + let stage = match &action_stage_result { + Ok(stage) => stage.clone(), + Err(err) => { + // Don't count a backpressure failure as an attempt for an action. + let due_to_backpressure = err.code == Code::ResourceExhausted; + if !due_to_backpressure { + awaited_action.attempts += 1; + } + + if awaited_action.attempts > self.max_job_retries { + ActionStage::Completed(ActionResult { + execution_metadata: ExecutionMetadata { + worker: maybe_worker_id.map_or_else(String::default, |v| v.to_string()), + ..ExecutionMetadata::default() + }, + error: Some(err.clone().merge(make_err!( + Code::Internal, + "Job cancelled because it attempted to execute too many times and failed {}", + format!("for operation_id: {operation_id}, maybe_worker_id: {maybe_worker_id:?}"), + ))), + ..ActionResult::default() + }) + } else { + ActionStage::Queued + } } + }; + if matches!(stage, ActionStage::Queued) { + // If the action is queued, we need to unset the worker id regardless of + // which worker sent the update. + awaited_action.set_worker_id(None); + } else { + awaited_action.set_worker_id(maybe_worker_id.copied()); + } + awaited_action.set_state(Arc::new(ActionState { + stage, + id: operation_id.clone(), + })); + awaited_action.increment_version(); - if awaited_action.attempts > self.max_job_retries { - ActionStage::Completed(ActionResult { - execution_metadata: ExecutionMetadata { - worker: maybe_worker_id.map_or_else(String::default, |v| v.to_string()), - ..ExecutionMetadata::default() - }, - error: Some(err.clone().merge(make_err!( - Code::Internal, - "Job cancelled because it attempted to execute too many times and failed {}", - format!("for operation_id: {operation_id}, maybe_worker_id: {maybe_worker_id:?}"), - ))), - ..ActionResult::default() - }) + let update_action_result = self + .action_db + .update_awaited_action(awaited_action) + .await + .err_tip(|| "In MemorySchedulerStateManager::update_operation"); + if let Err(err) = update_action_result { + // We use Aborted to signal that the action was not + // updated due to the data being set was not the latest + // but can be retried. + if err.code == Code::Aborted { + last_err = Some(err); + continue; } else { - ActionStage::Queued + return Err(err); } } - }; - if matches!(stage, ActionStage::Queued) { - // If the action is queued, we need to unset the worker id regardless of - // which worker sent the update. - awaited_action.set_worker_id(None); - } else { - awaited_action.set_worker_id(maybe_worker_id.copied()); - } - awaited_action.set_state(Arc::new(ActionState { - stage, - id: operation_id.clone(), - })); - self.action_db - .update_awaited_action(awaited_action) - .await - .err_tip(|| "In MemorySchedulerStateManager::update_operation")?; - self.tasks_change_notify.notify_one(); - Ok(()) + self.tasks_change_notify.notify_one(); + return Ok(()); + } + match last_err { + Some(err) => Err(err), + None => Err(make_err!( + Code::Internal, + "Failed to update action after {} retries with no error set", + MAX_UPDATE_RETRIES, + )), + } } async fn inner_add_operation(