Skip to content

Commit

Permalink
[Refactor] Move scheduler state behind mutex
Browse files Browse the repository at this point in the history
In prep to support a distributed/redis scheduler, prepare the state
interface to no longer take mutable references.

This is a partial PR and should be landed immediately, with follow-up PRs
that will remove much of the locking in the SimpleScheduler.

towards: TraceMachina#359
  • Loading branch information
allada committed Jun 29, 2024
1 parent f5e7276 commit 7a16e2e
Show file tree
Hide file tree
Showing 13 changed files with 376 additions and 355 deletions.
2 changes: 1 addition & 1 deletion nativelink-scheduler/src/action_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub trait ActionScheduler: Sync + Send + Unpin {
async fn find_existing_action(
&self,
unique_qualifier: &ActionInfoHashKey,
) -> Option<watch::Receiver<Arc<ActionState>>>;
) -> Result<Option<watch::Receiver<Arc<ActionState>>>, Error>;

/// Cleans up the cache of recently completed actions.
async fn clean_recently_completed_actions(&self);
Expand Down
4 changes: 2 additions & 2 deletions nativelink-scheduler/src/cache_lookup_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,11 @@ impl ActionScheduler for CacheLookupScheduler {
async fn find_existing_action(
&self,
unique_qualifier: &ActionInfoHashKey,
) -> Option<watch::Receiver<Arc<ActionState>>> {
) -> Result<Option<watch::Receiver<Arc<ActionState>>>, Error> {
{
let cache_check_actions = self.cache_check_actions.lock();
if let Some(rx) = subscribe_to_existing_action(&cache_check_actions, unique_qualifier) {
return Some(rx);
return Ok(Some(rx));
}
}
// Cache skipped may be in the upstream scheduler.
Expand Down
6 changes: 3 additions & 3 deletions nativelink-scheduler/src/grpc_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ impl ActionScheduler for GrpcScheduler {
async fn find_existing_action(
&self,
unique_qualifier: &ActionInfoHashKey,
) -> Option<watch::Receiver<Arc<ActionState>>> {
) -> Result<Option<watch::Receiver<Arc<ActionState>>>, Error> {
let request = WaitExecutionRequest {
name: unique_qualifier.action_name(),
};
Expand All @@ -279,14 +279,14 @@ impl ActionScheduler for GrpcScheduler {
.and_then(|result_stream| Self::stream_state(result_stream.into_inner()))
.await;
match result_stream {
Ok(result_stream) => Some(result_stream),
Ok(result_stream) => Ok(Some(result_stream)),
Err(err) => {
event!(
Level::WARN,
?err,
"Error looking up action with upstream scheduler"
);
None
Ok(None)
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions nativelink-scheduler/src/operation_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub type ActionStateResultStream = Pin<Box<dyn Stream<Item = Arc<dyn ActionState
pub trait ClientStateManager {
/// Add a new action to the queue or joins an existing action.
async fn add_action(
&mut self,
&self,
action_info: ActionInfo,
) -> Result<Arc<dyn ActionStateResult>, Error>;

Expand All @@ -119,7 +119,7 @@ pub trait WorkerStateManager {
/// did not change with a modified timestamp in order to prevent
/// the operation from being considered stale and being rescheduled.
async fn update_operation(
&mut self,
&self,
operation_id: OperationId,
worker_id: WorkerId,
action_stage: Result<ActionStage, Error>,
Expand All @@ -136,7 +136,7 @@ pub trait MatchingEngineStateManager {

/// Update the state of an operation.
async fn update_operation(
&mut self,
&self,
operation_id: OperationId,
worker_id: Option<WorkerId>,
action_stage: Result<ActionStage, Error>,
Expand Down
2 changes: 1 addition & 1 deletion nativelink-scheduler/src/property_modifier_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl ActionScheduler for PropertyModifierScheduler {
async fn find_existing_action(
&self,
unique_qualifier: &ActionInfoHashKey,
) -> Option<watch::Receiver<Arc<ActionState>>> {
) -> Result<Option<watch::Receiver<Arc<ActionState>>>, Error> {
self.scheduler.find_existing_action(unique_qualifier).await
}

Expand Down
6 changes: 3 additions & 3 deletions nativelink-scheduler/src/redis_operation_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ impl<T: ConnectionLike + Unpin + Clone + Send + Sync + 'static> RedisStateManage
#[async_trait]
impl ClientStateManager for RedisStateManager {
async fn add_action(
&mut self,
&self,
action_info: ActionInfo,
) -> Result<Arc<dyn ActionStateResult>, Error> {
self.inner_add_action(action_info).await
Expand All @@ -430,7 +430,7 @@ impl ClientStateManager for RedisStateManager {
#[async_trait]
impl WorkerStateManager for RedisStateManager {
async fn update_operation(
&mut self,
&self,
operation_id: OperationId,
worker_id: WorkerId,
action_stage: Result<ActionStage, Error>,
Expand All @@ -450,7 +450,7 @@ impl MatchingEngineStateManager for RedisStateManager {
}

async fn update_operation(
&mut self,
&self,
operation_id: OperationId,
worker_id: Option<WorkerId>,
action_stage: Result<ActionStage, Error>,
Expand Down
Loading

0 comments on commit 7a16e2e

Please sign in to comment.