diff --git a/nativelink-scheduler/src/api_worker_scheduler.rs b/nativelink-scheduler/src/api_worker_scheduler.rs index b2172bad7..9eda38012 100644 --- a/nativelink-scheduler/src/api_worker_scheduler.rs +++ b/nativelink-scheduler/src/api_worker_scheduler.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; use async_lock::Mutex; @@ -19,9 +20,9 @@ use lru::LruCache; use nativelink_config::schedulers::WorkerAllocationStrategy; use nativelink_error::{error_if, make_err, make_input_err, Code, Error, ResultExt}; use nativelink_util::action_messages::{ActionInfo, ActionStage, OperationId, WorkerId}; -use nativelink_util::metrics_utils::Registry; +use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry}; use nativelink_util::operation_state_manager::WorkerStateManager; -use nativelink_util::platform_properties::PlatformProperties; +use nativelink_util::platform_properties::{PlatformProperties, PlatformPropertyValue}; use tokio::sync::Notify; use tonic::async_trait; use tracing::{event, Level}; @@ -437,8 +438,40 @@ impl WorkerScheduler for ApiWorkerScheduler { inner.set_drain_worker(worker_id, is_draining).await } - fn register_metrics(self: Arc, _registry: &mut Registry) { - // We do not register anything here because we only want to register metrics - // once and we rely on the `ActionScheduler::register_metrics()` to do that. + fn register_metrics(self: Arc, registry: &mut Registry) { + self.inner + .lock_blocking() + .worker_state_manager + .clone() + .register_metrics(registry); + registry.register_collector(Box::new(Collector::new(&self))); + } +} + +impl MetricsComponent for ApiWorkerScheduler { + fn gather_metrics(&self, c: &mut CollectorState) { + let inner = self.inner.lock_blocking(); + let mut props = HashMap::<&String, u64>::new(); + for (_worker_id, worker) in inner.workers.iter() { + c.publish_with_labels( + "workers", + worker, + "", + vec![("worker_id".into(), worker.id.to_string().into())], + ); + for (property, prop_value) in &worker.platform_properties.properties { + let current_value = props.get(&property).unwrap_or(&0); + if let PlatformPropertyValue::Minimum(worker_value) = prop_value { + props.insert(property, *current_value + *worker_value); + } + } + } + for (property, prop_value) in props { + c.publish( + &format!("{property}_available_properties"), + &prop_value, + format!("Total sum of available properties for {property}"), + ); + } } } diff --git a/nativelink-scheduler/src/awaited_action_db/mod.rs b/nativelink-scheduler/src/awaited_action_db/mod.rs index 7878e9e93..1d3cc623d 100644 --- a/nativelink-scheduler/src/awaited_action_db/mod.rs +++ b/nativelink-scheduler/src/awaited_action_db/mod.rs @@ -20,6 +20,7 @@ pub use awaited_action::{AwaitedAction, AwaitedActionSortKey}; use futures::{Future, Stream}; use nativelink_error::Error; use nativelink_util::action_messages::{ActionInfo, ClientOperationId, OperationId}; +use nativelink_util::metrics_utils::MetricsComponent; mod awaited_action; @@ -71,7 +72,7 @@ pub trait AwaitedActionSubscriber: Send + Sync + Sized + 'static { } /// A trait that defines the interface for an AwaitedActionDb. -pub trait AwaitedActionDb: Send + Sync { +pub trait AwaitedActionDb: Send + Sync + MetricsComponent + 'static { type Subscriber: AwaitedActionSubscriber; /// Get the AwaitedAction by the client operation id. diff --git a/nativelink-scheduler/src/default_scheduler_factory.rs b/nativelink-scheduler/src/default_scheduler_factory.rs index 9da1d7f4d..e5ef716f9 100644 --- a/nativelink-scheduler/src/default_scheduler_factory.rs +++ b/nativelink-scheduler/src/default_scheduler_factory.rs @@ -105,7 +105,6 @@ fn inner_scheduler_factory( visited_schedulers.insert(worker_scheduler_uintptr); worker_scheduler.clone().register_metrics(scheduler_metrics); } - worker_scheduler.clone().register_metrics(scheduler_metrics); } } diff --git a/nativelink-scheduler/src/memory_awaited_action_db.rs b/nativelink-scheduler/src/memory_awaited_action_db.rs index e08b66ecb..10992108c 100644 --- a/nativelink-scheduler/src/memory_awaited_action_db.rs +++ b/nativelink-scheduler/src/memory_awaited_action_db.rs @@ -28,6 +28,7 @@ use nativelink_util::action_messages::{ }; use nativelink_util::chunked_stream::ChunkedStream; use nativelink_util::evicting_map::{EvictingMap, LenEntry}; +use nativelink_util::metrics_utils::{CollectorState, MetricsComponent}; use nativelink_util::operation_state_manager::ActionStateResult; use nativelink_util::spawn; use nativelink_util::task::JoinHandleDropGuard; @@ -941,3 +942,46 @@ impl AwaitedActionDb for MemoryAwaitedActionDb { .await } } + +impl MetricsComponent for MemoryAwaitedActionDb { + fn gather_metrics(&self, c: &mut CollectorState) { + let inner = self.inner.lock_blocking(); + c.publish( + "action_state_unknown_total", + &inner.sorted_action_info_hash_keys.unknown.len(), + "Number of actions wih the current state of unknown.", + ); + c.publish( + "action_state_cache_check_total", + &inner.sorted_action_info_hash_keys.cache_check.len(), + "Number of actions wih the current state of cache_check.", + ); + c.publish( + "action_state_queued_total", + &inner.sorted_action_info_hash_keys.queued.len(), + "Number of actions wih the current state of queued.", + ); + c.publish( + "action_state_executing_total", + &inner.sorted_action_info_hash_keys.executing.len(), + "Number of actions wih the current state of executing.", + ); + c.publish( + "action_state_completed_total", + &inner.sorted_action_info_hash_keys.completed.len(), + "Number of actions wih the current state of completed.", + ); + // TODO(allada) This is legacy and should be removed in the future. + c.publish( + "active_actions_total", + &inner.sorted_action_info_hash_keys.executing.len(), + "(LEGACY) The number of running actions.", + ); + // TODO(allada) This is legacy and should be removed in the future. + c.publish( + "queued_actions_total", + &inner.sorted_action_info_hash_keys.queued.len(), + "(LEGACY) The number actions in the queue.", + ); + } +} diff --git a/nativelink-scheduler/src/simple_scheduler.rs b/nativelink-scheduler/src/simple_scheduler.rs index 57d0dff0e..618b2077f 100644 --- a/nativelink-scheduler/src/simple_scheduler.rs +++ b/nativelink-scheduler/src/simple_scheduler.rs @@ -288,14 +288,14 @@ impl SimpleScheduler { } let tasks_or_worker_change_notify = Arc::new(Notify::new()); - let state_manager = Arc::new(SimpleSchedulerStateManager::new( + let state_manager = SimpleSchedulerStateManager::new( tasks_or_worker_change_notify.clone(), max_job_retries, MemoryAwaitedActionDb::new(&EvictionPolicy { max_seconds: retain_completed_for_s, ..Default::default() }), - )); + ); let worker_scheduler = ApiWorkerScheduler::new( state_manager.clone(), @@ -371,7 +371,12 @@ impl ActionScheduler for SimpleScheduler { Ok(maybe_receiver) } - fn register_metrics(self: Arc, _registry: &mut Registry) {} + fn register_metrics(self: Arc, registry: &mut Registry) { + self.client_state_manager.clone().register_metrics(registry); + self.matching_engine_state_manager + .clone() + .register_metrics(registry); + } } #[async_trait] @@ -420,8 +425,4 @@ impl WorkerScheduler for SimpleScheduler { .set_drain_worker(worker_id, is_draining) .await } - - fn register_metrics(self: Arc, registry: &mut Registry) { - self.worker_scheduler.clone().register_metrics(registry); - } } diff --git a/nativelink-scheduler/src/simple_scheduler_state_manager.rs b/nativelink-scheduler/src/simple_scheduler_state_manager.rs index 5c637406e..f8bd2fc4d 100644 --- a/nativelink-scheduler/src/simple_scheduler_state_manager.rs +++ b/nativelink-scheduler/src/simple_scheduler_state_manager.rs @@ -22,6 +22,7 @@ use nativelink_util::action_messages::{ ActionInfo, ActionResult, ActionStage, ActionState, ActionUniqueQualifier, ClientOperationId, ExecutionMetadata, OperationId, WorkerId, }; +use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry}; use nativelink_util::operation_state_manager::{ ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager, OperationFilter, OperationStageFlags, OrderDirection, WorkerStateManager, @@ -139,12 +140,16 @@ pub struct SimpleSchedulerStateManager { } impl SimpleSchedulerStateManager { - pub fn new(tasks_change_notify: Arc, max_job_retries: usize, action_db: T) -> Self { - Self { + pub fn new( + tasks_change_notify: Arc, + max_job_retries: usize, + action_db: T, + ) -> Arc { + Arc::new(Self { action_db, tasks_change_notify, max_job_retries, - } + }) } async fn inner_update_operation( @@ -459,4 +464,17 @@ impl MatchingEngineStateManager for SimpleSchedulerStateMana self.inner_update_operation(operation_id, maybe_worker_id, stage_result) .await } + + /// Register metrics with the registry. + fn register_metrics(self: Arc, registry: &mut Registry) { + // TODO(allada) We only register the metrics in one of the components instead of + // all three because it's a bit tricky to separate the metrics for each component. + registry.register_collector(Box::new(Collector::new(&self))); + } +} + +impl MetricsComponent for SimpleSchedulerStateManager { + fn gather_metrics(&self, c: &mut CollectorState) { + c.publish("", &self.action_db, ""); + } } diff --git a/nativelink-util/src/operation_state_manager.rs b/nativelink-util/src/operation_state_manager.rs index c2a18c041..cb1b331e3 100644 --- a/nativelink-util/src/operation_state_manager.rs +++ b/nativelink-util/src/operation_state_manager.rs @@ -20,6 +20,7 @@ use async_trait::async_trait; use bitflags::bitflags; use futures::Stream; use nativelink_error::Error; +use prometheus_client::registry::Registry; use crate::action_messages::{ ActionInfo, ActionStage, ActionState, ActionUniqueKey, ClientOperationId, OperationId, WorkerId, @@ -109,6 +110,9 @@ pub trait ClientStateManager: Sync + Send { &'a self, filter: OperationFilter, ) -> Result, Error>; + + /// Register metrics with the registry. + fn register_metrics(self: Arc, _registry: &mut Registry) {} } #[async_trait] @@ -123,6 +127,9 @@ pub trait WorkerStateManager: Sync + Send { worker_id: &WorkerId, action_stage: Result, ) -> Result<(), Error>; + + /// Register metrics with the registry. + fn register_metrics(self: Arc, _registry: &mut Registry) {} } #[async_trait] @@ -139,4 +146,7 @@ pub trait MatchingEngineStateManager: Sync + Send { operation_id: &OperationId, worker_id_or_reason_for_unsassign: Result<&WorkerId, Error>, ) -> Result<(), Error>; + + /// Register metrics with the registry. + fn register_metrics(self: Arc, _registry: &mut Registry) {} }