Skip to content

Commit

Permalink
Add scheduler metrics back (TraceMachina#1171)
Browse files Browse the repository at this point in the history
  • Loading branch information
allada committed Jul 16, 2024
1 parent 50fdbd7 commit 54ed73c
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 17 deletions.
43 changes: 38 additions & 5 deletions nativelink-scheduler/src/api_worker_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use async_lock::Mutex;
use lru::LruCache;
use nativelink_config::schedulers::WorkerAllocationStrategy;
use nativelink_error::{error_if, make_err, make_input_err, Code, Error, ResultExt};
use nativelink_util::action_messages::{ActionInfo, ActionStage, OperationId, WorkerId};
use nativelink_util::metrics_utils::Registry;
use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry};
use nativelink_util::operation_state_manager::WorkerStateManager;
use nativelink_util::platform_properties::PlatformProperties;
use nativelink_util::platform_properties::{PlatformProperties, PlatformPropertyValue};
use tokio::sync::Notify;
use tonic::async_trait;
use tracing::{event, Level};
Expand Down Expand Up @@ -437,8 +438,40 @@ impl WorkerScheduler for ApiWorkerScheduler {
inner.set_drain_worker(worker_id, is_draining).await
}

fn register_metrics(self: Arc<Self>, _registry: &mut Registry) {
// We do not register anything here because we only want to register metrics
// once and we rely on the `ActionScheduler::register_metrics()` to do that.
/// Registers this worker scheduler's metrics with `registry`.
///
/// The call is first forwarded to a clone of the `worker_state_manager` held
/// inside `inner`, and then a `Collector` built from `self` is registered.
fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
// This function is not `async`, so `inner` is reached via `lock_blocking()`.
self.inner
.lock_blocking()
.worker_state_manager
.clone()
.register_metrics(registry);
registry.register_collector(Box::new(Collector::new(&self)));
}
}

impl MetricsComponent for ApiWorkerScheduler {
    /// Publishes per-worker metrics and the summed `Minimum` platform properties.
    ///
    /// Every worker in `inner.workers` is published under the `workers` metric,
    /// labelled with its `worker_id`. While walking the workers, each platform
    /// property whose value is `PlatformPropertyValue::Minimum` is summed per
    /// property name; the totals are then published as
    /// `{property}_available_properties`. Properties of any other kind are ignored.
    fn gather_metrics(&self, c: &mut CollectorState) {
        let inner = self.inner.lock_blocking();
        // Running total of `Minimum` values, keyed by the property name borrowed
        // from the worker it was read from.
        let mut props = HashMap::<&String, u64>::new();
        for (_worker_id, worker) in inner.workers.iter() {
            c.publish_with_labels(
                "workers",
                worker,
                "",
                vec![("worker_id".into(), worker.id.to_string().into())],
            );
            for (property, prop_value) in &worker.platform_properties.properties {
                if let PlatformPropertyValue::Minimum(worker_value) = prop_value {
                    // The entry API does a single lookup and only touches the map
                    // for properties that actually contribute to the sum.
                    *props.entry(property).or_insert(0) += *worker_value;
                }
            }
        }
        for (property, prop_value) in props {
            c.publish(
                &format!("{property}_available_properties"),
                &prop_value,
                format!("Total sum of available properties for {property}"),
            );
        }
    }
}
3 changes: 2 additions & 1 deletion nativelink-scheduler/src/awaited_action_db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub use awaited_action::{AwaitedAction, AwaitedActionSortKey};
use futures::{Future, Stream};
use nativelink_error::Error;
use nativelink_util::action_messages::{ActionInfo, ClientOperationId, OperationId};
use nativelink_util::metrics_utils::MetricsComponent;

mod awaited_action;

Expand Down Expand Up @@ -71,7 +72,7 @@ pub trait AwaitedActionSubscriber: Send + Sync + Sized + 'static {
}

