Skip to content

Commit

Permalink
Fix sequence bug in new memory store manager (#1162)
Browse files Browse the repository at this point in the history
Remove connected clients from awaited action.
  • Loading branch information
allada committed Jul 16, 2024
1 parent 6e50d2c commit 73c19c4
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 23 deletions.
4 changes: 0 additions & 4 deletions nativelink-scheduler/src/awaited_action_db/awaited_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ pub struct AwaitedAction {

/// Number of attempts the job has been tried.
pub attempts: usize,

/// Number of clients listening to the state of the action.
pub connected_clients: usize,
}

impl AwaitedAction {
Expand All @@ -73,7 +70,6 @@ impl AwaitedAction {
sort_key,
attempts: 0,
last_worker_updated_timestamp: SystemTime::now(),
connected_clients: 1,
worker_id: None,
state,
}
Expand Down
54 changes: 35 additions & 19 deletions nativelink-scheduler/src/memory_awaited_action_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,9 @@ pub struct AwaitedActionDbImpl {
/// See [`AwaitedActionSortKey`] for more information on the ordering.
sorted_action_info_hash_keys: SortedAwaitedActions,

/// The number of connected clients for each operation id.
connected_clients_for_operation_id: HashMap<OperationId, usize>,

action_event_tx: mpsc::UnboundedSender<ActionEvent>,
}

Expand Down Expand Up @@ -390,21 +393,30 @@ impl AwaitedActionDbImpl {
);
continue;
};
let mut connected_clients = 0;
// Note: We use this trick to modify the value, but we don't actually
// want to notify any listeners of the change.
tx.send_if_modified(|awaited_action| {
awaited_action.connected_clients -= 1;
connected_clients = awaited_action.connected_clients;
false
});
let connected_clients = match self
.connected_clients_for_operation_id
.remove(&operation_id)
{
Some(connected_clients) => connected_clients - 1,
None => {
event!(
Level::ERROR,
?operation_id,
"connected_clients_for_operation_id does not have operation_id"
);
0
}
};
// Note: It is rare to have more than one client listening
// to the same action, so we assume that we are the last
// client and insert it back into the map if we detect that
// there are still clients listening (i.e., the happy path
// is connected_clients == 0).
if connected_clients != 0 {
self.operation_id_to_awaited_action.insert(operation_id, tx);
self.operation_id_to_awaited_action
.insert(operation_id.clone(), tx);
self.connected_clients_for_operation_id
.insert(operation_id, connected_clients);
continue;
}
let awaited_action = tx.borrow().clone();
Expand Down Expand Up @@ -633,6 +645,8 @@ impl AwaitedActionDbImpl {
));
self.operation_id_to_awaited_action
.insert(operation_id.clone(), tx);
self.connected_clients_for_operation_id
.insert(operation_id.clone(), 1);
(client_awaited_action, rx)
}

Expand Down Expand Up @@ -662,10 +676,6 @@ impl AwaitedActionDbImpl {
};
let operation_id = OperationId::new(action_info.unique_qualifier.clone());
let awaited_action = AwaitedAction::new(operation_id.clone(), action_info);
debug_assert!(
awaited_action.connected_clients == 1,
"Expected connected_clients to be 1"
);
debug_assert!(
ActionStage::Queued == awaited_action.state().stage,
"Expected action to be queued"
Expand Down Expand Up @@ -740,13 +750,18 @@ impl AwaitedActionDbImpl {
tx.borrow()
);

let maybe_connected_clients = self
.connected_clients_for_operation_id
.get_mut(operation_id);
let Some(connected_clients) = maybe_connected_clients else {
return Err(make_err!(
Code::Internal,
"connected_clients_for_operation_id and operation_id_to_awaited_action are out of sync for {unique_key:?} - {operation_id}"
));
};
*connected_clients += 1;

let subscription = tx.subscribe();
// Note: We use this trick to modify the value, but we don't actually
// want to notify any listeners of the change.
tx.send_if_modified(|awaited_action| {
awaited_action.connected_clients += 1;
false
});

self.client_operation_to_awaited_action
.insert(
Expand Down Expand Up @@ -778,6 +793,7 @@ impl MemoryAwaitedActionDb {
operation_id_to_awaited_action: BTreeMap::new(),
action_info_hash_key_to_awaited_action: HashMap::new(),
sorted_action_info_hash_keys: SortedAwaitedActions::default(),
connected_clients_for_operation_id: HashMap::new(),
action_event_tx,
}));
let weak_inner = Arc::downgrade(&inner);
Expand Down

0 comments on commit 73c19c4

Please sign in to comment.