Skip to content

Commit

Permalink
StateManager will now cleanup actions on client disconnect (TraceMach…
Browse files Browse the repository at this point in the history
…ina#1107)

StateManager will now properly remove items from the maps if the
client disconnects after a set amount of time. Currently these
values are hard codded, but will be easy to transition them to
use config variables once we design it out.
  • Loading branch information
allada committed Jul 8, 2024
1 parent 6f8c001 commit e95adfc
Show file tree
Hide file tree
Showing 12 changed files with 388 additions and 121 deletions.
3 changes: 0 additions & 3 deletions nativelink-scheduler/src/action_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@ pub trait ActionScheduler: Sync + Send + Unpin {
client_operation_id: &ClientOperationId,
) -> Result<Option<Pin<Box<dyn ActionListener>>>, Error>;

/// Cleans up the cache of recently completed actions.
async fn clean_recently_completed_actions(&self);

/// Register the metrics for the action scheduler.
fn register_metrics(self: Arc<Self>, _registry: &mut Registry) {}
}
2 changes: 0 additions & 2 deletions nativelink-scheduler/src/cache_lookup_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,4 @@ impl ActionScheduler for CacheLookupScheduler {
.find_by_client_operation_id(client_operation_id)
.await
}

async fn clean_recently_completed_actions(&self) {}
}
19 changes: 0 additions & 19 deletions nativelink-scheduler/src/default_scheduler_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,11 @@

use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

use nativelink_config::schedulers::SchedulerConfig;
use nativelink_error::{Error, ResultExt};
use nativelink_store::store_manager::StoreManager;
use nativelink_util::background_spawn;
use nativelink_util::metrics_utils::Registry;
use tokio::time::interval;

use crate::action_scheduler::ActionScheduler;
use crate::cache_lookup_scheduler::CacheLookupScheduler;
Expand Down Expand Up @@ -88,7 +85,6 @@ fn inner_scheduler_factory(

if let Some(scheduler_metrics) = maybe_scheduler_metrics {
if let Some(action_scheduler) = &scheduler.0 {
start_cleanup_timer(action_scheduler);
// We need a way to prevent our scheduler form having `register_metrics()` called multiple times.
// This is the equivalent of grabbing a uintptr_t in C++, storing it in a set, and checking if it's
// already been visited. We can't use the Arc's pointer directly because it has two interfaces
Expand All @@ -115,18 +111,3 @@ fn inner_scheduler_factory(

Ok(scheduler)
}

fn start_cleanup_timer(action_scheduler: &Arc<dyn ActionScheduler>) {
let weak_scheduler = Arc::downgrade(action_scheduler);
background_spawn!("default_scheduler_factory_cleanup_timer", async move {
let mut ticker = interval(Duration::from_secs(10));
loop {
ticker.tick().await;
match weak_scheduler.upgrade() {
Some(scheduler) => scheduler.clean_recently_completed_actions().await,
// If we fail to upgrade, our service is probably destroyed, so return.
None => return,
}
}
});
}
2 changes: 0 additions & 2 deletions nativelink-scheduler/src/grpc_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,4 @@ impl ActionScheduler for GrpcScheduler {
}
}
}

async fn clean_recently_completed_actions(&self) {}
}
4 changes: 0 additions & 4 deletions nativelink-scheduler/src/property_modifier_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,6 @@ impl ActionScheduler for PropertyModifierScheduler {
.await
}

async fn clean_recently_completed_actions(&self) {
self.scheduler.clean_recently_completed_actions().await
}

// Register metrics for the underlying ActionScheduler.
fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
let scheduler_registry = registry.sub_registry_with_prefix("property_modifier");
Expand Down
21 changes: 16 additions & 5 deletions nativelink-scheduler/src/scheduler_state/awaited_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ pub struct AwaitedAction {
/// The time the action was last updated.
last_worker_updated_timestamp: AtomicU64,

/// Number of clients listening to the state of the action.
listening_clients: AtomicUsize,

/// Worker that is currently running this action, None if unassigned.
worker_id: RwLock<Option<WorkerId>>,

Expand Down Expand Up @@ -118,6 +121,7 @@ impl AwaitedAction {
sort_info,
attempts: AtomicUsize::new(0),
last_worker_updated_timestamp: AtomicU64::new(SystemTime::now().unix_timestamp()),
listening_clients: AtomicUsize::new(0),
worker_id: RwLock::new(None),
},
sort_key,
Expand All @@ -139,6 +143,18 @@ impl AwaitedAction {
SystemTime::UNIX_EPOCH + Duration::from_secs(timestamp)
}

pub fn get_listening_clients(&self) -> usize {
self.listening_clients.load(Ordering::Acquire)
}

pub fn inc_listening_clients(&self) {
self.listening_clients.fetch_add(1, Ordering::Release);
}

pub fn dec_listening_clients(&self) {
self.listening_clients.fetch_sub(1, Ordering::Release);
}

/// Updates the timestamp of the action.
fn update_worker_timestamp(&self) {
self.last_worker_updated_timestamp
Expand Down Expand Up @@ -192,11 +208,6 @@ impl AwaitedAction {
self.attempts.fetch_add(1, Ordering::Release);
}

// /// Subtracts one from the number of attempts the action has been tried.
// pub fn dec_attempts(&self) {
// self.attempts.fetch_sub(1, Ordering::Release);
// }

/// Gets the worker id that is currently processing this action.
pub fn get_worker_id(&self) -> Option<WorkerId> {
*self.worker_id.read()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,38 @@ use std::sync::Arc;

use async_trait::async_trait;
use nativelink_error::Error;
use nativelink_util::action_messages::{ActionInfo, ActionState};
use nativelink_util::{
action_messages::{ActionInfo, ActionState},
task::JoinHandleDropGuard,
};
use tokio::sync::watch::Receiver;

use crate::operation_state_manager::ActionStateResult;

pub(crate) struct ClientActionStateResult {
/// The receiver for the action state updates.
rx: Receiver<Arc<ActionState>>,

/// Holds a handle to an optional spawn that will be automatically
/// canceled when this struct is dropped.
/// This is primarily used to keep the EvictionMap from dropping the
/// struct while a client is listening for updates.
_maybe_keepalive_spawn: Option<JoinHandleDropGuard<()>>,
}

impl ClientActionStateResult {
pub(crate) fn new(mut rx: Receiver<Arc<ActionState>>) -> Self {
pub(crate) fn new(
mut rx: Receiver<Arc<ActionState>>,
maybe_keepalive_spawn: Option<JoinHandleDropGuard<()>>,
) -> Self {
// Marking the initial value as changed for new or existing actions regardless if
// underlying state has changed. This allows for triggering notification after subscription
// without having to use an explicit notification.
rx.mark_changed();
Self { rx }
Self {
rx,
_maybe_keepalive_spawn: maybe_keepalive_spawn,
}
}
}

Expand Down
Loading

0 comments on commit e95adfc

Please sign in to comment.