Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add scheduler metrics back #1171

Merged
merged 1 commit into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 38 additions & 5 deletions nativelink-scheduler/src/api_worker_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use async_lock::Mutex;
use lru::LruCache;
use nativelink_config::schedulers::WorkerAllocationStrategy;
use nativelink_error::{error_if, make_err, make_input_err, Code, Error, ResultExt};
use nativelink_util::action_messages::{ActionInfo, ActionStage, OperationId, WorkerId};
use nativelink_util::metrics_utils::Registry;
use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry};
use nativelink_util::operation_state_manager::WorkerStateManager;
use nativelink_util::platform_properties::PlatformProperties;
use nativelink_util::platform_properties::{PlatformProperties, PlatformPropertyValue};
use tokio::sync::Notify;
use tonic::async_trait;
use tracing::{event, Level};
Expand Down Expand Up @@ -437,8 +438,40 @@ impl WorkerScheduler for ApiWorkerScheduler {
inner.set_drain_worker(worker_id, is_draining).await
}

fn register_metrics(self: Arc<Self>, _registry: &mut Registry) {
// We do not register anything here because we only want to register metrics
// once and we rely on the `ActionScheduler::register_metrics()` to do that.
fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
self.inner
.lock_blocking()
.worker_state_manager
.clone()
.register_metrics(registry);
registry.register_collector(Box::new(Collector::new(&self)));
}
}

impl MetricsComponent for ApiWorkerScheduler {
fn gather_metrics(&self, c: &mut CollectorState) {
let inner = self.inner.lock_blocking();
let mut props = HashMap::<&String, u64>::new();
for (_worker_id, worker) in inner.workers.iter() {
c.publish_with_labels(
"workers",
worker,
"",
vec![("worker_id".into(), worker.id.to_string().into())],
);
for (property, prop_value) in &worker.platform_properties.properties {
let current_value = props.get(&property).unwrap_or(&0);
if let PlatformPropertyValue::Minimum(worker_value) = prop_value {
props.insert(property, *current_value + *worker_value);
}
}
}
for (property, prop_value) in props {
c.publish(
&format!("{property}_available_properties"),
&prop_value,
format!("Total sum of available properties for {property}"),
);
}
}
}
3 changes: 2 additions & 1 deletion nativelink-scheduler/src/awaited_action_db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub use awaited_action::{AwaitedAction, AwaitedActionSortKey};
use futures::{Future, Stream};
use nativelink_error::Error;
use nativelink_util::action_messages::{ActionInfo, ClientOperationId, OperationId};
use nativelink_util::metrics_utils::MetricsComponent;

mod awaited_action;

Expand Down Expand Up @@ -71,7 +72,7 @@ pub trait AwaitedActionSubscriber: Send + Sync + Sized + 'static {
}

/// A trait that defines the interface for an AwaitedActionDb.
pub trait AwaitedActionDb: Send + Sync {
pub trait AwaitedActionDb: Send + Sync + MetricsComponent + 'static {
type Subscriber: AwaitedActionSubscriber;

/// Get the AwaitedAction by the client operation id.
Expand Down
1 change: 0 additions & 1 deletion nativelink-scheduler/src/default_scheduler_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ fn inner_scheduler_factory(
visited_schedulers.insert(worker_scheduler_uintptr);
worker_scheduler.clone().register_metrics(scheduler_metrics);
}
worker_scheduler.clone().register_metrics(scheduler_metrics);
}
}

Expand Down
44 changes: 44 additions & 0 deletions nativelink-scheduler/src/memory_awaited_action_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use nativelink_util::action_messages::{
};
use nativelink_util::chunked_stream::ChunkedStream;
use nativelink_util::evicting_map::{EvictingMap, LenEntry};
use nativelink_util::metrics_utils::{CollectorState, MetricsComponent};
use nativelink_util::operation_state_manager::ActionStateResult;
use nativelink_util::spawn;
use nativelink_util::task::JoinHandleDropGuard;
Expand Down Expand Up @@ -941,3 +942,46 @@ impl AwaitedActionDb for MemoryAwaitedActionDb {
.await
}
}

impl MetricsComponent for MemoryAwaitedActionDb {
fn gather_metrics(&self, c: &mut CollectorState) {
let inner = self.inner.lock_blocking();
c.publish(
"action_state_unknown_total",
&inner.sorted_action_info_hash_keys.unknown.len(),
"Number of actions wih the current state of unknown.",
);
c.publish(
"action_state_cache_check_total",
&inner.sorted_action_info_hash_keys.cache_check.len(),
"Number of actions wih the current state of cache_check.",
);
c.publish(
"action_state_queued_total",
&inner.sorted_action_info_hash_keys.queued.len(),
"Number of actions wih the current state of queued.",
);
c.publish(
"action_state_executing_total",
&inner.sorted_action_info_hash_keys.executing.len(),
"Number of actions wih the current state of executing.",
);
c.publish(
"action_state_completed_total",
&inner.sorted_action_info_hash_keys.completed.len(),
"Number of actions wih the current state of completed.",
);
// TODO(allada) This is legacy and should be removed in the future.
c.publish(
"active_actions_total",
&inner.sorted_action_info_hash_keys.executing.len(),
"(LEGACY) The number of running actions.",
);
// TODO(allada) This is legacy and should be removed in the future.
c.publish(
"queued_actions_total",
&inner.sorted_action_info_hash_keys.queued.len(),
"(LEGACY) The number actions in the queue.",
);
}
}
15 changes: 8 additions & 7 deletions nativelink-scheduler/src/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,14 +288,14 @@ impl SimpleScheduler {
}

