Skip to content

Commit

Permalink
StateManager can now be notified when no one is listening (#1093)
Browse files Browse the repository at this point in the history
ActionStateResult is now wired up to ActionListener, allowing
it to be notified of Drop calls. This will be used to clean up
client operation ids.
  • Loading branch information
allada committed Jul 8, 2024
1 parent cfc0cf6 commit 0d93671
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 49 deletions.
5 changes: 1 addition & 4 deletions nativelink-scheduler/src/action_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,10 @@ pub trait ActionListener: Sync + Send + Unpin {
/// Returns the client operation id.
fn client_operation_id(&self) -> &ClientOperationId;

/// Returns the current action state.
fn action_state(&self) -> Arc<ActionState>;

/// Waits for the action state to change.
fn changed(
&mut self,
) -> Pin<Box<dyn Future<Output = Result<Arc<ActionState>, Error>> + Send + Sync + '_>>;
) -> Pin<Box<dyn Future<Output = Result<Arc<ActionState>, Error>> + Send + '_>>;
}

/// ActionScheduler interface is responsible for interactions between the scheduler
Expand Down
6 changes: 1 addition & 5 deletions nativelink-scheduler/src/cache_lookup_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,9 @@ impl ActionListener for CachedActionListener {
&self.client_operation_id
}

fn action_state(&self) -> Arc<ActionState> {
self.action_state.clone()
}

fn changed(
&mut self,
) -> Pin<Box<dyn Future<Output = Result<Arc<ActionState>, Error>> + Send + Sync + '_>> {
) -> Pin<Box<dyn Future<Output = Result<Arc<ActionState>, Error>> + Send + '_>> {
Box::pin(async { Ok(self.action_state.clone()) })
}
}
Expand Down
30 changes: 14 additions & 16 deletions nativelink-scheduler/src/default_action_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,31 +39,29 @@ impl DefaultActionListener {
action_state,
}
}

/// Waits until the watched `ActionState` changes and returns the new value.
///
/// # Errors
///
/// Returns a `Code::Internal` error if waiting on the watch channel fails,
/// which this method reports as the sender having gone away.
pub async fn changed(&mut self) -> Result<Arc<ActionState>, Error> {
    // Guard clause: bail out early when the wait itself failed.
    if let Err(e) = self.action_state.changed().await {
        return Err(make_err!(
            Code::Internal,
            "Sender of ActionState went away unexpectedly - {e:?}"
        ));
    }
    // Read the latest value and mark it as seen in a single step.
    let latest_state = self.action_state.borrow_and_update().clone();
    Ok(latest_state)
}
}

impl ActionListener for DefaultActionListener {
fn client_operation_id(&self) -> &ClientOperationId {
&self.client_operation_id
}

fn action_state(&self) -> Arc<ActionState> {
self.action_state.borrow().clone()
}

fn changed(
&mut self,
) -> Pin<Box<dyn Future<Output = Result<Arc<ActionState>, Error>> + Send + Sync + '_>> {
Box::pin(async move {
self.action_state.changed().await.map_or_else(
|e| {
Err(make_err!(
Code::Internal,
"Sender of ActionState went away unexpectedly - {e:?}"
))
},
|()| Ok(self.action_state.borrow_and_update().clone()),
)
})
) -> Pin<Box<dyn Future<Output = Result<Arc<ActionState>, Error>> + Send + '_>> {
Box::pin(self.changed())
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,6 @@ impl ActionStateResult for ClientActionStateResult {
}

async fn as_action_info(&self) -> Result<Arc<ActionInfo>, Error> {
unimplemented!()
todo!()
}
}
85 changes: 65 additions & 20 deletions nativelink-scheduler/src/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,19 @@ use std::sync::Arc;

use async_trait::async_trait;
use futures::{Future, Stream};
use nativelink_error::{Error, ResultExt};
use nativelink_error::{make_err, Code, Error, ResultExt};
use nativelink_util::action_messages::{
ActionInfo, ActionStage, ClientOperationId, OperationId, WorkerId,
ActionInfo, ActionStage, ActionState, ClientOperationId, OperationId, WorkerId,
};
use nativelink_util::metrics_utils::Registry;
use nativelink_util::spawn;
use nativelink_util::task::JoinHandleDropGuard;
use tokio::sync::Notify;
use tokio::sync::{watch, Notify};
use tokio::time::Duration;
use tokio_stream::StreamExt;
use tracing::{event, Level};

