Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add awaited action versions #1163

Merged
merged 1 commit into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions nativelink-scheduler/src/awaited_action_db/awaited_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,18 @@ use nativelink_util::action_messages::{
use nativelink_util::evicting_map::InstantWrapper;
use static_assertions::{assert_eq_size, const_assert, const_assert_eq};

/// The version of the awaited action.
/// This number will always increment by one each time
/// the action is updated. A newly constructed action
/// starts at version zero.
#[derive(Debug, Clone, Copy)]
struct AwaitedActionVersion(u64);

/// An action that is being awaited on and last known state.
#[derive(Debug, Clone)]
pub struct AwaitedAction {
/// The current version of the action.
version: AwaitedActionVersion,

/// The action that is being awaited on.
action_info: Arc<ActionInfo>,

Expand Down Expand Up @@ -65,6 +74,7 @@ impl AwaitedAction {
id: operation_id.clone(),
});
Self {
version: AwaitedActionVersion(0),
action_info,
operation_id,
sort_key,
Expand All @@ -75,6 +85,14 @@ impl AwaitedAction {
}
}

pub fn version(&self) -> u64 {
self.version.0
}

pub fn increment_version(&mut self) {
self.version = AwaitedActionVersion(self.version.0 + 1);
}

/// Returns the action that is being awaited on.
pub fn action_info(&self) -> &Arc<ActionInfo> {
&self.action_info
}
Expand Down
26 changes: 23 additions & 3 deletions nativelink-scheduler/src/memory_awaited_action_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,6 @@ impl AwaitedActionDbImpl {
.map(|tx| MemoryAwaitedActionSubscriber::new(tx.subscribe()))
}

// TODO!(rename)
fn get_range_of_actions<'a, 'b>(
&'a self,
state: SortedAwaitedActionState,
Expand Down Expand Up @@ -595,6 +594,26 @@ impl AwaitedActionDbImpl {
// Note: It's important to drop old_awaited_action before we call
// send_replace or we will have a deadlock.
let old_awaited_action = tx.borrow();

// Do not process changes if the action version is not in sync with
// what the sender based the update on.
if old_awaited_action.version() + 1 != new_awaited_action.version() {
return Err(make_err!(
// From: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
// Use ABORTED if the client should retry at a higher level
// (e.g., when a client-specified test-and-set fails,
// indicating the client should restart a read-modify-write
// sequence)
Code::Aborted,
"{} Expected {:?} but got {:?} for operation_id {:?} - {:?}",
"Tried to update an awaited action with an incorrect version.",
old_awaited_action.version() + 1,
new_awaited_action.version(),
old_awaited_action,
new_awaited_action,
));
}

error_if!(
old_awaited_action.action_info().unique_qualifier
!= new_awaited_action.action_info().unique_qualifier,
Expand All @@ -608,7 +627,6 @@ impl AwaitedActionDbImpl {
.stage
.is_same_stage(&new_awaited_action.state().stage);

// TODO!(Handle priority changes here).
if !is_same_stage {
self.sorted_action_info_hash_keys
.process_state_changes(&old_awaited_action, &new_awaited_action)?;
Expand Down Expand Up @@ -725,7 +743,9 @@ impl AwaitedActionDbImpl {
&mut self,
client_operation_id: &ClientOperationId,
unique_qualifier: &ActionUniqueQualifier,
// TODO!()
// TODO(allada) To simplify the scheduler 2024 refactor, we
// removed the ability to upgrade priorities of actions.
// We should add priority upgrades back in.
_priority: i32,
) -> Result<Option<MemoryAwaitedActionSubscriber>, Error> {
let unique_key = match unique_qualifier {
Expand Down
199 changes: 114 additions & 85 deletions nativelink-scheduler/src/simple_scheduler_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ use super::awaited_action_db::{
};
use crate::memory_awaited_action_db::{ClientActionStateResult, MatchingEngineActionStateResult};

/// Maximum number of times an update to the database
/// is attempted before giving up. Only updates that fail
/// with `Code::Aborted` are retried; once the attempts are
/// exhausted the last such error is returned.
const MAX_UPDATE_RETRIES: usize = 5;

/// Simple struct that implements the `ActionStateResult` trait and always
/// returns an error.
struct ErrorActionStateResult(Error);

Expand Down Expand Up @@ -149,101 +153,126 @@ impl<T: AwaitedActionDb> SimpleSchedulerStateManager<T> {
maybe_worker_id: Option<&WorkerId>,
action_stage_result: Result<ActionStage, Error>,
) -> Result<(), Error> {
let maybe_awaited_action_subscriber = self
.action_db
.get_by_operation_id(operation_id)
.await
.err_tip(|| "In MemorySchedulerStateManager::update_operation")?;
let awaited_action_subscriber = match maybe_awaited_action_subscriber {
Some(sub) => sub,
// No action found. It is ok if the action was not found. It probably
// means that the action was dropped, but worker was still processing
// it.
None => return Ok(()),
};
let mut last_err = None;
for _ in 0..MAX_UPDATE_RETRIES {
let maybe_awaited_action_subscriber = self
.action_db
.get_by_operation_id(operation_id)
.await
.err_tip(|| "In MemorySchedulerStateManager::update_operation")?;
let awaited_action_subscriber = match maybe_awaited_action_subscriber {
Some(sub) => sub,
// No action found. It is ok if the action was not found. It probably
// means that the action was dropped, but worker was still processing
// it.
None => return Ok(()),
};

let mut awaited_action = awaited_action_subscriber.borrow();
let mut awaited_action = awaited_action_subscriber.borrow();

// Make sure we don't update an action that is already completed.
if awaited_action.state().stage.is_finished() {
return Err(make_err!(
Code::Internal,
"Action {operation_id:?} is already completed with state {:?} - maybe_worker_id: {:?}",
awaited_action.state().stage,
maybe_worker_id,
));
}
// Make sure we don't update an action that is already completed.
if awaited_action.state().stage.is_finished() {
return Err(make_err!(
Code::Internal,
"Action {operation_id:?} is already completed with state {:?} - maybe_worker_id: {:?}",
awaited_action.state().stage,
maybe_worker_id,
));
}

// Make sure the worker id matches the awaited action worker id.
// This might happen if the worker sending the update is not the
// worker that was assigned.
if awaited_action.worker_id().is_some()
&& maybe_worker_id.is_some()
&& maybe_worker_id != awaited_action.worker_id().as_ref()
{
let err = make_err!(
Code::Internal,
"Worker ids do not match - {:?} != {:?} for {:?}",
maybe_worker_id,
awaited_action.worker_id(),
awaited_action,
);
event!(
Level::ERROR,
?operation_id,
?maybe_worker_id,
?awaited_action,
"{}",
err.to_string(),
);
return Err(err);
}
// Make sure the worker id matches the awaited action worker id.
// This might happen if the worker sending the update is not the
// worker that was assigned.
if awaited_action.worker_id().is_some()
&& maybe_worker_id.is_some()
&& maybe_worker_id != awaited_action.worker_id().as_ref()
{
let err = make_err!(
Code::Internal,
"Worker ids do not match - {:?} != {:?} for {:?}",
maybe_worker_id,
awaited_action.worker_id(),
awaited_action,
);
event!(
Level::ERROR,
?operation_id,
?maybe_worker_id,
?awaited_action,
"{}",
err.to_string(),
);
return Err(err);
}

let stage = match action_stage_result {
Ok(stage) => stage,
Err(err) => {
// Don't count a backpressure failure as an attempt for an action.
let due_to_backpressure = err.code == Code::ResourceExhausted;
if !due_to_backpressure {
awaited_action.attempts += 1;
let stage = match &action_stage_result {
Ok(stage) => stage.clone(),
Err(err) => {
// Don't count a backpressure failure as an attempt for an action.
let due_to_backpressure = err.code == Code::ResourceExhausted;
if !due_to_backpressure {
awaited_action.attempts += 1;
}

if awaited_action.attempts > self.max_job_retries {
ActionStage::Completed(ActionResult {
execution_metadata: ExecutionMetadata {
worker: maybe_worker_id.map_or_else(String::default, |v| v.to_string()),
..ExecutionMetadata::default()
},
error: Some(err.clone().merge(make_err!(
Code::Internal,
"Job cancelled because it attempted to execute too many times and failed {}",
format!("for operation_id: {operation_id}, maybe_worker_id: {maybe_worker_id:?}"),
))),
..ActionResult::default()
})
} else {
ActionStage::Queued
}
}
};
if matches!(stage, ActionStage::Queued) {
// If the action is queued, we need to unset the worker id regardless of
// which worker sent the update.
awaited_action.set_worker_id(None);
} else {
awaited_action.set_worker_id(maybe_worker_id.copied());
}
awaited_action.set_state(Arc::new(ActionState {
stage,
id: operation_id.clone(),
}));
awaited_action.increment_version();

if awaited_action.attempts > self.max_job_retries {
ActionStage::Completed(ActionResult {
execution_metadata: ExecutionMetadata {
worker: maybe_worker_id.map_or_else(String::default, |v| v.to_string()),
..ExecutionMetadata::default()
},
error: Some(err.clone().merge(make_err!(
Code::Internal,
"Job cancelled because it attempted to execute too many times and failed {}",
format!("for operation_id: {operation_id}, maybe_worker_id: {maybe_worker_id:?}"),
))),
..ActionResult::default()
})
let update_action_result = self
.action_db
.update_awaited_action(awaited_action)
.await
.err_tip(|| "In MemorySchedulerStateManager::update_operation");
if let Err(err) = update_action_result {
// We use Aborted to signal that the action was not
// updated because the data being set was not based on
// the latest version, but the update can be retried.
if err.code == Code::Aborted {
last_err = Some(err);
continue;
} else {
ActionStage::Queued
return Err(err);
}
}
};
if matches!(stage, ActionStage::Queued) {
// If the action is queued, we need to unset the worker id regardless of
// which worker sent the update.
awaited_action.set_worker_id(None);
} else {
awaited_action.set_worker_id(maybe_worker_id.copied());
}
awaited_action.set_state(Arc::new(ActionState {
stage,
id: operation_id.clone(),
}));
self.action_db
.update_awaited_action(awaited_action)
.await
.err_tip(|| "In MemorySchedulerStateManager::update_operation")?;

self.tasks_change_notify.notify_one();
Ok(())
self.tasks_change_notify.notify_one();
return Ok(());
}
match last_err {
Some(err) => Err(err),
None => Err(make_err!(
Code::Internal,
"Failed to update action after {} retries with no error set",
MAX_UPDATE_RETRIES,
)),
}
}

async fn inner_add_operation(
Expand Down