let tasks_or_worker_change_notify = Arc::new(Notify::new());
let state_manager = Arc::new(SimpleSchedulerStateManager::new(
let state_manager = SimpleSchedulerStateManager::new(
tasks_or_worker_change_notify.clone(),
max_job_retries,
MemoryAwaitedActionDb::new(&EvictionPolicy {
max_seconds: retain_completed_for_s,
..Default::default()
}),
));
);

let worker_scheduler = ApiWorkerScheduler::new(
state_manager.clone(),
Expand Down Expand Up @@ -371,7 +371,12 @@ impl ActionScheduler for SimpleScheduler {
Ok(maybe_receiver)
}

fn register_metrics(self: Arc<Self>, _registry: &mut Registry) {}
fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
self.client_state_manager.clone().register_metrics(registry);
self.matching_engine_state_manager
.clone()
.register_metrics(registry);
}
}

#[async_trait]
Expand Down Expand Up @@ -420,8 +425,4 @@ impl WorkerScheduler for SimpleScheduler {
.set_drain_worker(worker_id, is_draining)
.await
}

fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
self.worker_scheduler.clone().register_metrics(registry);
}
}
24 changes: 21 additions & 3 deletions nativelink-scheduler/src/simple_scheduler_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use nativelink_util::action_messages::{
ActionInfo, ActionResult, ActionStage, ActionState, ActionUniqueQualifier, ClientOperationId,
ExecutionMetadata, OperationId, WorkerId,
};
use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry};
use nativelink_util::operation_state_manager::{
ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager,
OperationFilter, OperationStageFlags, OrderDirection, WorkerStateManager,
Expand Down Expand Up @@ -139,12 +140,16 @@ pub struct SimpleSchedulerStateManager<T: AwaitedActionDb> {
}

impl<T: AwaitedActionDb> SimpleSchedulerStateManager<T> {
pub fn new(tasks_change_notify: Arc<Notify>, max_job_retries: usize, action_db: T) -> Self {
Self {
pub fn new(
tasks_change_notify: Arc<Notify>,
max_job_retries: usize,
action_db: T,
) -> Arc<Self> {
Arc::new(Self {
action_db,
tasks_change_notify,
max_job_retries,
}
})
}

async fn inner_update_operation(
Expand Down Expand Up @@ -459,4 +464,17 @@ impl<T: AwaitedActionDb> MatchingEngineStateManager for SimpleSchedulerStateMana
self.inner_update_operation(operation_id, maybe_worker_id, stage_result)
.await
}

/// Register metrics with the registry.
fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
// TODO(allada) We only register the metrics in one of the components instead of
// all three because it's a bit tricky to separate the metrics for each component.
registry.register_collector(Box::new(Collector::new(&self)));
}
}

impl<T: AwaitedActionDb> MetricsComponent for SimpleSchedulerStateManager<T> {
fn gather_metrics(&self, c: &mut CollectorState) {
c.publish("", &self.action_db, "");
}
}
10 changes: 10 additions & 0 deletions nativelink-util/src/operation_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use async_trait::async_trait;
use bitflags::bitflags;
use futures::Stream;
use nativelink_error::Error;
use prometheus_client::registry::Registry;

use crate::action_messages::{
ActionInfo, ActionStage, ActionState, ActionUniqueKey, ClientOperationId, OperationId, WorkerId,
Expand Down Expand Up @@ -109,6 +110,9 @@ pub trait ClientStateManager: Sync + Send {
&'a self,
filter: OperationFilter,
) -> Result<ActionStateResultStream<'a>, Error>;

/// Register metrics with the registry.
fn register_metrics(self: Arc<Self>, _registry: &mut Registry) {}
}

#[async_trait]
Expand All @@ -123,6 +127,9 @@ pub trait WorkerStateManager: Sync + Send {
worker_id: &WorkerId,
action_stage: Result<ActionStage, Error>,
) -> Result<(), Error>;

/// Register metrics with the registry.
fn register_metrics(self: Arc<Self>, _registry: &mut Registry) {}
}

#[async_trait]
Expand All @@ -139,4 +146,7 @@ pub trait MatchingEngineStateManager: Sync + Send {
operation_id: &OperationId,
worker_id_or_reason_for_unsassign: Result<&WorkerId, Error>,
) -> Result<(), Error>;

/// Register metrics with the registry.
fn register_metrics(self: Arc<Self>, _registry: &mut Registry) {}
}