Skip to content

Commit

Permalink
Fix worker execution issues (TraceMachina#1114)
Browse files Browse the repository at this point in the history
The worker stream is now properly terminated when an action completes.
  • Loading branch information
allada committed Jul 8, 2024
1 parent d353c30 commit 6f8c001
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 3 deletions.
19 changes: 17 additions & 2 deletions nativelink-scheduler/src/scheduler_state/workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,15 @@ impl ApiWorkerSchedulerImpl {
}
}

// Clear this action from the current worker.
// Return early if the action is not finished yet. An error is treated as finished, so the action is still cleared from the worker below.
let is_finished = action_stage
.as_ref()
.map_or_else(|_| true, |action_stage| action_stage.is_finished());
if !is_finished {
return Ok(());
}

// Clear this action from the current worker if finished.
let complete_action_res = {
let was_paused = !worker.can_accept_work();

Expand All @@ -193,7 +201,6 @@ impl ApiWorkerSchedulerImpl {
complete_action_res
};

// TODO(allada) This should move to inside the Workers struct.
self.worker_change_notify.notify_one();

complete_action_res
Expand Down Expand Up @@ -228,6 +235,14 @@ impl ApiWorkerSchedulerImpl {
return Result::<(), _>::Err(err.clone())
.merge(self.immediate_evict_worker(&worker_id, err).await);
}
} else {
event!(
Level::WARN,
?worker_id,
?operation_id,
?action_info,
"Worker not found in worker map in worker_notify_run_action"
);
}
Ok(())
}
Expand Down
8 changes: 7 additions & 1 deletion nativelink-service/src/execution_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,15 @@ impl ExecutionServer {
let client_operation_id = ClientOperationId::from_raw_string(
client_operation_id_string.clone(),
);
// If the action is finished we won't be sending any more updates.
let maybe_action_listener = if action_update.stage.is_finished() {
None
} else {
Some(action_listener)
};
Some((
Ok(action_update.as_operation(client_operation_id)),
Some(action_listener),
maybe_action_listener,
))
}
Err(err) => {
Expand Down

0 comments on commit 6f8c001

Please sign in to comment.