/// A trait that defines the interface for an AwaitedActionDb.
pub trait AwaitedActionDb: Send + Sync {
pub trait AwaitedActionDb: Send + Sync + MetricsComponent + 'static {
type Subscriber: AwaitedActionSubscriber;

/// Get the AwaitedAction by the client operation id.
Expand Down
1 change: 0 additions & 1 deletion nativelink-scheduler/src/default_scheduler_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ fn inner_scheduler_factory(
visited_schedulers.insert(worker_scheduler_uintptr);
worker_scheduler.clone().register_metrics(scheduler_metrics);
}
worker_scheduler.clone().register_metrics(scheduler_metrics);
}
}

Expand Down
44 changes: 44 additions & 0 deletions nativelink-scheduler/src/memory_awaited_action_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use nativelink_util::action_messages::{
};
use nativelink_util::chunked_stream::ChunkedStream;
use nativelink_util::evicting_map::{EvictingMap, LenEntry};
use nativelink_util::metrics_utils::{CollectorState, MetricsComponent};
use nativelink_util::operation_state_manager::ActionStateResult;
use nativelink_util::spawn;
use nativelink_util::task::JoinHandleDropGuard;
Expand Down Expand Up @@ -941,3 +942,46 @@ impl AwaitedActionDb for MemoryAwaitedActionDb {
.await
}
}

impl MetricsComponent for MemoryAwaitedActionDb {
    /// Publishes how many actions are currently tracked in each state.
    ///
    /// The counts are the lengths of the per-state collections in
    /// `inner.sorted_action_info_hash_keys`. Two legacy metric names
    /// (`active_actions_total` and `queued_actions_total`) are published as well;
    /// they repeat the `executing` and `queued` counts respectively.
    fn gather_metrics(&self, c: &mut CollectorState) {
        let inner = self.inner.lock_blocking();
        let keys = &inner.sorted_action_info_hash_keys;
        c.publish(
            "action_state_unknown_total",
            &keys.unknown.len(),
            "Number of actions with the current state of unknown.",
        );
        c.publish(
            "action_state_cache_check_total",
            &keys.cache_check.len(),
            "Number of actions with the current state of cache_check.",
        );
        c.publish(
            "action_state_queued_total",
            &keys.queued.len(),
            "Number of actions with the current state of queued.",
        );
        c.publish(
            "action_state_executing_total",
            &keys.executing.len(),
            "Number of actions with the current state of executing.",
        );
        c.publish(
            "action_state_completed_total",
            &keys.completed.len(),
            "Number of actions with the current state of completed.",
        );
        // TODO(allada) This is legacy and should be removed in the future.
        c.publish(
            "active_actions_total",
            &keys.executing.len(),
            "(LEGACY) The number of running actions.",
        );
        // TODO(allada) This is legacy and should be removed in the future.
        c.publish(
            "queued_actions_total",
            &keys.queued.len(),
            "(LEGACY) The number of actions in the queue.",
        );
    }
}
15 changes: 8 additions & 7 deletions nativelink-scheduler/src/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,14 +288,14 @@ impl SimpleScheduler {
}

let tasks_or_worker_change_notify = Arc::new(Notify::new());
let state_manager = Arc::new(SimpleSchedulerStateManager::new(
let state_manager = SimpleSchedulerStateManager::new(
tasks_or_worker_change_notify.clone(),
max_job_retries,
MemoryAwaitedActionDb::new(&EvictionPolicy {
max_seconds: retain_completed_for_s,
..Default::default()
}),
));
);

let worker_scheduler = ApiWorkerScheduler::new(
state_manager.clone(),
Expand Down Expand Up @@ -371,7 +371,12 @@ impl ActionScheduler for SimpleScheduler {
Ok(maybe_receiver)
}

fn register_metrics(self: Arc<Self>, _registry: &mut Registry) {}
/// Registers scheduler metrics by delegating to the state managers.
///
/// Nothing is registered directly here: clones of `client_state_manager` and
/// `matching_engine_state_manager` are each asked to register with `registry`.
fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
self.client_state_manager.clone().register_metrics(registry);
self.matching_engine_state_manager
.clone()
.register_metrics(registry);
}
}

#[async_trait]
Expand Down Expand Up @@ -420,8 +425,4 @@ impl WorkerScheduler for SimpleScheduler {
.set_drain_worker(worker_id, is_draining)
.await
}

fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
self.worker_scheduler.clone().register_metrics(registry);
}
}
24 changes: 21 additions & 3 deletions nativelink-scheduler/src/simple_scheduler_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use nativelink_util::action_messages::{
ActionInfo, ActionResult, ActionStage, ActionState, ActionUniqueQualifier, ClientOperationId,
ExecutionMetadata, OperationId, WorkerId,
};
use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry};
use nativelink_util::operation_state_manager::{
ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager,
OperationFilter, OperationStageFlags, OrderDirection, WorkerStateManager,
Expand Down Expand Up @@ -139,12 +140,16 @@ pub struct SimpleSchedulerStateManager<T: AwaitedActionDb> {
}

impl<T: AwaitedActionDb> SimpleSchedulerStateManager<T> {
pub fn new(tasks_change_notify: Arc<Notify>, max_job_retries: usize, action_db: T) -> Self {
Self {
pub fn new(
tasks_change_notify: Arc<Notify>,
max_job_retries: usize,
action_db: T,
) -> Arc<Self> {
Arc::new(Self {
action_db,
tasks_change_notify,
max_job_retries,
}
})
}

async fn inner_update_operation(
Expand Down Expand Up @@ -459,4 +464,17 @@ impl<T: AwaitedActionDb> MatchingEngineStateManager for SimpleSchedulerStateMana
self.inner_update_operation(operation_id, maybe_worker_id, stage_result)
.await
}

/// Register metrics with the registry.
///
/// Wraps `self` in a `Collector` and hands that collector to `registry`.
fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
// TODO(allada) We only register the metrics in one of the components instead of
// all three because it's a bit tricky to separate the metrics for each component.
registry.register_collector(Box::new(Collector::new(&self)));
}
}

impl<T: AwaitedActionDb> MetricsComponent for SimpleSchedulerStateManager<T> {
/// Publishes the metrics of the underlying `action_db`.
///
/// Both the metric name and the help text passed to `publish` are empty strings.
/// NOTE(review): presumably the empty name keeps the database's own metric
/// names un-prefixed; confirm against `CollectorState::publish`.
fn gather_metrics(&self, c: &mut CollectorState) {
c.publish("", &self.action_db, "");
}
}
10 changes: 10 additions & 0 deletions nativelink-util/src/operation_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use async_trait::async_trait;
use bitflags::bitflags;
use futures::Stream;
use nativelink_error::Error;
use prometheus_client::registry::Registry;

use crate::action_messages::{
ActionInfo, ActionStage, ActionState, ActionUniqueKey, ClientOperationId, OperationId, WorkerId,
Expand Down Expand Up @@ -109,6 +110,9 @@ pub trait ClientStateManager: Sync + Send {
&'a self,
filter: OperationFilter,
) -> Result<ActionStateResultStream<'a>, Error>;

/// Register metrics with the registry.
///
/// The default implementation does nothing and ignores `_registry`;
/// implementors may override it to register their own metrics.
fn register_metrics(self: Arc<Self>, _registry: &mut Registry) {}
}

#[async_trait]
Expand All @@ -123,6 +127,9 @@ pub trait WorkerStateManager: Sync + Send {
worker_id: &WorkerId,
action_stage: Result<ActionStage, Error>,
) -> Result<(), Error>;

/// Register metrics with the registry.
///
/// The default implementation does nothing and ignores `_registry`;
/// implementors may override it to register their own metrics.
fn register_metrics(self: Arc<Self>, _registry: &mut Registry) {}
}

#[async_trait]
Expand All @@ -139,4 +146,7 @@ pub trait MatchingEngineStateManager: Sync + Send {
operation_id: &OperationId,
worker_id_or_reason_for_unsassign: Result<&WorkerId, Error>,
) -> Result<(), Error>;

/// Register metrics with the registry.
///
/// The default implementation does nothing and ignores `_registry`;
/// implementors may override it to register their own metrics.
fn register_metrics(self: Arc<Self>, _registry: &mut Registry) {}
}

0 comments on commit 54ed73c

Please sign in to comment.