Skip to content

Commit

Permalink
key as u64 (TraceMachina#1166)
Browse files Browse the repository at this point in the history
  • Loading branch information
allada committed Jul 16, 2024
1 parent 0192051 commit 9c2c7b9
Showing 1 changed file with 24 additions and 55 deletions.
79 changes: 24 additions & 55 deletions nativelink-scheduler/src/awaited_action_db/awaited_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::hash::{DefaultHasher, Hash, Hasher};
use std::sync::Arc;
use std::time::SystemTime;
use std::time::{SystemTime, UNIX_EPOCH};

use nativelink_util::action_messages::{
ActionInfo, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier, OperationId,
ActionInfo, ActionStage, ActionState, OperationId,
WorkerId,
};
use nativelink_util::evicting_map::InstantWrapper;
use static_assertions::{assert_eq_size, const_assert, const_assert_eq};

/// The version of the awaited action.
Expand Down Expand Up @@ -59,15 +57,10 @@ pub struct AwaitedAction {

impl AwaitedAction {
pub fn new(operation_id: OperationId, action_info: Arc<ActionInfo>) -> Self {
let unique_key = match &action_info.unique_qualifier {
ActionUniqueQualifier::Cachable(unique_key) => unique_key,
ActionUniqueQualifier::Uncachable(unique_key) => unique_key,
};
let stage = ActionStage::Queued;
let sort_key = AwaitedActionSortKey::new_with_unique_key(
action_info.priority,
&action_info.insert_timestamp,
unique_key,
);
let state = Arc::new(ActionState {
stage,
Expand Down Expand Up @@ -141,11 +134,11 @@ impl AwaitedAction {
/// 3. (mostly random hash based on the action info)
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[repr(transparent)]
pub struct AwaitedActionSortKey(u128);
pub struct AwaitedActionSortKey(u64);

impl AwaitedActionSortKey {
#[rustfmt::skip]
const fn new(priority: i32, insert_timestamp: u64, hash: [u8; 4]) -> Self {
const fn new(priority: i32, insert_timestamp: u32) -> Self {
// Shift `new_priority` so [`i32::MIN`] is represented by zero.
// This makes it so any nagative values are positive, but
// maintains ordering.
Expand All @@ -154,85 +147,61 @@ impl AwaitedActionSortKey {

// Invert our timestamp so the larger the timestamp the lower the number.
// This makes timestamp descending order instead of ascending.
let timestamp = (insert_timestamp ^ u64::MAX).to_be_bytes();
let timestamp = (insert_timestamp ^ u32::MAX).to_be_bytes();

AwaitedActionSortKey(u128::from_be_bytes([
AwaitedActionSortKey(u64::from_be_bytes([
priority[0], priority[1], priority[2], priority[3],
timestamp[0], timestamp[1], timestamp[2], timestamp[3],
timestamp[4], timestamp[5], timestamp[6], timestamp[7],
hash[0], hash[1], hash[2], hash[3]
]))
}

fn new_with_unique_key(
priority: i32,
insert_timestamp: &SystemTime,
action_hash: &ActionUniqueKey,
) -> Self {
let hash = {
let mut hasher = DefaultHasher::new();
ActionUniqueKey::hash(action_hash, &mut hasher);
hasher.finish().to_le_bytes()[0..4].try_into().unwrap()
};
Self::new(priority, insert_timestamp.unix_timestamp(), hash)
let timestamp = insert_timestamp.duration_since(UNIX_EPOCH).unwrap().as_secs() as u32;
Self::new(priority, timestamp)
}
}

// Ensure the size of the sort key is the same as a `u64`.
assert_eq_size!(AwaitedActionSortKey, u128);
assert_eq_size!(AwaitedActionSortKey, u64);

const_assert_eq!(
AwaitedActionSortKey::new(0x1234_5678, 0x9abc_def0_1234_5678, [0x9a, 0xbc, 0xde, 0xf0]).0,
AwaitedActionSortKey::new(0x1234_5678, 0x9abc_def0).0,
// Note: Result has 0x12345678 + 0x80000000 = 0x92345678 because we need
// to shift the `i32::MIN` value to be represented by zero.
// Note: `6543210fedcba987` are the inverted bits of `9abcdef012345678`.
// Note: `6543210f` are the inverted bits of `9abcdef0`.
// This effectively inverts the priority to now have the highest priority
// be the lowest timestamps.
AwaitedActionSortKey(0x9234_5678_6543_210f_edcb_a987_9abc_def0).0
AwaitedActionSortKey(0x9234_5678_6543_210f).0
);
// Ensure the priority is used as the sort key first.
const_assert!(
AwaitedActionSortKey::new(i32::MAX, 0, [0xff; 4]).0
> AwaitedActionSortKey::new(i32::MAX - 1, 0, [0; 4]).0
AwaitedActionSortKey::new(i32::MAX, 0).0
> AwaitedActionSortKey::new(i32::MAX - 1, 0).0
);
const_assert!(
AwaitedActionSortKey::new(i32::MAX - 1, 0, [0xff; 4]).0
> AwaitedActionSortKey::new(1, 0, [0; 4]).0
AwaitedActionSortKey::new(i32::MAX - 1, 0).0
> AwaitedActionSortKey::new(1, 0).0
);
const_assert!(
AwaitedActionSortKey::new(1, 0, [0xff; 4]).0 > AwaitedActionSortKey::new(0, 0, [0; 4]).0
AwaitedActionSortKey::new(1, 0).0 > AwaitedActionSortKey::new(0, 0).0
);
const_assert!(
AwaitedActionSortKey::new(0, 0, [0xff; 4]).0 > AwaitedActionSortKey::new(-1, 0, [0; 4]).0
AwaitedActionSortKey::new(0, 0).0 > AwaitedActionSortKey::new(-1, 0).0
);
const_assert!(
AwaitedActionSortKey::new(-1, 0, [0xff; 4]).0
> AwaitedActionSortKey::new(i32::MIN + 1, 0, [0; 4]).0
AwaitedActionSortKey::new(-1, 0).0
> AwaitedActionSortKey::new(i32::MIN + 1, 0).0
);
const_assert!(
AwaitedActionSortKey::new(i32::MIN + 1, 0, [0xff; 4]).0
> AwaitedActionSortKey::new(i32::MIN, 0, [0; 4]).0
AwaitedActionSortKey::new(i32::MIN + 1, 0).0
> AwaitedActionSortKey::new(i32::MIN, 0).0
);

// Ensure the insert timestamp is used as the sort key second.
const_assert!(
AwaitedActionSortKey::new(0, u64::MIN, [0; 4]).0
> AwaitedActionSortKey::new(0, u64::MAX, [0; 4]).0
);

// Ensure the hash is used as the sort key third.
const_assert!(
AwaitedActionSortKey::new(0, 0, [0xff, 0xff, 0xff, 0xff]).0
> AwaitedActionSortKey::new(0, 0, [0; 4]).0
);
const_assert!(
AwaitedActionSortKey::new(1, 0, [0xff, 0xff, 0xff, 0xff]).0
> AwaitedActionSortKey::new(0, 0, [0; 4]).0
);
const_assert!(
AwaitedActionSortKey::new(0, 0, [0; 4]).0 > AwaitedActionSortKey::new(0, 1, [0; 4]).0
);
const_assert!(
AwaitedActionSortKey::new(0, 0, [0xff, 0xff, 0xff, 0xff]).0
> AwaitedActionSortKey::new(0, 0, [0; 4]).0
AwaitedActionSortKey::new(0, u32::MIN).0
> AwaitedActionSortKey::new(0, u32::MAX).0
);

0 comments on commit 9c2c7b9

Please sign in to comment.