Skip to content

Commit

Permalink
Remove all uses of salt and put under ActionUniqueQualifier
Browse files Browse the repository at this point in the history
We no longer need `salt` as part of the action key, because we now
put the key as an value in an enum in ActionUniqueQualifier.
  • Loading branch information
allada committed Jul 9, 2024
1 parent 44b087f commit 042cf97
Show file tree
Hide file tree
Showing 20 changed files with 424 additions and 585 deletions.
118 changes: 71 additions & 47 deletions nativelink-scheduler/src/cache_lookup_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ use nativelink_proto::build::bazel::remote::execution::v2::{
use nativelink_store::ac_utils::get_and_decode_digest;
use nativelink_store::grpc_store::GrpcStore;
use nativelink_util::action_messages::{
ActionInfo, ActionInfoHashKey, ActionStage, ActionState, ClientOperationId, OperationId,
ActionInfo, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier,
ClientOperationId, OperationId,
};
use nativelink_util::background_spawn;
use nativelink_util::common::DigestInfo;
use nativelink_util::digest_hasher::DigestHasherFunc;
use nativelink_util::store_trait::{Store, StoreLike};
use nativelink_util::store_trait::Store;
use parking_lot::{Mutex, MutexGuard};
use scopeguard::guard;
use tokio::sync::oneshot;
Expand All @@ -44,7 +45,7 @@ use crate::platform_property_manager::PlatformPropertyManager;
/// being forwarded upstream. Missing the skip_cache_check actions which are
/// forwarded directly.
type CheckActions = HashMap<
ActionInfoHashKey,
ActionUniqueKey,
Vec<(
ClientOperationId,
oneshot::Sender<Result<Pin<Box<dyn ActionListener>>, Error>>,
Expand All @@ -67,7 +68,7 @@ async fn get_action_from_store(
action_digest: DigestInfo,
instance_name: String,
digest_function: DigestHasherFunc,
) -> Option<ProtoActionResult> {
) -> Result<ProtoActionResult, Error> {
// If we are a GrpcStore we shortcut here, as this is a special store.
if let Some(grpc_store) = ac_store.downcast_ref::<GrpcStore>(Some(action_digest.into())) {
let action_result_request = GetActionResultRequest {
Expand All @@ -82,11 +83,8 @@ async fn get_action_from_store(
.get_action_result(Request::new(action_result_request))
.await
.map(|response| response.into_inner())
.ok()
} else {
get_and_decode_digest::<ProtoActionResult>(ac_store, action_digest.into())
.await
.ok()
get_and_decode_digest::<ProtoActionResult>(ac_store, action_digest.into()).await
}
}

Expand All @@ -95,7 +93,7 @@ type ActionListenerOneshot = oneshot::Receiver<Result<Pin<Box<dyn ActionListener

fn subscribe_to_existing_action(
inflight_cache_checks: &mut MutexGuard<CheckActions>,
unique_qualifier: &ActionInfoHashKey,
unique_qualifier: &ActionUniqueKey,
client_operation_id: &ClientOperationId,
) -> Option<ActionListenerOneshot> {
inflight_cache_checks
Expand Down Expand Up @@ -149,26 +147,29 @@ impl ActionScheduler for CacheLookupScheduler {
client_operation_id: ClientOperationId,
action_info: ActionInfo,
) -> Result<Pin<Box<dyn ActionListener>>, Error> {
if action_info.skip_cache_lookup {
// Cache lookup skipped, forward to the upstream.
return self
.action_scheduler
.add_action(client_operation_id, action_info)
.await;
}
let unique_key = match &action_info.unique_qualifier {
ActionUniqueQualifier::Cachable(unique_key) => unique_key.clone(),
ActionUniqueQualifier::Uncachable(_) => {
// Cache lookup skipped, forward to the upstream.
return self
.action_scheduler
.add_action(client_operation_id, action_info)
.await;
}
};

let cache_check_result = {
// Check this isn't a duplicate request first.
let mut inflight_cache_checks = self.inflight_cache_checks.lock();
let unique_qualifier = action_info.unique_qualifier.clone();
subscribe_to_existing_action(
&mut inflight_cache_checks,
&unique_qualifier,
&unique_key,
&client_operation_id,
)
.ok_or_else(move || {
let (action_listener_tx, action_listener_rx) = oneshot::channel();
inflight_cache_checks.insert(
unique_qualifier.clone(),
unique_key.clone(),
vec![(client_operation_id, action_listener_tx)],
);
// In the event we loose the reference to our `scope_guard`, it will remove
Expand All @@ -177,7 +178,7 @@ impl ActionScheduler for CacheLookupScheduler {
(
action_listener_rx,
guard((), move |_| {
inflight_cache_checks.lock().remove(&unique_qualifier);
inflight_cache_checks.lock().remove(&unique_key);
}),
)
})
Expand All @@ -203,54 +204,77 @@ impl ActionScheduler for CacheLookupScheduler {
// If our spawn ever dies, we will remove the action from the inflight_cache_checks map.
let _scope_guard = scope_guard;

let unique_key = match &action_info.unique_qualifier {
ActionUniqueQualifier::Cachable(unique_key) => unique_key,
ActionUniqueQualifier::Uncachable(unique_key) => {
event!(
Level::ERROR,
?action_info,
"ActionInfo::unique_qualifier should be ActionUniqueQualifier::Cachable()"
);
unique_key
}
};

// Perform cache check.
let instance_name = action_info.unique_qualifier.instance_name.clone();
if let Some(action_result) = get_action_from_store(
let instance_name = action_info.unique_qualifier.instance_name().clone();
let maybe_action_result = get_action_from_store(
&ac_store,
action_info.unique_qualifier.digest,
action_info.unique_qualifier.digest(),
instance_name,
action_info.unique_qualifier.digest_function,
action_info.unique_qualifier.digest_function(),
)
.await
{
match ac_store.has(action_info.unique_qualifier.digest).await {
Ok(Some(_)) => {
.await;
match maybe_action_result {
Ok(action_result) => {
println!("{action_result:?}");
let maybe_pending_txs = {
let mut inflight_cache_checks = inflight_cache_checks.lock();
// We are ready to resolve the in-flight actions. We remove the
// in-flight actions from the map.
inflight_cache_checks.remove(unique_key)
};
let Some(pending_txs) = maybe_pending_txs else {
return; // Nobody is waiting for this action anymore.
};
let action_state = Arc::new(ActionState {
id: OperationId::new(action_info.unique_qualifier.clone()),
stage: ActionStage::CompletedFromCache(action_result),
});
for (client_operation_id, pending_tx) in pending_txs {
// Ignore errors here, as the other end may have hung up.
let _ = pending_tx.send(Ok(Box::pin(CachedActionListener {
client_operation_id,
action_state: action_state.clone(),
})));
}
return;
}
Err(err) => {
// NotFound errors just mean we need to execute our action.
if err.code != Code::NotFound {
let err = err.append("In CacheLookupScheduler::add_action");
let maybe_pending_txs = {
let mut inflight_cache_checks = inflight_cache_checks.lock();
// We are ready to resolve the in-flight actions. We remove the
// in-flight actions from the map.
inflight_cache_checks.remove(&action_info.unique_qualifier)
inflight_cache_checks.remove(unique_key)
};
let Some(pending_txs) = maybe_pending_txs else {
return; // Nobody is waiting for this action anymore.
};
let action_state = Arc::new(ActionState {
id: OperationId::new(action_info.unique_qualifier.clone()),
stage: ActionStage::CompletedFromCache(action_result),
});
for (client_operation_id, pending_tx) in pending_txs {
for (_client_operation_id, pending_tx) in pending_txs {
// Ignore errors here, as the other end may have hung up.
let _ = pending_tx.send(Ok(Box::pin(CachedActionListener {
client_operation_id,
action_state: action_state.clone(),
})));
let _ = pending_tx.send(Err(err.clone()));
}
return;
}
Err(err) => {
event!(
Level::WARN,
?err,
"Error while calling `has` on `ac_store` in `CacheLookupScheduler`'s `add_action` function"
);
}
_ => {}
}
}

let maybe_pending_txs = {
let mut inflight_cache_checks = inflight_cache_checks.lock();
inflight_cache_checks.remove(&action_info.unique_qualifier)
inflight_cache_checks.remove(unique_key)
};
let Some(pending_txs) = maybe_pending_txs else {
return; // Noone is waiting for this action anymore.
Expand Down
26 changes: 15 additions & 11 deletions nativelink-scheduler/src/grpc_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use nativelink_proto::build::bazel::remote::execution::v2::{
};
use nativelink_proto::google::longrunning::Operation;
use nativelink_util::action_messages::{
ActionInfo, ActionInfoHashKey, ActionState, ClientOperationId, OperationId,
DEFAULT_EXECUTION_PRIORITY,
ActionInfo, ActionState, ActionUniqueKey, ActionUniqueQualifier, ClientOperationId,
OperationId, DEFAULT_EXECUTION_PRIORITY,
};
use nativelink_util::common::DigestInfo;
use nativelink_util::connection_manager::ConnectionManager;
Expand Down Expand Up @@ -126,13 +126,13 @@ impl GrpcScheduler {
let client_operation_id =
ClientOperationId::from_raw_string(initial_response.name.clone());
// Our operation_id is not needed here is just a place holder to recycle existing object.
// The only thing that actually matters is the client_operation_id.
let operation_id = OperationId::new(ActionInfoHashKey {
instance_name: "dummy_instance_name".to_string(),
digest_function: DigestHasherFunc::Sha256,
digest: DigestInfo::zero_digest(),
salt: 0,
});
// The only thing that actually matters is the operation_id.
let operation_id =
OperationId::new(ActionUniqueQualifier::Uncachable(ActionUniqueKey {
instance_name: "dummy_instance_name".to_string(),
digest_function: DigestHasherFunc::Sha256,
digest: DigestInfo::zero_digest(),
}));
let action_state =
ActionState::try_from_operation(initial_response, operation_id.clone())
.err_tip(|| "In GrpcScheduler::stream_state")?;
Expand Down Expand Up @@ -250,16 +250,20 @@ impl ActionScheduler for GrpcScheduler {
priority: action_info.priority,
})
};
let skip_cache_lookup = match action_info.unique_qualifier {
ActionUniqueQualifier::Cachable(_) => false,
ActionUniqueQualifier::Uncachable(_) => true,
};
let request = ExecuteRequest {
instance_name: action_info.instance_name().clone(),
skip_cache_lookup: action_info.skip_cache_lookup,
skip_cache_lookup,
action_digest: Some(action_info.digest().into()),
execution_policy,
// TODO: Get me from the original request, not very important as we ignore it.
results_cache_policy: None,
digest_function: action_info
.unique_qualifier
.digest_function
.digest_function()
.proto_digest_func()
.into(),
};
Expand Down
6 changes: 3 additions & 3 deletions nativelink-scheduler/src/operation_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ use bitflags::bitflags;
use futures::Stream;
use nativelink_error::Error;
use nativelink_util::action_messages::{
ActionInfo, ActionInfoHashKey, ActionStage, ActionState, ClientOperationId, OperationId,
WorkerId,
ActionInfo, ActionStage, ActionState, ActionUniqueKey, ClientOperationId, OperationId, WorkerId,
};
use nativelink_util::common::DigestInfo;
use tokio::sync::watch;
Expand Down Expand Up @@ -89,9 +88,10 @@ pub struct OperationFilter {

// /// The operation must have it's last client update before this time.
// NOTE: NOT PART OF ANY FILTERING!
// TODO!(cleanup)
// pub last_client_update_before: Option<SystemTime>,
/// The unique key for filtering specific action results.
pub unique_qualifier: Option<ActionInfoHashKey>,
pub unique_key: Option<ActionUniqueKey>,

/// If the results should be ordered by priority and in which direction.
pub order_by_priority_direction: Option<OrderDirection>,
Expand Down
2 changes: 1 addition & 1 deletion nativelink-scheduler/src/property_modifier_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl ActionScheduler for PropertyModifierScheduler {
mut action_info: ActionInfo,
) -> Result<Pin<Box<dyn ActionListener>>, Error> {
let platform_property_manager = self
.get_platform_property_manager(&action_info.unique_qualifier.instance_name)
.get_platform_property_manager(action_info.unique_qualifier.instance_name())
.await
.err_tip(|| "In PropertyModifierScheduler::add_action")?;
for modification in &self.modifications {
Expand Down
25 changes: 17 additions & 8 deletions nativelink-scheduler/src/scheduler_state/awaited_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime};

use nativelink_util::action_messages::{
ActionInfo, ActionInfoHashKey, ActionStage, ActionState, OperationId, WorkerId,
ActionInfo, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier, OperationId,
WorkerId,
};
use nativelink_util::evicting_map::InstantWrapper;
use parking_lot::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard};
Expand Down Expand Up @@ -98,11 +99,15 @@ impl AwaitedAction {
AwaitedActionSortKey,
watch::Receiver<Arc<ActionState>>,
) {
let unique_key = match &action_info.unique_qualifier {
ActionUniqueQualifier::Cachable(unique_key) => unique_key,
ActionUniqueQualifier::Uncachable(unique_key) => unique_key,
};
let stage = ActionStage::Queued;
let sort_key = AwaitedActionSortKey::new_with_action_hash(
let sort_key = AwaitedActionSortKey::new_with_unique_key(
action_info.priority,
&action_info.insert_timestamp,
&action_info.unique_qualifier,
unique_key,
);
let sort_info = RwLock::new(SortInfo {
priority: action_info.priority,
Expand Down Expand Up @@ -173,13 +178,17 @@ impl AwaitedAction {
if sort_info_lock.priority >= new_priority {
return None;
}
let unique_key = match &self.action_info.unique_qualifier {
ActionUniqueQualifier::Cachable(unique_key) => unique_key,
ActionUniqueQualifier::Uncachable(unique_key) => unique_key,
};
let mut sort_info_lock = RwLockUpgradableReadGuard::upgrade(sort_info_lock);
let previous_sort_key = sort_info_lock.sort_key;
sort_info_lock.priority = new_priority;
sort_info_lock.sort_key = AwaitedActionSortKey::new_with_action_hash(
sort_info_lock.sort_key = AwaitedActionSortKey::new_with_unique_key(
new_priority,
&self.action_info.insert_timestamp,
&self.action_info.unique_qualifier,
unique_key,
);
Some(SortInfoLock {
previous_sort_key,
Expand Down Expand Up @@ -274,14 +283,14 @@ impl AwaitedActionSortKey {
]))
}

fn new_with_action_hash(
fn new_with_unique_key(
priority: i32,
insert_timestamp: &SystemTime,
action_hash: &ActionInfoHashKey,
action_hash: &ActionUniqueKey,
) -> Self {
let hash = {
let mut hasher = DefaultHasher::new();
ActionInfoHashKey::hash(action_hash, &mut hasher);
ActionUniqueKey::hash(action_hash, &mut hasher);
hasher.finish().to_le_bytes()[0..4].try_into().unwrap()
};
Self::new(priority, insert_timestamp.unix_timestamp(), hash)
Expand Down
6 changes: 3 additions & 3 deletions nativelink-scheduler/src/scheduler_state/completed_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::time::SystemTime;

use nativelink_util::action_messages::{ActionInfoHashKey, ActionState, OperationId};
use nativelink_util::action_messages::{ActionState, ActionUniqueQualifier, OperationId};

/// A completed action that has no listeners.
pub struct CompletedAction {
Expand Down Expand Up @@ -49,9 +49,9 @@ impl Borrow<OperationId> for CompletedAction {
}
}

impl Borrow<ActionInfoHashKey> for CompletedAction {
impl Borrow<ActionUniqueQualifier> for CompletedAction {
#[inline]
fn borrow(&self) -> &ActionInfoHashKey {
fn borrow(&self) -> &ActionUniqueQualifier {
&self.state.id.unique_qualifier
}
}
Loading

0 comments on commit 042cf97

Please sign in to comment.