diff --git a/nativelink-scheduler/src/cache_lookup_scheduler.rs b/nativelink-scheduler/src/cache_lookup_scheduler.rs index 790e74c74..5fca3fdaf 100644 --- a/nativelink-scheduler/src/cache_lookup_scheduler.rs +++ b/nativelink-scheduler/src/cache_lookup_scheduler.rs @@ -25,12 +25,13 @@ use nativelink_proto::build::bazel::remote::execution::v2::{ use nativelink_store::ac_utils::get_and_decode_digest; use nativelink_store::grpc_store::GrpcStore; use nativelink_util::action_messages::{ - ActionInfo, ActionInfoHashKey, ActionStage, ActionState, ClientOperationId, OperationId, + ActionInfo, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier, + ClientOperationId, OperationId, }; use nativelink_util::background_spawn; use nativelink_util::common::DigestInfo; use nativelink_util::digest_hasher::DigestHasherFunc; -use nativelink_util::store_trait::{Store, StoreLike}; +use nativelink_util::store_trait::Store; use parking_lot::{Mutex, MutexGuard}; use scopeguard::guard; use tokio::sync::oneshot; @@ -44,7 +45,7 @@ use crate::platform_property_manager::PlatformPropertyManager; /// being forwarded upstream. Missing the skip_cache_check actions which are /// forwarded directly. type CheckActions = HashMap< - ActionInfoHashKey, + ActionUniqueKey, Vec<( ClientOperationId, oneshot::Sender>, Error>>, @@ -67,7 +68,7 @@ async fn get_action_from_store( action_digest: DigestInfo, instance_name: String, digest_function: DigestHasherFunc, -) -> Option { +) -> Result { // If we are a GrpcStore we shortcut here, as this is a special store. if let Some(grpc_store) = ac_store.downcast_ref::(Some(action_digest.into())) { let action_result_request = GetActionResultRequest { @@ -82,11 +83,8 @@ async fn get_action_from_store( .get_action_result(Request::new(action_result_request)) .await .map(|response| response.into_inner()) - .ok() } else { - get_and_decode_digest::(ac_store, action_digest.into()) - .await - .ok() + get_and_decode_digest::(ac_store, action_digest.into()).await } } @@ -95,7 +93,7 @@ type ActionListenerOneshot = oneshot::Receiver, - unique_qualifier: &ActionInfoHashKey, + unique_qualifier: &ActionUniqueKey, client_operation_id: &ClientOperationId, ) -> Option { inflight_cache_checks @@ -149,26 +147,29 @@ impl ActionScheduler for CacheLookupScheduler { client_operation_id: ClientOperationId, action_info: ActionInfo, ) -> Result>, Error> { - if action_info.skip_cache_lookup { - // Cache lookup skipped, forward to the upstream. - return self - .action_scheduler - .add_action(client_operation_id, action_info) - .await; - } + let unique_key = match &action_info.unique_qualifier { + ActionUniqueQualifier::Cachable(unique_key) => unique_key.clone(), + ActionUniqueQualifier::Uncachable(_) => { + // Cache lookup skipped, forward to the upstream. + return self + .action_scheduler + .add_action(client_operation_id, action_info) + .await; + } + }; + let cache_check_result = { // Check this isn't a duplicate request first. let mut inflight_cache_checks = self.inflight_cache_checks.lock(); - let unique_qualifier = action_info.unique_qualifier.clone(); subscribe_to_existing_action( &mut inflight_cache_checks, - &unique_qualifier, + &unique_key, &client_operation_id, ) .ok_or_else(move || { let (action_listener_tx, action_listener_rx) = oneshot::channel(); inflight_cache_checks.insert( - unique_qualifier.clone(), + unique_key.clone(), vec![(client_operation_id, action_listener_tx)], ); // In the event we loose the reference to our `scope_guard`, it will remove @@ -177,7 +178,7 @@ impl ActionScheduler for CacheLookupScheduler { ( action_listener_rx, guard((), move |_| { - inflight_cache_checks.lock().remove(&unique_qualifier); + inflight_cache_checks.lock().remove(&unique_key); }), ) }) @@ -203,54 +204,77 @@ impl ActionScheduler for CacheLookupScheduler { // If our spawn ever dies, we will remove the action from the inflight_cache_checks map. let _scope_guard = scope_guard; + let unique_key = match &action_info.unique_qualifier { + ActionUniqueQualifier::Cachable(unique_key) => unique_key, + ActionUniqueQualifier::Uncachable(unique_key) => { + event!( + Level::ERROR, + ?action_info, + "ActionInfo::unique_qualifier should be ActionUniqueQualifier::Cachable()" + ); + unique_key + } + }; + // Perform cache check. - let instance_name = action_info.unique_qualifier.instance_name.clone(); - if let Some(action_result) = get_action_from_store( + let instance_name = action_info.unique_qualifier.instance_name().clone(); + let maybe_action_result = get_action_from_store( &ac_store, - action_info.unique_qualifier.digest, + action_info.unique_qualifier.digest(), instance_name, - action_info.unique_qualifier.digest_function, + action_info.unique_qualifier.digest_function(), ) - .await - { - match ac_store.has(action_info.unique_qualifier.digest).await { - Ok(Some(_)) => { + .await; + match maybe_action_result { + Ok(action_result) => { + println!("{action_result:?}"); + let maybe_pending_txs = { + let mut inflight_cache_checks = inflight_cache_checks.lock(); + // We are ready to resolve the in-flight actions. We remove the + // in-flight actions from the map. + inflight_cache_checks.remove(unique_key) + }; + let Some(pending_txs) = maybe_pending_txs else { + return; // Nobody is waiting for this action anymore. + }; + let action_state = Arc::new(ActionState { + id: OperationId::new(action_info.unique_qualifier.clone()), + stage: ActionStage::CompletedFromCache(action_result), + }); + for (client_operation_id, pending_tx) in pending_txs { + // Ignore errors here, as the other end may have hung up. + let _ = pending_tx.send(Ok(Box::pin(CachedActionListener { + client_operation_id, + action_state: action_state.clone(), + }))); + } + return; + } + Err(err) => { + // NotFound errors just mean we need to execute our action. + if err.code != Code::NotFound { + let err = err.append("In CacheLookupScheduler::add_action"); let maybe_pending_txs = { let mut inflight_cache_checks = inflight_cache_checks.lock(); // We are ready to resolve the in-flight actions. We remove the // in-flight actions from the map. - inflight_cache_checks.remove(&action_info.unique_qualifier) + inflight_cache_checks.remove(unique_key) }; let Some(pending_txs) = maybe_pending_txs else { return; // Nobody is waiting for this action anymore. }; - let action_state = Arc::new(ActionState { - id: OperationId::new(action_info.unique_qualifier.clone()), - stage: ActionStage::CompletedFromCache(action_result), - }); - for (client_operation_id, pending_tx) in pending_txs { + for (_client_operation_id, pending_tx) in pending_txs { // Ignore errors here, as the other end may have hung up. - let _ = pending_tx.send(Ok(Box::pin(CachedActionListener { - client_operation_id, - action_state: action_state.clone(), - }))); + let _ = pending_tx.send(Err(err.clone())); } return; } - Err(err) => { - event!( - Level::WARN, - ?err, - "Error while calling `has` on `ac_store` in `CacheLookupScheduler`'s `add_action` function" - ); - } - _ => {} } } let maybe_pending_txs = { let mut inflight_cache_checks = inflight_cache_checks.lock(); - inflight_cache_checks.remove(&action_info.unique_qualifier) + inflight_cache_checks.remove(unique_key) }; let Some(pending_txs) = maybe_pending_txs else { return; // Noone is waiting for this action anymore. diff --git a/nativelink-scheduler/src/grpc_scheduler.rs b/nativelink-scheduler/src/grpc_scheduler.rs index e9c784fbd..8cc8b7775 100644 --- a/nativelink-scheduler/src/grpc_scheduler.rs +++ b/nativelink-scheduler/src/grpc_scheduler.rs @@ -29,8 +29,8 @@ use nativelink_proto::build::bazel::remote::execution::v2::{ }; use nativelink_proto::google::longrunning::Operation; use nativelink_util::action_messages::{ - ActionInfo, ActionInfoHashKey, ActionState, ClientOperationId, OperationId, - DEFAULT_EXECUTION_PRIORITY, + ActionInfo, ActionState, ActionUniqueKey, ActionUniqueQualifier, ClientOperationId, + OperationId, DEFAULT_EXECUTION_PRIORITY, }; use nativelink_util::common::DigestInfo; use nativelink_util::connection_manager::ConnectionManager; @@ -126,13 +126,13 @@ impl GrpcScheduler { let client_operation_id = ClientOperationId::from_raw_string(initial_response.name.clone()); // Our operation_id is not needed here is just a place holder to recycle existing object. - // The only thing that actually matters is the client_operation_id. - let operation_id = OperationId::new(ActionInfoHashKey { - instance_name: "dummy_instance_name".to_string(), - digest_function: DigestHasherFunc::Sha256, - digest: DigestInfo::zero_digest(), - salt: 0, - }); + // The only thing that actually matters is the operation_id. + let operation_id = + OperationId::new(ActionUniqueQualifier::Uncachable(ActionUniqueKey { + instance_name: "dummy_instance_name".to_string(), + digest_function: DigestHasherFunc::Sha256, + digest: DigestInfo::zero_digest(), + })); let action_state = ActionState::try_from_operation(initial_response, operation_id.clone()) .err_tip(|| "In GrpcScheduler::stream_state")?; @@ -250,16 +250,20 @@ impl ActionScheduler for GrpcScheduler { priority: action_info.priority, }) }; + let skip_cache_lookup = match action_info.unique_qualifier { + ActionUniqueQualifier::Cachable(_) => false, + ActionUniqueQualifier::Uncachable(_) => true, + }; let request = ExecuteRequest { instance_name: action_info.instance_name().clone(), - skip_cache_lookup: action_info.skip_cache_lookup, + skip_cache_lookup, action_digest: Some(action_info.digest().into()), execution_policy, // TODO: Get me from the original request, not very important as we ignore it. results_cache_policy: None, digest_function: action_info .unique_qualifier - .digest_function + .digest_function() .proto_digest_func() .into(), }; diff --git a/nativelink-scheduler/src/operation_state_manager.rs b/nativelink-scheduler/src/operation_state_manager.rs index 4a28a4245..bee736810 100644 --- a/nativelink-scheduler/src/operation_state_manager.rs +++ b/nativelink-scheduler/src/operation_state_manager.rs @@ -22,8 +22,7 @@ use bitflags::bitflags; use futures::Stream; use nativelink_error::Error; use nativelink_util::action_messages::{ - ActionInfo, ActionInfoHashKey, ActionStage, ActionState, ClientOperationId, OperationId, - WorkerId, + ActionInfo, ActionStage, ActionState, ActionUniqueKey, ClientOperationId, OperationId, WorkerId, }; use nativelink_util::common::DigestInfo; use tokio::sync::watch; @@ -89,9 +88,10 @@ pub struct OperationFilter { // /// The operation must have it's last client update before this time. // NOTE: NOT PART OF ANY FILTERING! + // TODO!(cleanup) // pub last_client_update_before: Option, /// The unique key for filtering specific action results. - pub unique_qualifier: Option, + pub unique_key: Option, /// If the results should be ordered by priority and in which direction. pub order_by_priority_direction: Option, diff --git a/nativelink-scheduler/src/property_modifier_scheduler.rs b/nativelink-scheduler/src/property_modifier_scheduler.rs index 08f852eac..c06bfc610 100644 --- a/nativelink-scheduler/src/property_modifier_scheduler.rs +++ b/nativelink-scheduler/src/property_modifier_scheduler.rs @@ -94,7 +94,7 @@ impl ActionScheduler for PropertyModifierScheduler { mut action_info: ActionInfo, ) -> Result>, Error> { let platform_property_manager = self - .get_platform_property_manager(&action_info.unique_qualifier.instance_name) + .get_platform_property_manager(action_info.unique_qualifier.instance_name()) .await .err_tip(|| "In PropertyModifierScheduler::add_action")?; for modification in &self.modifications { diff --git a/nativelink-scheduler/src/scheduler_state/awaited_action.rs b/nativelink-scheduler/src/scheduler_state/awaited_action.rs index 362591cc7..285f740d7 100644 --- a/nativelink-scheduler/src/scheduler_state/awaited_action.rs +++ b/nativelink-scheduler/src/scheduler_state/awaited_action.rs @@ -18,7 +18,8 @@ use std::sync::Arc; use std::time::{Duration, SystemTime}; use nativelink_util::action_messages::{ - ActionInfo, ActionInfoHashKey, ActionStage, ActionState, OperationId, WorkerId, + ActionInfo, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier, OperationId, + WorkerId, }; use nativelink_util::evicting_map::InstantWrapper; use parking_lot::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard}; @@ -98,11 +99,15 @@ impl AwaitedAction { AwaitedActionSortKey, watch::Receiver>, ) { + let unique_key = match &action_info.unique_qualifier { + ActionUniqueQualifier::Cachable(unique_key) => unique_key, + ActionUniqueQualifier::Uncachable(unique_key) => unique_key, + }; let stage = ActionStage::Queued; - let sort_key = AwaitedActionSortKey::new_with_action_hash( + let sort_key = AwaitedActionSortKey::new_with_unique_key( action_info.priority, &action_info.insert_timestamp, - &action_info.unique_qualifier, + unique_key, ); let sort_info = RwLock::new(SortInfo { priority: action_info.priority, @@ -173,13 +178,17 @@ impl AwaitedAction { if sort_info_lock.priority >= new_priority { return None; } + let unique_key = match &self.action_info.unique_qualifier { + ActionUniqueQualifier::Cachable(unique_key) => unique_key, + ActionUniqueQualifier::Uncachable(unique_key) => unique_key, + }; let mut sort_info_lock = RwLockUpgradableReadGuard::upgrade(sort_info_lock); let previous_sort_key = sort_info_lock.sort_key; sort_info_lock.priority = new_priority; - sort_info_lock.sort_key = AwaitedActionSortKey::new_with_action_hash( + sort_info_lock.sort_key = AwaitedActionSortKey::new_with_unique_key( new_priority, &self.action_info.insert_timestamp, - &self.action_info.unique_qualifier, + unique_key, ); Some(SortInfoLock { previous_sort_key, @@ -274,14 +283,14 @@ impl AwaitedActionSortKey { ])) } - fn new_with_action_hash( + fn new_with_unique_key( priority: i32, insert_timestamp: &SystemTime, - action_hash: &ActionInfoHashKey, + action_hash: &ActionUniqueKey, ) -> Self { let hash = { let mut hasher = DefaultHasher::new(); - ActionInfoHashKey::hash(action_hash, &mut hasher); + ActionUniqueKey::hash(action_hash, &mut hasher); hasher.finish().to_le_bytes()[0..4].try_into().unwrap() }; Self::new(priority, insert_timestamp.unix_timestamp(), hash) diff --git a/nativelink-scheduler/src/scheduler_state/completed_action.rs b/nativelink-scheduler/src/scheduler_state/completed_action.rs index 11b0854be..be926ec80 100644 --- a/nativelink-scheduler/src/scheduler_state/completed_action.rs +++ b/nativelink-scheduler/src/scheduler_state/completed_action.rs @@ -17,7 +17,7 @@ use std::hash::{Hash, Hasher}; use std::sync::Arc; use std::time::SystemTime; -use nativelink_util::action_messages::{ActionInfoHashKey, ActionState, OperationId}; +use nativelink_util::action_messages::{ActionState, ActionUniqueQualifier, OperationId}; /// A completed action that has no listeners. pub struct CompletedAction { @@ -49,9 +49,9 @@ impl Borrow for CompletedAction { } } -impl Borrow for CompletedAction { +impl Borrow for CompletedAction { #[inline] - fn borrow(&self) -> &ActionInfoHashKey { + fn borrow(&self) -> &ActionUniqueQualifier { &self.state.id.unique_qualifier } } diff --git a/nativelink-scheduler/src/scheduler_state/state_manager.rs b/nativelink-scheduler/src/scheduler_state/state_manager.rs index 6e1b05b06..0a69c9fcb 100644 --- a/nativelink-scheduler/src/scheduler_state/state_manager.rs +++ b/nativelink-scheduler/src/scheduler_state/state_manager.rs @@ -25,8 +25,8 @@ use hashbrown::HashMap; use nativelink_config::stores::EvictionPolicy; use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt}; use nativelink_util::action_messages::{ - ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState, ClientOperationId, - ExecutionMetadata, OperationId, WorkerId, + ActionInfo, ActionResult, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier, + ClientOperationId, ExecutionMetadata, OperationId, WorkerId, }; use nativelink_util::evicting_map::{EvictingMap, LenEntry}; use nativelink_util::task::JoinHandleDropGuard; @@ -165,7 +165,7 @@ pub struct AwaitedActionDb { operation_id_to_awaited_action: HashMap>, /// A lookup table to lookup the state of an action by its unique qualifier. - action_info_hash_key_to_awaited_action: HashMap>, + action_info_hash_key_to_awaited_action: HashMap>, /// A sorted set of [`AwaitedAction`]s. A wrapper is used to perform sorting /// based on the [`AwaitedActionSortKey`] of the [`AwaitedAction`]. @@ -237,18 +237,26 @@ impl AwaitedActionDb { ); } - // Cleanup action_info_hash_key_to_awaited_action. + // Cleanup action_info_hash_key_to_awaited_action if it was marked cached. let action_info = awaited_action.get_action_info(); - let maybe_awaited_action = self - .action_info_hash_key_to_awaited_action - .remove(&action_info.unique_qualifier); - if maybe_awaited_action.is_none() { - event!( - Level::ERROR, - ?operation_id, - ?awaited_action, - "action_info_hash_key_to_awaited_action and operation_id_to_awaited_action are out of sync", - ); + match &action_info.unique_qualifier { + ActionUniqueQualifier::Cachable(action_key) => { + let maybe_awaited_action = self + .action_info_hash_key_to_awaited_action + .remove(action_key); + if maybe_awaited_action.is_none() { + event!( + Level::ERROR, + ?operation_id, + ?awaited_action, + ?action_key, + "action_info_hash_key_to_awaited_action and operation_id_to_awaited_action are out of sync", + ); + } + } + ActionUniqueQualifier::Uncachable(_action_key) => { + // This Operation should not be in the hash_key map. + } } // Cleanup sorted_awaited_action. @@ -401,7 +409,6 @@ impl AwaitedActionDb { &client_operation_id, &action_info.unique_qualifier, action_info.priority, - action_info.skip_cache_lookup, ) .await; let action_info = match subscription_result { @@ -409,7 +416,10 @@ impl AwaitedActionDb { Err(_) => action_info, }; - let unique_qualifier = action_info.unique_qualifier.clone(); + let maybe_unique_key = match &action_info.unique_qualifier { + ActionUniqueQualifier::Cachable(unique_key) => Some(unique_key.clone()), + ActionUniqueQualifier::Uncachable(_unique_key) => None, + }; let (awaited_action, sort_key, subscription) = AwaitedAction::new_with_subscription(action_info); let awaited_action = Arc::new(awaited_action); @@ -423,8 +433,11 @@ impl AwaitedActionDb { )), ) .await; - self.action_info_hash_key_to_awaited_action - .insert(unique_qualifier.clone(), awaited_action.clone()); + // Note: We only put items in the map that are cachable. + if let Some(unique_key) = maybe_unique_key { + self.action_info_hash_key_to_awaited_action + .insert(unique_key, awaited_action.clone()); + } self.operation_id_to_awaited_action .insert(awaited_action.get_operation_id(), awaited_action.clone()); @@ -442,23 +455,24 @@ impl AwaitedActionDb { &mut self, state_manager_impl: &Weak>, client_operation_id: &ClientOperationId, - unique_qualifier: &ActionInfoHashKey, + unique_qualifier: &ActionUniqueQualifier, priority: i32, - skip_cache_lookup: bool, ) -> Result>, Error> { - if skip_cache_lookup { - return Err(make_err!( - Code::InvalidArgument, - "Cannot subscribe to an existing item when skip_cache_lookup is true." - )); - } + let unique_key = match unique_qualifier { + ActionUniqueQualifier::Cachable(unique_key) => unique_key, + ActionUniqueQualifier::Uncachable(_unique_key) => { + return Err(make_err!( + Code::InvalidArgument, + "Cannot subscribe to an existing item when skip_cache_lookup is true." + )); + } + }; let awaited_action = self .action_info_hash_key_to_awaited_action - .get(unique_qualifier) + .get(unique_key) .ok_or(make_input_err!( - "Could not find existing action with name: {}", - unique_qualifier.action_name() + "Could not find existing action with name: {unique_qualifier}" )) .err_tip(|| "In state_manager::try_subscribe")?; @@ -710,13 +724,20 @@ fn filter_check(awaited_action: &AwaitedAction, filter: &OperationFilter) -> boo { let action_info = awaited_action.get_action_info(); - if let Some(unique_qualifier) = &filter.unique_qualifier { - if unique_qualifier != &action_info.unique_qualifier { - return false; + if let Some(filter_unique_key) = &filter.unique_key { + match &action_info.unique_qualifier { + ActionUniqueQualifier::Cachable(unique_key) => { + if filter_unique_key != unique_key { + return false; + } + } + ActionUniqueQualifier::Uncachable(_) => { + return false; + } } } if let Some(action_digest) = filter.action_digest { - if &action_digest != action_info.digest() { + if action_digest != action_info.digest() { return false; } } diff --git a/nativelink-scheduler/src/worker.rs b/nativelink-scheduler/src/worker.rs index 1b710ecd9..dc9879f93 100644 --- a/nativelink-scheduler/src/worker.rs +++ b/nativelink-scheduler/src/worker.rs @@ -291,7 +291,7 @@ impl MetricsComponent for Worker { vec![("worker_id".into(), format!("{}", self.id).into())], ); for action_info in self.running_action_infos.values() { - let action_name = action_info.unique_qualifier.action_name().to_string(); + let action_name = action_info.unique_qualifier.to_string(); c.publish_with_labels( "timeout", &action_info.timeout, @@ -316,12 +316,6 @@ impl MetricsComponent for Worker { "When this action was created.", vec![("digest".into(), action_name.clone().into())], ); - c.publish_with_labels( - "skip_cache_lookup", - &action_info.skip_cache_lookup, - "Weather this action should skip cache lookup.", - vec![("digest".into(), action_name.clone().into())], - ); } for (prop_name, prop_type_and_value) in &self.platform_properties.properties { match prop_type_and_value { diff --git a/nativelink-scheduler/tests/action_messages_test.rs b/nativelink-scheduler/tests/action_messages_test.rs index e7562f17e..f5cac5582 100644 --- a/nativelink-scheduler/tests/action_messages_test.rs +++ b/nativelink-scheduler/tests/action_messages_test.rs @@ -12,9 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeSet, HashMap}; -use std::sync::Arc; -use std::time::{Duration, SystemTime}; +use std::collections::HashMap; +use std::time::SystemTime; use nativelink_error::Error; use nativelink_macro::nativelink_test; @@ -22,30 +21,20 @@ use nativelink_proto::build::bazel::remote::execution::v2::ExecuteResponse; use nativelink_proto::google::longrunning::{operation, Operation}; use nativelink_proto::google::rpc::Status; use nativelink_util::action_messages::{ - ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState, ClientOperationId, - ExecutionMetadata, OperationId, + ActionResult, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier, + ClientOperationId, ExecutionMetadata, OperationId, }; use nativelink_util::common::DigestInfo; use nativelink_util::digest_hasher::DigestHasherFunc; -use nativelink_util::platform_properties::PlatformProperties; use pretty_assertions::assert_eq; -const NOW_TIME: u64 = 10000; - -fn make_system_time(add_time: u64) -> SystemTime { - SystemTime::UNIX_EPOCH - .checked_add(Duration::from_secs(NOW_TIME + add_time)) - .unwrap() -} - #[nativelink_test] async fn action_state_any_url_test() -> Result<(), Error> { - let unique_qualifier = ActionInfoHashKey { + let unique_qualifier = ActionUniqueQualifier::Cachable(ActionUniqueKey { instance_name: "foo_instance".to_string(), digest_function: DigestHasherFunc::Sha256, digest: DigestInfo::new([1u8; 32], 5), - salt: 0, - }; + }); let client_id = ClientOperationId::new(unique_qualifier.clone()); let operation_id = OperationId::new(unique_qualifier); let action_state = ActionState { @@ -102,115 +91,3 @@ async fn execute_response_status_message_is_some_on_success_test() -> Result<(), Ok(()) } - -#[nativelink_test] -async fn highest_priority_action_first() -> Result<(), Error> { - const INSTANCE_NAME: &str = "foobar_instance_name"; - - let high_priority_action = Arc::new(ActionInfo { - command_digest: DigestInfo::new([0u8; 32], 0), - input_root_digest: DigestInfo::new([0u8; 32], 0), - timeout: Duration::MAX, - platform_properties: PlatformProperties { - properties: HashMap::new(), - }, - priority: 1000, - load_timestamp: SystemTime::UNIX_EPOCH, - insert_timestamp: SystemTime::UNIX_EPOCH, - unique_qualifier: ActionInfoHashKey { - instance_name: INSTANCE_NAME.to_string(), - digest_function: DigestHasherFunc::Sha256, - digest: DigestInfo::new([0u8; 32], 0), - salt: 0, - }, - skip_cache_lookup: true, - }); - let lowest_priority_action = Arc::new(ActionInfo { - command_digest: DigestInfo::new([0u8; 32], 0), - input_root_digest: DigestInfo::new([0u8; 32], 0), - timeout: Duration::MAX, - platform_properties: PlatformProperties { - properties: HashMap::new(), - }, - priority: 0, - load_timestamp: SystemTime::UNIX_EPOCH, - insert_timestamp: SystemTime::UNIX_EPOCH, - unique_qualifier: ActionInfoHashKey { - instance_name: INSTANCE_NAME.to_string(), - digest_function: DigestHasherFunc::Sha256, - digest: DigestInfo::new([1u8; 32], 0), - salt: 0, - }, - skip_cache_lookup: true, - }); - let mut action_set = BTreeSet::>::new(); - action_set.insert(lowest_priority_action.clone()); - action_set.insert(high_priority_action.clone()); - - assert_eq!( - vec![high_priority_action, lowest_priority_action], - action_set - .iter() - .rev() - .cloned() - .collect::>>() - ); - - Ok(()) -} - -#[nativelink_test] -async fn equal_priority_earliest_first() -> Result<(), Error> { - const INSTANCE_NAME: &str = "foobar_instance_name"; - - let first_action = Arc::new(ActionInfo { - command_digest: DigestInfo::new([0u8; 32], 0), - input_root_digest: DigestInfo::new([0u8; 32], 0), - timeout: Duration::MAX, - platform_properties: PlatformProperties { - properties: HashMap::new(), - }, - priority: 0, - load_timestamp: SystemTime::UNIX_EPOCH, - insert_timestamp: SystemTime::UNIX_EPOCH, - unique_qualifier: ActionInfoHashKey { - instance_name: INSTANCE_NAME.to_string(), - digest_function: DigestHasherFunc::Sha256, - digest: DigestInfo::new([0u8; 32], 0), - salt: 0, - }, - skip_cache_lookup: true, - }); - let current_action = Arc::new(ActionInfo { - command_digest: DigestInfo::new([0u8; 32], 0), - input_root_digest: DigestInfo::new([0u8; 32], 0), - timeout: Duration::MAX, - platform_properties: PlatformProperties { - properties: HashMap::new(), - }, - priority: 0, - load_timestamp: SystemTime::UNIX_EPOCH, - insert_timestamp: make_system_time(0), - unique_qualifier: ActionInfoHashKey { - instance_name: INSTANCE_NAME.to_string(), - digest_function: DigestHasherFunc::Sha256, - digest: DigestInfo::new([1u8; 32], 0), - salt: 0, - }, - skip_cache_lookup: true, - }); - let mut action_set = BTreeSet::>::new(); - action_set.insert(current_action.clone()); - action_set.insert(first_action.clone()); - - assert_eq!( - vec![first_action, current_action], - action_set - .iter() - .rev() - .cloned() - .collect::>>() - ); - - Ok(()) -} diff --git a/nativelink-scheduler/tests/cache_lookup_scheduler_test.rs b/nativelink-scheduler/tests/cache_lookup_scheduler_test.rs index 9599c0d2f..ad39312d3 100644 --- a/nativelink-scheduler/tests/cache_lookup_scheduler_test.rs +++ b/nativelink-scheduler/tests/cache_lookup_scheduler_test.rs @@ -31,7 +31,8 @@ use nativelink_scheduler::default_action_listener::DefaultActionListener; use nativelink_scheduler::platform_property_manager::PlatformPropertyManager; use nativelink_store::memory_store::MemoryStore; use nativelink_util::action_messages::{ - ActionInfoHashKey, ActionResult, ActionStage, ActionState, ClientOperationId, OperationId, + ActionResult, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier, + ClientOperationId, OperationId, }; use nativelink_util::common::DigestInfo; use nativelink_util::digest_hasher::DigestHasherFunc; @@ -86,19 +87,22 @@ async fn platform_property_manager_call_passed() -> Result<(), Error> { #[nativelink_test] async fn add_action_handles_skip_cache() -> Result<(), Error> { let context = make_cache_scheduler()?; - let action_info = make_base_action_info(UNIX_EPOCH); + let action_info = make_base_action_info(UNIX_EPOCH, DigestInfo::zero_digest()); let action_result = ProtoActionResult::from(ActionResult::default()); context .ac_store - .update_oneshot(*action_info.digest(), action_result.encode_to_vec().into()) + .update_oneshot(action_info.digest(), action_result.encode_to_vec().into()) .await?; let (_forward_watch_channel_tx, forward_watch_channel_rx) = watch::channel(Arc::new(ActionState { id: OperationId::new(action_info.unique_qualifier.clone()), stage: ActionStage::Queued, })); + let ActionUniqueQualifier::Cachable(action_key) = action_info.unique_qualifier.clone() else { + panic!("This test should be testing when item was cached first"); + }; let mut skip_cache_action = action_info.clone(); - skip_cache_action.skip_cache_lookup = true; + skip_cache_action.unique_qualifier = ActionUniqueQualifier::Uncachable(action_key); let client_operation_id = ClientOperationId::new(action_info.unique_qualifier.clone()); let _ = join!( context @@ -117,12 +121,12 @@ async fn add_action_handles_skip_cache() -> Result<(), Error> { #[nativelink_test] async fn find_by_client_operation_id_call_passed() -> Result<(), Error> { let context = make_cache_scheduler()?; - let client_operation_id = ClientOperationId::new(ActionInfoHashKey { - instance_name: "instance".to_string(), - digest_function: DigestHasherFunc::Sha256, - digest: DigestInfo::new([8; 32], 1), - salt: 1000, - }); + let client_operation_id = + ClientOperationId::new(ActionUniqueQualifier::Uncachable(ActionUniqueKey { + instance_name: "instance".to_string(), + digest_function: DigestHasherFunc::Sha256, + digest: DigestInfo::new([8; 32], 1), + })); let (actual_result, actual_client_id) = join!( context .cache_scheduler diff --git a/nativelink-scheduler/tests/property_modifier_scheduler_test.rs b/nativelink-scheduler/tests/property_modifier_scheduler_test.rs index c85b74347..56118504f 100644 --- a/nativelink-scheduler/tests/property_modifier_scheduler_test.rs +++ b/nativelink-scheduler/tests/property_modifier_scheduler_test.rs @@ -30,7 +30,8 @@ use nativelink_scheduler::default_action_listener::DefaultActionListener; use nativelink_scheduler::platform_property_manager::PlatformPropertyManager; use nativelink_scheduler::property_modifier_scheduler::PropertyModifierScheduler; use nativelink_util::action_messages::{ - ActionInfoHashKey, ActionStage, ActionState, ClientOperationId, OperationId, + ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier, ClientOperationId, + OperationId, }; use nativelink_util::common::DigestInfo; use nativelink_util::digest_hasher::DigestHasherFunc; @@ -69,7 +70,7 @@ async fn add_action_adds_property() -> Result<(), Error> { name: name.clone(), value: value.clone(), })]); - let action_info = make_base_action_info(UNIX_EPOCH); + let action_info = make_base_action_info(UNIX_EPOCH, DigestInfo::zero_digest()); let (_forward_watch_channel_tx, forward_watch_channel_rx) = watch::channel(Arc::new(ActionState { id: OperationId::new(action_info.unique_qualifier.clone()), @@ -112,7 +113,7 @@ async fn add_action_overwrites_property() -> Result<(), Error> { name: name.clone(), value: replaced_value.clone(), })]); - let mut action_info = make_base_action_info(UNIX_EPOCH); + let mut action_info = make_base_action_info(UNIX_EPOCH, DigestInfo::zero_digest()); action_info .platform_properties .properties @@ -160,7 +161,7 @@ async fn add_action_property_added_after_remove() -> Result<(), Error> { value: value.clone(), }), ]); - let action_info = make_base_action_info(UNIX_EPOCH); + let action_info = make_base_action_info(UNIX_EPOCH, DigestInfo::zero_digest()); let (_forward_watch_channel_tx, forward_watch_channel_rx) = watch::channel(Arc::new(ActionState { id: OperationId::new(action_info.unique_qualifier.clone()), @@ -204,7 +205,7 @@ async fn add_action_property_remove_after_add() -> Result<(), Error> { }), PropertyModification::remove(name.clone()), ]); - let action_info = make_base_action_info(UNIX_EPOCH); + let action_info = make_base_action_info(UNIX_EPOCH, DigestInfo::zero_digest()); let (_forward_watch_channel_tx, forward_watch_channel_rx) = watch::channel(Arc::new(ActionState { id: OperationId::new(action_info.unique_qualifier.clone()), @@ -242,7 +243,7 @@ async fn add_action_property_remove() -> Result<(), Error> { let name = "name".to_string(); let value = "value".to_string(); let context = make_modifier_scheduler(vec![PropertyModification::remove(name.clone())]); - let mut action_info = make_base_action_info(UNIX_EPOCH); + let mut action_info = make_base_action_info(UNIX_EPOCH, DigestInfo::zero_digest()); action_info .platform_properties .properties @@ -279,12 +280,11 @@ async fn add_action_property_remove() -> Result<(), Error> { #[nativelink_test] async fn find_by_client_operation_id_call_passed() -> Result<(), Error> { let context = make_modifier_scheduler(vec![]); - let operation_id = ClientOperationId::new(ActionInfoHashKey { + let operation_id = ClientOperationId::new(ActionUniqueQualifier::Uncachable(ActionUniqueKey { instance_name: "instance".to_string(), digest_function: DigestHasherFunc::Sha256, digest: DigestInfo::new([8; 32], 1), - salt: 1000, - }); + })); let (actual_result, actual_operation_id) = join!( context .modifier_scheduler diff --git a/nativelink-scheduler/tests/simple_scheduler_test.rs b/nativelink-scheduler/tests/simple_scheduler_test.rs index b0e14bfe8..8e4eb5339 100644 --- a/nativelink-scheduler/tests/simple_scheduler_test.rs +++ b/nativelink-scheduler/tests/simple_scheduler_test.rs @@ -31,9 +31,9 @@ use nativelink_scheduler::simple_scheduler::SimpleScheduler; use nativelink_scheduler::worker::Worker; use nativelink_scheduler::worker_scheduler::WorkerScheduler; use nativelink_util::action_messages::{ - ActionInfoHashKey, ActionResult, ActionStage, ActionState, ClientOperationId, DirectoryInfo, - ExecutionMetadata, FileInfo, NameOrPath, OperationId, SymlinkInfo, WorkerId, - INTERNAL_ERROR_EXIT_CODE, + ActionResult, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier, + ClientOperationId, DirectoryInfo, ExecutionMetadata, FileInfo, NameOrPath, OperationId, + SymlinkInfo, WorkerId, INTERNAL_ERROR_EXIT_CODE, }; use nativelink_util::common::DigestInfo; use nativelink_util::digest_hasher::DigestHasherFunc; @@ -132,9 +132,8 @@ async fn setup_action( platform_properties: PlatformProperties, insert_timestamp: SystemTime, ) -> Result>, Error> { - let mut action_info = make_base_action_info(insert_timestamp); + let mut action_info = make_base_action_info(insert_timestamp, action_digest); action_info.platform_properties = platform_properties; - action_info.unique_qualifier.digest = action_digest; let client_id = ClientOperationId::new(action_info.unique_qualifier.clone()); let result = scheduler.add_action(client_id, action_info).await; tokio::task::yield_now().await; // Allow task<->worker matcher to run. @@ -171,7 +170,6 @@ async fn basic_add_action_with_one_worker_test() -> Result<(), Error> { update: Some(update_for_worker::Update::StartAction(StartExecute { execute_request: Some(ExecuteRequest { instance_name: INSTANCE_NAME.to_string(), - skip_cache_lookup: true, action_digest: Some(action_digest.into()), digest_function: digest_function::Value::Sha256.into(), ..Default::default() @@ -235,7 +233,6 @@ async fn find_executing_action() -> Result<(), Error> { update: Some(update_for_worker::Update::StartAction(StartExecute { execute_request: Some(ExecuteRequest { instance_name: INSTANCE_NAME.to_string(), - skip_cache_lookup: true, action_digest: Some(action_digest.into()), digest_function: digest_function::Value::Sha256.into(), ..Default::default() @@ -298,7 +295,6 @@ async fn remove_worker_reschedules_multiple_running_job_test() -> Result<(), Err let mut expected_start_execute_for_worker1 = StartExecute { execute_request: Some(ExecuteRequest { instance_name: INSTANCE_NAME.to_string(), - skip_cache_lookup: true, action_digest: Some(action_digest1.into()), digest_function: digest_function::Value::Sha256.into(), ..Default::default() @@ -310,7 +306,6 @@ async fn remove_worker_reschedules_multiple_running_job_test() -> Result<(), Err let mut expected_start_execute_for_worker2 = StartExecute { execute_request: Some(ExecuteRequest { instance_name: INSTANCE_NAME.to_string(), - skip_cache_lookup: true, action_digest: Some(action_digest2.into()), digest_function: digest_function::Value::Sha256.into(), ..Default::default() @@ -564,7 +559,6 @@ async fn worker_should_not_queue_if_properties_dont_match_test() -> Result<(), E update: Some(update_for_worker::Update::StartAction(StartExecute { execute_request: Some(ExecuteRequest { instance_name: INSTANCE_NAME.to_string(), - skip_cache_lookup: true, action_digest: Some(action_digest.into()), digest_function: digest_function::Value::Sha256.into(), ..Default::default() @@ -606,12 +600,11 @@ async fn cacheable_items_join_same_action_queued_test() -> Result<(), Error> { ); let action_digest = DigestInfo::new([99u8; 32], 512); - let unique_qualifier = ActionInfoHashKey { + let unique_qualifier = ActionUniqueQualifier::Cachable(ActionUniqueKey { instance_name: "".to_string(), digest: DigestInfo::zero_digest(), digest_function: DigestHasherFunc::Sha256, - salt: 0, - }; + }); let id = OperationId::new(unique_qualifier); let mut expected_action_state = ActionState { id, @@ -654,7 +647,6 @@ async fn cacheable_items_join_same_action_queued_test() -> Result<(), Error> { update: Some(update_for_worker::Update::StartAction(StartExecute { execute_request: Some(ExecuteRequest { instance_name: INSTANCE_NAME.to_string(), - skip_cache_lookup: true, action_digest: Some(action_digest.into()), digest_function: digest_function::Value::Sha256.into(), ..Default::default() @@ -771,7 +763,6 @@ async fn worker_timesout_reschedules_running_job_test() -> Result<(), Error> { let mut start_execute = StartExecute { execute_request: Some(ExecuteRequest { instance_name: INSTANCE_NAME.to_string(), - skip_cache_lookup: true, action_digest: Some(action_digest.into()), digest_function: digest_function::Value::Sha256.into(), ..Default::default() @@ -897,12 +888,6 @@ async fn update_action_sends_completed_result_to_client_test() -> Result<(), Err } }; - // let action_info_hash_key = ActionInfoHashKey { - // instance_name: INSTANCE_NAME.to_string(), - // digest_function: DigestHasherFunc::Sha256, - // digest: action_digest, - // salt: 0, - // }; let action_result = ActionResult { output_files: vec![FileInfo { name_or_path: NameOrPath::Name("hello".to_string()), @@ -1099,12 +1084,11 @@ async fn update_action_with_wrong_worker_id_errors_test() -> Result<(), Error> { } let _ = setup_new_worker(&scheduler, rogue_worker_id, PlatformProperties::default()).await?; - let action_info_hash_key = ActionInfoHashKey { + let action_info_hash_key = ActionUniqueQualifier::Cachable(ActionUniqueKey { instance_name: INSTANCE_NAME.to_string(), digest_function: DigestHasherFunc::Sha256, digest: action_digest, - salt: 0, - }; + }); let action_result = ActionResult { output_files: Vec::default(), output_folders: Vec::default(), @@ -1173,12 +1157,11 @@ async fn does_not_crash_if_operation_joined_then_relaunched() -> Result<(), Erro ); let action_digest = DigestInfo::new([99u8; 32], 512); - let unique_qualifier = ActionInfoHashKey { + let unique_qualifier = ActionUniqueQualifier::Cachable(ActionUniqueKey { instance_name: "".to_string(), digest: DigestInfo::zero_digest(), digest_function: DigestHasherFunc::Sha256, - salt: 0, - }; + }); let id = OperationId::new(unique_qualifier); let mut expected_action_state = ActionState { id, @@ -1202,7 +1185,6 @@ async fn does_not_crash_if_operation_joined_then_relaunched() -> Result<(), Erro update: Some(update_for_worker::Update::StartAction(StartExecute { execute_request: Some(ExecuteRequest { instance_name: INSTANCE_NAME.to_string(), - skip_cache_lookup: true, action_digest: Some(action_digest.into()), digest_function: digest_function::Value::Sha256.into(), ..Default::default() diff --git a/nativelink-scheduler/tests/utils/scheduler_utils.rs b/nativelink-scheduler/tests/utils/scheduler_utils.rs index b98c4cdf8..8d9635b9c 100644 --- a/nativelink-scheduler/tests/utils/scheduler_utils.rs +++ b/nativelink-scheduler/tests/utils/scheduler_utils.rs @@ -15,14 +15,17 @@ use std::collections::HashMap; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use nativelink_util::action_messages::{ActionInfo, ActionInfoHashKey}; +use nativelink_util::action_messages::{ActionInfo, ActionUniqueKey, ActionUniqueQualifier}; use nativelink_util::common::DigestInfo; use nativelink_util::digest_hasher::DigestHasherFunc; use nativelink_util::platform_properties::PlatformProperties; pub const INSTANCE_NAME: &str = "foobar_instance_name"; -pub fn make_base_action_info(insert_timestamp: SystemTime) -> ActionInfo { +pub fn make_base_action_info( + insert_timestamp: SystemTime, + action_digest: DigestInfo, +) -> ActionInfo { ActionInfo { command_digest: DigestInfo::new([0u8; 32], 0), input_root_digest: DigestInfo::new([0u8; 32], 0), @@ -33,12 +36,10 @@ pub fn make_base_action_info(insert_timestamp: SystemTime) -> ActionInfo { priority: 0, load_timestamp: UNIX_EPOCH, insert_timestamp, - unique_qualifier: ActionInfoHashKey { + unique_qualifier: ActionUniqueQualifier::Cachable(ActionUniqueKey { instance_name: INSTANCE_NAME.to_string(), digest_function: DigestHasherFunc::Sha256, - digest: DigestInfo::new([0u8; 32], 0), - salt: 0, - }, - skip_cache_lookup: false, + digest: action_digest, + }), } } diff --git a/nativelink-service/src/execution_server.rs b/nativelink-service/src/execution_server.rs index e3a4aab44..b89d80583 100644 --- a/nativelink-service/src/execution_server.rs +++ b/nativelink-service/src/execution_server.rs @@ -32,7 +32,8 @@ use nativelink_scheduler::action_scheduler::{ActionListener, ActionScheduler}; use nativelink_store::ac_utils::get_and_decode_digest; use nativelink_store::store_manager::StoreManager; use nativelink_util::action_messages::{ - ActionInfo, ActionInfoHashKey, ClientOperationId, DEFAULT_EXECUTION_PRIORITY, + ActionInfo, ActionUniqueKey, ActionUniqueQualifier, ClientOperationId, + DEFAULT_EXECUTION_PRIORITY, }; use nativelink_util::common::DigestInfo; use nativelink_util::digest_hasher::{make_ctx_for_hash_func, DigestHasherFunc}; @@ -137,6 +138,17 @@ impl InstanceInfo { } } + let action_key = ActionUniqueKey { + instance_name, + digest_function, + digest: action_digest, + }; + let unique_qualifier = if skip_cache_lookup { + ActionUniqueQualifier::Uncachable(action_key) + } else { + ActionUniqueQualifier::Cachable(action_key) + }; + Ok(ActionInfo { command_digest, input_root_digest, @@ -145,18 +157,7 @@ impl InstanceInfo { priority, load_timestamp: UNIX_EPOCH, insert_timestamp: SystemTime::now(), - unique_qualifier: ActionInfoHashKey { - instance_name, - digest_function, - digest: action_digest, - salt: 0, // TODO(allada) This can be removed! - // if action.do_not_cache { - // thread_rng().gen::() - // } else { - // 0 - // }, - }, - skip_cache_lookup, + unique_qualifier, }) } } diff --git a/nativelink-service/tests/worker_api_server_test.rs b/nativelink-service/tests/worker_api_server_test.rs index 745158465..8ca1ec2cc 100644 --- a/nativelink-service/tests/worker_api_server_test.rs +++ b/nativelink-service/tests/worker_api_server_test.rs @@ -37,7 +37,7 @@ use nativelink_scheduler::scheduler_state::workers::ApiWorkerScheduler; use nativelink_scheduler::worker_scheduler::WorkerScheduler; use nativelink_service::worker_api_server::{ConnectWorkerStream, NowFn, WorkerApiServer}; use nativelink_util::action_messages::{ - ActionInfo, ActionInfoHashKey, ActionStage, OperationId, WorkerId, + ActionInfo, ActionStage, ActionUniqueKey, ActionUniqueQualifier, OperationId, WorkerId, }; use nativelink_util::common::DigestInfo; use nativelink_util::digest_hasher::DigestHasherFunc; @@ -374,12 +374,11 @@ pub async fn execution_response_success_test() -> Result<(), Box Result<(), Box Self { + pub fn new(unique_qualifier: ActionUniqueQualifier) -> Self { Self(OperationId::new(unique_qualifier).to_string()) } @@ -57,10 +58,6 @@ impl ClientOperationId { Self(name) } - pub fn from_operation(operation: &Operation) -> Self { - Self(operation.name.clone()) - } - pub fn into_string(self) -> String { self.0 } @@ -72,37 +69,46 @@ impl std::fmt::Display for ClientOperationId { } } -#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] +#[derive(Clone, Serialize, Deserialize)] pub struct OperationId { - pub unique_qualifier: ActionInfoHashKey, + // TODO!(this should be for debugging only) + pub unique_qualifier: ActionUniqueQualifier, pub id: Uuid, } -// TODO: Eventually we should make this it's own hash rather than delegate to ActionInfoHashKey. +impl PartialEq for OperationId { + fn eq(&self, other: &Self) -> bool { + self.id.eq(&other.id) + } +} + +impl Eq for OperationId {} + +impl PartialOrd for OperationId { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for OperationId { + fn cmp(&self, other: &Self) -> Ordering { + self.id.cmp(&other.id) + } +} + impl Hash for OperationId { fn hash(&self, state: &mut H) { - ActionInfoHashKey::hash(&self.unique_qualifier, state) + self.id.hash(state) } } impl OperationId { - pub fn new(unique_qualifier: ActionInfoHashKey) -> Self { + pub fn new(unique_qualifier: ActionUniqueQualifier) -> Self { Self { id: uuid::Uuid::new_v4(), unique_qualifier, } } - - /// Utility function used to make a unique hash of the digest including the salt. - pub fn get_hash(&self) -> [u8; 32] { - self.unique_qualifier.get_hash() - } - - /// Returns the salt used for cache busting/hashing. - #[inline] - pub fn action_name(&self) -> String { - self.unique_qualifier.action_name() - } } impl TryFrom<&str> for OperationId { @@ -111,7 +117,7 @@ impl TryFrom<&str> for OperationId { /// Attempts to convert a string slice into an `OperationId`. /// /// The input string `value` is expected to be in the format: - /// `//-//`. + /// `//-//`. /// /// # Parameters /// @@ -146,30 +152,41 @@ impl TryFrom<&str> for OperationId { .err_tip(|| format!("Invalid OperationId unique_qualifier / id fragment - {value}"))?; let (instance_name, rest) = unique_qualifier .split_once('/') - .err_tip(|| format!("Invalid ActionInfoHashKey instance name fragment - {value}"))?; + .err_tip(|| format!("Invalid UniqueQualifier instance name fragment - {value}"))?; let (digest_function, rest) = rest .split_once('/') - .err_tip(|| format!("Invalid ActionInfoHashKey digest function fragment - {value}"))?; + .err_tip(|| format!("Invalid UniqueQualifier digest function fragment - {value}"))?; let (digest_hash, rest) = rest .split_once('-') - .err_tip(|| format!("Invalid ActionInfoHashKey digest hash fragment - {value}"))?; - let (digest_size, salt) = rest + .err_tip(|| format!("Invalid UniqueQualifier digest hash fragment - {value}"))?; + let (digest_size, cachable) = rest .split_once('/') - .err_tip(|| format!("Invalid ActionInfoHashKey digest size fragment - {value}"))?; + .err_tip(|| format!("Invalid UniqueQualifier digest size fragment - {value}"))?; let digest = DigestInfo::try_new( digest_hash, digest_size .parse::() - .err_tip(|| format!("Invalid ActionInfoHashKey size value fragment - {value}"))?, + .err_tip(|| format!("Invalid UniqueQualifier size value fragment - {value}"))?, ) .err_tip(|| format!("Invalid DigestInfo digest hash - {value}"))?; - let salt = u64::from_str_radix(salt, 16) - .err_tip(|| format!("Invalid ActionInfoHashKey salt hex conversion - {value}"))?; - let unique_qualifier = ActionInfoHashKey { + let cachable = match cachable { + "0" => false, + "1" => true, + _ => { + return Err(make_input_err!( + "Invalid UniqueQualifier cachable value fragment - {value}" + )); + } + }; + let unique_key = ActionUniqueKey { instance_name: instance_name.to_string(), digest_function: digest_function.try_into()?, digest, - salt, + }; + let unique_qualifier = if cachable { + ActionUniqueQualifier::Cachable(unique_key) + } else { + ActionUniqueQualifier::Uncachable(unique_key) }; let id = Uuid::parse_str(id).map_err(|e| make_input_err!("Failed to parse {e} as uuid"))?; Ok(Self { @@ -181,21 +198,13 @@ impl TryFrom<&str> for OperationId { impl std::fmt::Display for OperationId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_fmt(format_args!( - "{}/{}", - self.unique_qualifier.action_name(), - self.id - )) + f.write_fmt(format_args!("{}/{}", self.unique_qualifier, self.id)) } } impl std::fmt::Debug for OperationId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_fmt(format_args!( - "{}:{}", - self.unique_qualifier.action_name(), - self.id - )) + std::fmt::Display::fmt(&self, f) } } @@ -213,9 +222,7 @@ impl std::fmt::Display for WorkerId { impl std::fmt::Debug for WorkerId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let mut buf = Uuid::encode_buffer(); - let worker_id_str = self.0.hyphenated().encode_lower(&mut buf); - f.write_fmt(format_args!("{worker_id_str}")) + std::fmt::Display::fmt(&self, f) } } @@ -224,58 +231,77 @@ impl TryFrom for WorkerId { fn try_from(s: String) -> Result { match Uuid::parse_str(&s) { Err(e) => Err(make_input_err!( - "Failed to convert string to WorkerId : {} : {:?}", - s, - e + "Failed to convert string to WorkerId : {s} : {e:?}", )), Ok(my_uuid) => Ok(WorkerId(my_uuid)), } } } + +/// Holds the information needed to uniquely identify an action +/// and if it is cachable or not. +#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] +pub enum ActionUniqueQualifier { + /// The action is cachable. + Cachable(ActionUniqueKey), + /// The action is uncachable. + Uncachable(ActionUniqueKey), +} + +impl ActionUniqueQualifier { + /// Get the instance_name of the action. + pub const fn instance_name(&self) -> &String { + match self { + Self::Cachable(action) => &action.instance_name, + Self::Uncachable(action) => &action.instance_name, + } + } + + /// Get the digest function of the action. + pub const fn digest_function(&self) -> DigestHasherFunc { + match self { + Self::Cachable(action) => action.digest_function, + Self::Uncachable(action) => action.digest_function, + } + } + + /// Get the digest of the action. + pub const fn digest(&self) -> DigestInfo { + match self { + Self::Cachable(action) => action.digest, + Self::Uncachable(action) => action.digest, + } + } +} + +impl std::fmt::Display for ActionUniqueQualifier { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let (cachable, unique_key) = match self { + Self::Cachable(action) => (true, action), + Self::Uncachable(action) => (false, action), + }; + f.write_fmt(format_args!( + "{}/{}/{}-{}/{}", + unique_key.instance_name, + unique_key.digest_function, + unique_key.digest.hash_str(), + unique_key.digest.size_bytes, + // TODO!(maybe make this a different value?) + cachable as i32, // Convert to 0 or 1. + )) + } +} + /// This is a utility struct used to make it easier to match `ActionInfos` in a /// `HashMap` without needing to construct an entire `ActionInfo`. -/// Since the hashing only needs the digest and salt we can just alias them here -/// and point the original `ActionInfo` structs to reference these structs for -/// it's hashing functions. -#[derive(Debug, Clone, PartialOrd, Ord, Serialize, Deserialize)] -pub struct ActionInfoHashKey { +#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)] +pub struct ActionUniqueKey { /// Name of instance group this action belongs to. pub instance_name: String, /// The digest function this action expects. pub digest_function: DigestHasherFunc, /// Digest of the underlying `Action`. pub digest: DigestInfo, - /// Salt that can be filled with a random number to ensure no `ActionInfo` will be a match - /// to another `ActionInfo` in the scheduler. When caching is wanted this value is usually - /// zero. - pub salt: u64, -} - -impl ActionInfoHashKey { - /// Utility function used to make a unique hash of the digest including the salt. - pub fn get_hash(&self) -> [u8; 32] { - Blake3Hasher::new() - .update(self.instance_name.as_bytes()) - .update(&i32::from(self.digest_function.proto_digest_func()).to_le_bytes()) - .update(&self.digest.packed_hash[..]) - .update(&self.digest.size_bytes.to_le_bytes()) - .update(&self.salt.to_le_bytes()) - .finalize() - .into() - } - - /// Returns the salt used for cache busting/hashing. - #[inline] - pub fn action_name(&self) -> String { - format!( - "{}/{}/{}-{}/{:X}", - self.instance_name, - self.digest_function, - self.digest.hash_str(), - self.digest.size_bytes, - self.salt - ) - } } /// Information needed to execute an action. This struct is used over bazel's proto `Action` @@ -299,47 +325,43 @@ pub struct ActionInfo { pub load_timestamp: SystemTime, /// When this action was created. pub insert_timestamp: SystemTime, - - /// Info used to uniquely identify this ActionInfo. Normally the hash function would just - /// use the fields it needs and you wouldn't need to separate them, however we have a use - /// case where we sometimes want to lookup an entry in a HashMap, but we don't have the - /// info to construct an entire ActionInfo. In such case we construct only a ActionInfoHashKey - /// then use that object to lookup the entry in the map. The root problem is that HashMap - /// requires `ActionInfo :Borrow` in order for this to work, which means - /// we need to be able to return a &ActionInfoHashKey from ActionInfo, but since we cannot - /// return a temporary reference we must have an object tied to ActionInfo's lifetime and - /// return it's reference. - pub unique_qualifier: ActionInfoHashKey, - - /// Whether to try looking up this action in the cache. - pub skip_cache_lookup: bool, + /// Info used to uniquely identify this ActionInfo and if it is cachable. + /// This is primarily used to join actions/operations together using this key. + pub unique_qualifier: ActionUniqueQualifier, } impl ActionInfo { #[inline] pub const fn instance_name(&self) -> &String { - &self.unique_qualifier.instance_name + self.unique_qualifier.instance_name() } /// Returns the underlying digest of the `Action`. #[inline] - pub const fn digest(&self) -> &DigestInfo { - &self.unique_qualifier.digest + pub const fn digest(&self) -> DigestInfo { + self.unique_qualifier.digest() } - /// Returns the salt used for cache busting/hashing. - #[inline] - pub const fn salt(&self) -> &u64 { - &self.unique_qualifier.salt - } - - pub fn try_from_action_and_execute_request_with_salt( + pub fn try_from_action_and_execute_request( execute_request: ExecuteRequest, action: Action, - salt: u64, load_timestamp: SystemTime, queued_timestamp: SystemTime, ) -> Result { + let unique_key = ActionUniqueKey { + instance_name: execute_request.instance_name, + digest_function: DigestHasherFunc::try_from(execute_request.digest_function) + .err_tip(|| format!("Could not find digest_function in try_from_action_and_execute_request {:?}", execute_request.digest_function))?, + digest: execute_request + .action_digest + .err_tip(|| "Expected action_digest to exist on ExecuteRequest")? + .try_into()?, + }; + let unique_qualifier = if execute_request.skip_cache_lookup { + ActionUniqueQualifier::Uncachable(unique_key) + } else { + ActionUniqueQualifier::Cachable(unique_key) + }; Ok(Self { command_digest: action .command_digest @@ -355,20 +377,13 @@ impl ActionInfo { .try_into() .map_err(|_| make_input_err!("Failed convert proto duration to system duration"))?, platform_properties: action.platform.unwrap_or_default().into(), - priority: execute_request.execution_policy.unwrap_or_default().priority, + priority: execute_request + .execution_policy + .unwrap_or_default() + .priority, load_timestamp, insert_timestamp: queued_timestamp, - unique_qualifier: ActionInfoHashKey { - instance_name: execute_request.instance_name, - digest_function: DigestHasherFunc::try_from(execute_request.digest_function) - .err_tip(|| format!("Could not find digest_function in try_from_action_and_execute_request_with_salt {:?}", execute_request.digest_function))?, - digest: execute_request - .action_digest - .err_tip(|| "Expected action_digest to exist on ExecuteRequest")? - .try_into()?, - salt, - }, - skip_cache_lookup: execute_request.skip_cache_lookup, + unique_qualifier, }) } } @@ -376,92 +391,21 @@ impl ActionInfo { impl From for ExecuteRequest { fn from(val: ActionInfo) -> Self { let digest = val.digest().into(); + let (skip_cache_lookup, unique_qualifier) = match val.unique_qualifier { + ActionUniqueQualifier::Cachable(unique_qualifier) => (false, unique_qualifier), + ActionUniqueQualifier::Uncachable(unique_qualifier) => (true, unique_qualifier), + }; Self { - instance_name: val.unique_qualifier.instance_name, + instance_name: unique_qualifier.instance_name, action_digest: Some(digest), - skip_cache_lookup: true, // The worker should never cache lookup. - execution_policy: None, // Not used in the worker. + skip_cache_lookup, + execution_policy: None, // Not used in the worker. results_cache_policy: None, // Not used in the worker. - digest_function: val - .unique_qualifier - .digest_function - .proto_digest_func() - .into(), + digest_function: unique_qualifier.digest_function.proto_digest_func().into(), } } } -// Note: Hashing, Eq, and Ord matching on this struct is unique. Normally these functions -// must play well with each other, but in our case the following rules apply: -// * Hash - Hashing must be unique on the exact command being run and must never match -// when do_not_cache is enabled, but must be consistent between identical data -// hashes. -// * Eq - Same as hash. -// * Ord - Used when sorting `ActionInfo` together. The only major sorting is priority and -// insert_timestamp, everything else is undefined, but must be deterministic. -impl Hash for ActionInfo { - fn hash(&self, state: &mut H) { - ActionInfoHashKey::hash(&self.unique_qualifier, state); - } -} - -impl PartialEq for ActionInfo { - fn eq(&self, other: &Self) -> bool { - ActionInfoHashKey::eq(&self.unique_qualifier, &other.unique_qualifier) - } -} - -impl Eq for ActionInfo {} - -impl Ord for ActionInfo { - fn cmp(&self, other: &Self) -> Ordering { - // Want the highest priority on top, but the lowest insert_timestamp. - self.priority - .cmp(&other.priority) - .then_with(|| other.insert_timestamp.cmp(&self.insert_timestamp)) - .then_with(|| self.salt().cmp(other.salt())) - .then_with(|| self.digest().size_bytes.cmp(&other.digest().size_bytes)) - .then_with(|| self.digest().packed_hash.cmp(&other.digest().packed_hash)) - .then_with(|| { - self.unique_qualifier - .digest_function - .cmp(&other.unique_qualifier.digest_function) - }) - } -} - -impl PartialOrd for ActionInfo { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Borrow for Arc { - #[inline] - fn borrow(&self) -> &ActionInfoHashKey { - &self.unique_qualifier - } -} - -impl Hash for ActionInfoHashKey { - fn hash(&self, state: &mut H) { - // Digest is unique, so hashing it is all we need. - self.digest_function.hash(state); - self.digest.hash(state); - self.salt.hash(state); - } -} - -impl PartialEq for ActionInfoHashKey { - fn eq(&self, other: &Self) -> bool { - self.digest == other.digest - && self.salt == other.salt - && self.digest_function == other.digest_function - } -} - -impl Eq for ActionInfoHashKey {} - /// Simple utility struct to determine if a string is representing a full path or /// just the name of the file. /// This is in order to be able to reuse the same struct instead of building different @@ -755,9 +699,6 @@ impl TryFrom for ExecutionMetadata { } } -/// Exit code sent if there is an internal error. -pub const INTERNAL_ERROR_EXIT_CODE: i32 = -178; - /// Represents the results of an execution. /// This struct must be 100% compatible with `ActionResult` in `remote_execution.proto`. #[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] @@ -1143,15 +1084,6 @@ pub struct ActionState { } impl ActionState { - #[inline] - pub fn unique_qualifier(&self) -> &ActionInfoHashKey { - &self.id.unique_qualifier - } - #[inline] - pub fn action_digest(&self) -> &DigestInfo { - &self.id.unique_qualifier.digest - } - pub fn try_from_operation( operation: Operation, operation_id: OperationId, @@ -1210,7 +1142,7 @@ impl ActionState { } else { None }; - let digest = Some(self.id.unique_qualifier.digest.into()); + let digest = Some(self.id.unique_qualifier.digest().into()); let metadata = ExecuteOperationMetadata { stage, diff --git a/nativelink-util/tests/operation_id_tests.rs b/nativelink-util/tests/operation_id_tests.rs index e1e8b5e30..c5afba440 100644 --- a/nativelink-util/tests/operation_id_tests.rs +++ b/nativelink-util/tests/operation_id_tests.rs @@ -19,22 +19,28 @@ use pretty_assertions::assert_eq; #[nativelink_test] async fn parse_operation_id() -> Result<(), Error> { - let operation_id = OperationId::try_from("main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/0/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52").unwrap(); - assert_eq!( - operation_id.to_string(), - "main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/0/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52"); - assert_eq!( - operation_id.action_name(), - "main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/0" - ); - assert_eq!( - operation_id.id.to_string(), - "19b16cf8-a1ad-4948-aaac-b6f4eb7fca52" - ); - assert_eq!( - hex::encode(operation_id.get_hash()), - "5a36f0db39e27667c4b91937cd29c1df8799ba468f2de6810c6865be05517644" - ); + { + // Check no cached. + let operation_id = OperationId::try_from("main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/0/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52").unwrap(); + assert_eq!( + operation_id.to_string(), + "main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/0/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52"); + assert_eq!( + operation_id.id.to_string(), + "19b16cf8-a1ad-4948-aaac-b6f4eb7fca52" + ); + } + { + // Check cached. + let operation_id = OperationId::try_from("main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/1/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52").unwrap(); + assert_eq!( + operation_id.to_string(), + "main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/1/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52"); + assert_eq!( + operation_id.id.to_string(), + "19b16cf8-a1ad-4948-aaac-b6f4eb7fca52" + ); + } Ok(()) } @@ -53,7 +59,7 @@ async fn parse_empty_failure() -> Result<(), Error> { assert_eq!(operation_id.messages.len(), 1); assert_eq!( operation_id.messages[0], - "Invalid ActionInfoHashKey instance name fragment - /" + "Invalid UniqueQualifier instance name fragment - /" ); let operation_id = OperationId::try_from("main").err().unwrap(); @@ -92,25 +98,23 @@ async fn parse_empty_failure() -> Result<(), Error> { operation_id.messages[0], "cannot parse integer from empty string" ); - assert_eq!(operation_id.messages[1], "Invalid ActionInfoHashKey size value fragment - main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-/0/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52"); + assert_eq!(operation_id.messages[1], "Invalid UniqueQualifier size value fragment - main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-/0/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52"); let operation_id = OperationId::try_from("main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f--211/0/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52").err().unwrap(); assert_eq!(operation_id.code, Code::InvalidArgument); assert_eq!(operation_id.messages.len(), 2); assert_eq!(operation_id.messages[0], "invalid digit found in string"); - assert_eq!(operation_id.messages[1], "Invalid ActionInfoHashKey size value fragment - main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f--211/0/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52"); + assert_eq!(operation_id.messages[1], "Invalid UniqueQualifier size value fragment - main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f--211/0/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52"); let operation_id = OperationId::try_from("main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/x/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52").err().unwrap(); - assert_eq!(operation_id.messages.len(), 2); + assert_eq!(operation_id.messages.len(), 1); assert_eq!(operation_id.code, Code::InvalidArgument); - assert_eq!(operation_id.messages[0], "invalid digit found in string"); - assert_eq!(operation_id.messages[1], "Invalid ActionInfoHashKey salt hex conversion - main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/x/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52"); + assert_eq!(operation_id.messages[0], "Invalid UniqueQualifier cachable value fragment - main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/x/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52"); let operation_id = OperationId::try_from("main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/-10/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52").err().unwrap(); - assert_eq!(operation_id.messages.len(), 2); + assert_eq!(operation_id.messages.len(), 1); assert_eq!(operation_id.code, Code::InvalidArgument); - assert_eq!(operation_id.messages[0], "invalid digit found in string"); - assert_eq!(operation_id.messages[1], "Invalid ActionInfoHashKey salt hex conversion - main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/-10/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52"); + assert_eq!(operation_id.messages[0], "Invalid UniqueQualifier cachable value fragment - main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/-10/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52"); let operation_id = OperationId::try_from("main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/0/baduuid").err().unwrap(); assert_eq!(operation_id.messages.len(), 1); @@ -124,7 +128,7 @@ async fn parse_empty_failure() -> Result<(), Error> { .unwrap(); assert_eq!(operation_id.messages.len(), 1); assert_eq!(operation_id.code, Code::Internal); - assert_eq!(operation_id.messages[0], "Invalid ActionInfoHashKey digest size fragment - main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/0"); + assert_eq!(operation_id.messages[0], "Invalid UniqueQualifier digest size fragment - main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/0"); Ok(()) } diff --git a/nativelink-worker/src/running_actions_manager.rs b/nativelink-worker/src/running_actions_manager.rs index a3c979b2f..9e53df033 100644 --- a/nativelink-worker/src/running_actions_manager.rs +++ b/nativelink-worker/src/running_actions_manager.rs @@ -1033,7 +1033,7 @@ impl RunningActionImpl { ) }; let cas_store = self.running_actions_manager.cas_store.as_ref(); - let hasher = self.action_info.unique_qualifier.digest_function; + let hasher = self.action_info.unique_qualifier.digest_function(); enum OutputType { None, File(FileInfo), @@ -1731,10 +1731,9 @@ impl RunningActionsManagerImpl { get_and_decode_digest::(self.cas_store.as_ref(), action_digest.into()) .await .err_tip(|| "During start_action")?; - let action_info = ActionInfo::try_from_action_and_execute_request_with_salt( + let action_info = ActionInfo::try_from_action_and_execute_request( execute_request, action, - 0, // TODO: salt is no longer needed. load_start_timestamp, queued_timestamp, ) diff --git a/nativelink-worker/tests/local_worker_test.rs b/nativelink-worker/tests/local_worker_test.rs index 321f853f5..673579b75 100644 --- a/nativelink-worker/tests/local_worker_test.rs +++ b/nativelink-worker/tests/local_worker_test.rs @@ -42,7 +42,8 @@ use nativelink_store::fast_slow_store::FastSlowStore; use nativelink_store::filesystem_store::FilesystemStore; use nativelink_store::memory_store::MemoryStore; use nativelink_util::action_messages::{ - ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ExecutionMetadata, OperationId, + ActionInfo, ActionResult, ActionStage, ActionUniqueKey, ActionUniqueQualifier, + ExecutionMetadata, OperationId, }; use nativelink_util::common::{encode_stream_proto, fs, DigestInfo}; use nativelink_util::digest_hasher::DigestHasherFunc; @@ -230,13 +231,11 @@ async fn blake3_digest_function_registerd_properly() -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box> { - const SALT: u64 = 1000; - let mut test_context = setup_local_worker(HashMap::new()).await; let streaming_response = test_context.maybe_streaming_response.take().unwrap(); @@ -667,13 +660,11 @@ async fn kill_action_request_kills_action() -> Result<(), Box SystemTime { } fn make_operation_id(execute_request: &ExecuteRequest) -> OperationId { - let unique_qualifier = ActionInfoHashKey { + let unique_qualifier = ActionUniqueQualifier::Cachable(ActionUniqueKey { instance_name: execute_request.instance_name.clone(), digest_function: execute_request.digest_function.try_into().unwrap(), digest: execute_request @@ -152,8 +151,7 @@ fn make_operation_id(execute_request: &ExecuteRequest) -> OperationId { .unwrap() .try_into() .unwrap(), - salt: 0, - }; + }); OperationId::new(unique_qualifier) }