use crate::action_scheduler::{ActionListener, ActionScheduler};
use crate::default_action_listener::DefaultActionListener;
use crate::operation_state_manager::{
ActionStateResult, ClientStateManager, MatchingEngineStateManager, OperationFilter,
OperationStageFlags,
Expand All @@ -53,6 +52,60 @@ const DEFAULT_RETAIN_COMPLETED_FOR_S: u64 = 60;
/// If this changes, remember to change the documentation in the config.
const DEFAULT_MAX_JOB_RETRIES: usize = 3;

/// `ActionListener` implementation used by `SimpleScheduler`. It keeps the
/// `ActionStateResult` it was created from and only asks it for a watch
/// receiver the first time `changed()` is called.
struct SimpleSchedulerActionListener {
/// Identifier handed back by `client_operation_id()`.
client_operation_id: ClientOperationId,
/// Source of the watch receiver; queried through `as_receiver()`.
action_state_result: Arc<dyn ActionStateResult>,
/// Cached watch receiver. `None` until the first `changed()` call stores one.
maybe_receiver: Option<watch::Receiver<Arc<ActionState>>>,
}

impl SimpleSchedulerActionListener {
fn new(
client_operation_id: ClientOperationId,
action_state_result: Arc<dyn ActionStateResult>,
) -> Self {
Self {
client_operation_id,
action_state_result,
maybe_receiver: None,
}
}
}

impl ActionListener for SimpleSchedulerActionListener {
    /// Returns the client operation id this listener was created with.
    fn client_operation_id(&self) -> &ClientOperationId {
        &self.client_operation_id
    }

    /// Waits for the next `ActionState` and returns it.
    ///
    /// The watch receiver is requested from the underlying
    /// `ActionStateResult` on the first call and cached for later calls.
    /// The fresh receiver is flagged with `mark_changed()` before the first
    /// wait, so the state that is current at that moment is delivered rather
    /// than skipped.
    ///
    /// # Errors
    ///
    /// * Propagates the error from `as_receiver()` (with an added tip) when
    ///   the receiver cannot be obtained.
    /// * Returns `Code::Internal` when waiting on the watch channel fails,
    ///   i.e. the sender hung up.
    fn changed(
        &mut self,
    ) -> Pin<Box<dyn Future<Output = Result<Arc<ActionState>, Error>> + Send + '_>> {
        Box::pin(async move {
            let receiver = match &mut self.maybe_receiver {
                Some(receiver) => receiver,
                None => {
                    let mut receiver = self
                        .action_state_result
                        .as_receiver()
                        .await
                        .err_tip(|| "In SimpleSchedulerActionListener::changed getting receiver")?
                        .into_owned();
                    receiver.mark_changed();
                    // `Option::insert` stores the receiver and hands back a
                    // `&mut` to it, so no clone or `unwrap()` is needed.
                    self.maybe_receiver.insert(receiver)
                }
            };
            receiver.changed().await.map_err(|_| {
                make_err!(
                    Code::Internal,
                    "Sender hungup in SimpleSchedulerActionListener::changed()"
                )
            })?;
            // `borrow_and_update` (not `borrow`) marks the value we return as
            // seen. Otherwise a send that lands between `changed()` and this
            // read would make the next `changed()` return immediately with a
            // state the caller already received. This matches
            // `DefaultActionListener::changed`.
            let result = receiver.borrow_and_update().clone();
            Ok(result)
        })
    }
}

/// Engine used to manage the queued/running tasks and relationship with
/// the worker nodes. All state on how the workers and actions are interacting
/// should be held in this struct.
Expand Down Expand Up @@ -89,15 +142,11 @@ impl SimpleScheduler {
.client_state_manager
.add_action(client_operation_id.clone(), action_info)
.await?;
add_action_result
.as_receiver()
.await
.map(move |receiver| -> Pin<Box<dyn ActionListener>> {
Box::pin(DefaultActionListener::new(
client_operation_id,
receiver.into_owned(),
))
})

Ok(Box::pin(SimpleSchedulerActionListener::new(
client_operation_id,
add_action_result,
)))
}

async fn clean_recently_completed_actions(&self) {
Expand Down Expand Up @@ -127,16 +176,12 @@ impl SimpleScheduler {

let mut stream = filter_result
.err_tip(|| "In SimpleScheduler::find_by_client_operation_id getting filter result")?;
let Some(result) = stream.next().await else {
let Some(action_state_result) = stream.next().await else {
return Ok(None);
};
Ok(Some(Box::pin(DefaultActionListener::new(
Ok(Some(Box::pin(SimpleSchedulerActionListener::new(
client_operation_id.clone(),
result
.as_receiver()
.await
.err_tip(|| "In SimpleScheduler::find_by_client_operation_id getting receiver")?
.into_owned(),
action_state_result,
))))
}

Expand Down
4 changes: 2 additions & 2 deletions nativelink-scheduler/tests/simple_scheduler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,14 +393,14 @@ async fn remove_worker_reschedules_multiple_running_job_test() -> Result<(), Err
{
let expected_action_stage = ActionStage::Executing;
// Client should get notification saying it's being executed.
let action_state = client2_action_listener.action_state();
let action_state = client1_action_listener.changed().await.unwrap();
// We now know the name of the action so populate it.
assert_eq!(&action_state.stage, &expected_action_stage);
}
{
let expected_action_stage = ActionStage::Executing;
// Client should get notification saying it's being executed.
let action_state = client2_action_listener.action_state();
let action_state = client2_action_listener.changed().await.unwrap();
// We now know the name of the action so populate it.
assert_eq!(&action_state.stage, &expected_action_stage);
}
Expand Down
2 changes: 1 addition & 1 deletion nativelink-service/src/execution_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ pub struct ExecutionServer {
instance_infos: HashMap<InstanceName, InstanceInfo>,
}

type ExecuteStream = Pin<Box<dyn Stream<Item = Result<Operation, Status>> + Send + Sync + 'static>>;
type ExecuteStream = Pin<Box<dyn Stream<Item = Result<Operation, Status>> + Send + 'static>>;

impl ExecutionServer {
pub fn new(
Expand Down

0 comments on commit 0d93671

Please sign in to comment.