[Refactor] Overhaul of scheduler component
This is a major overhaul of NativeLink's scheduler component. The new
scheduler design paves the way for a distributed scheduling system.

The new components & definitions:
* AwaitedActionDb - An interface designed to be easy to implement on
  top of key-value storage systems.
* MemoryAwaitedActionDb - An in-memory set of hashmaps & btrees used
  to satisfy the requirements of the AwaitedActionDb interface.
* ClientStateManager - A minimal interface required to satisfy the
  requirements of a client-facing scheduler.
* WorkerStateManager - A minimal interface required to satisfy the
  requirements of a worker-facing scheduler.
* MatchingEngineStateManager - A minimal interface required to
  satisfy an engine that matches queued jobs to workers.
* SimpleSchedulerStateManager - An implementation that satisfies
  ClientStateManager, WorkerStateManager & MatchingEngineStateManager,
  with all the logic of the previous "SimpleScheduler" moved behind
  each interface.
* ApiWorkerScheduler - A component that owns all knowledge of worker
  state, implements the WorkerScheduler interface, and translates its
  calls into the WorkerStateManager interface.
* SimpleScheduler - Translates calls of the ClientScheduler
  interface into ClientStateManager & MatchingEngineStateManager.
  This component currently always forwards calls to
  SimpleSchedulerStateManager and then to MemoryAwaitedActionDb.
  Future changes will make these inner components dynamic via config.
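
The interface split described above can be sketched roughly as
follows. The trait names follow the commit description, but every
signature and type here is an assumption for illustration, not
NativeLink's actual API:

```rust
// Illustrative sketch only: one concrete state manager satisfying all
// three scheduler-facing interfaces, the way SimpleSchedulerStateManager
// does behind SimpleScheduler.

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct OperationId(pub u64);

#[derive(Clone, Debug)]
pub struct ActionInfo {
    pub priority: i32,
}

/// Client-facing surface: enqueue work.
pub trait ClientStateManager {
    fn add_action(&mut self, info: ActionInfo) -> OperationId;
}

/// Worker-facing surface: report results for an operation.
pub trait WorkerStateManager {
    fn complete_operation(&mut self, op: &OperationId);
}

/// Matching-engine surface: enumerate queued work to pair with workers.
pub trait MatchingEngineStateManager {
    fn queued_operations(&self) -> Vec<OperationId>;
}

/// A single in-memory implementation backing all three fronts.
#[derive(Default)]
pub struct InMemoryStateManager {
    next_id: u64,
    queued: Vec<OperationId>,
}

impl ClientStateManager for InMemoryStateManager {
    fn add_action(&mut self, _info: ActionInfo) -> OperationId {
        self.next_id += 1;
        let op = OperationId(self.next_id);
        self.queued.push(op.clone());
        op
    }
}

impl WorkerStateManager for InMemoryStateManager {
    fn complete_operation(&mut self, op: &OperationId) {
        self.queued.retain(|o| o != op);
    }
}

impl MatchingEngineStateManager for InMemoryStateManager {
    fn queued_operations(&self) -> Vec<OperationId> {
        self.queued.clone()
    }
}

fn main() {
    let mut sm = InMemoryStateManager::default();
    // Client enqueues; matching engine sees it; worker completes it.
    let op = sm.add_action(ActionInfo { priority: 0 });
    println!("queued: {:?}", sm.queued_operations());
    sm.complete_operation(&op);
    println!("queued after completion: {:?}", sm.queued_operations());
}
```

The point of the split is that each caller (client frontend, worker
API, matching engine) depends only on its own minimal trait, so each
can later be backed by a different, possibly distributed,
implementation.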

In addition we have hardened the interactions between the different
kinds of IDs in NativeLink. Most relevant is the separation &
introduction of:
* OperationId - Represents an individual operation that was requested
  to be executed and is unique across all of time.
* ClientOperationId - An ID issued to the client when the client
  requests to execute a job. This ID will point to an OperationId
  internally, but the client is never exposed to the OperationId.
* AwaitedActionHashKey - A key used to identify an action, but one
  that is not unique across time: multiple OperationIds may have
  executed it at different points in time. This key is used as a
  "fingerprint" of an operation the client wants to execute, and the
  scheduler may decide to join the stream onto an existing operation
  when this key has a hit.
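
The relationship between the three IDs can be sketched as below. The
type names come from the commit message, but the fields, the
`Scheduler` struct, and its methods are hypothetical, shown only to
illustrate the join-on-fingerprint behavior:

```rust
use std::collections::HashMap;

// Illustrative sketch only, not NativeLink's real definitions.

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct AwaitedActionHashKey(pub String); // fingerprint, NOT unique across time

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct OperationId(pub u64); // unique across all of time (internal only)

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ClientOperationId(pub u64); // the only ID the client ever sees

#[derive(Default)]
pub struct Scheduler {
    next: u64,
    // A hit here means a new request joins an existing operation's stream.
    by_key: HashMap<AwaitedActionHashKey, OperationId>,
    // Client IDs point to internal operations; OperationId is never exposed.
    by_client: HashMap<ClientOperationId, OperationId>,
}

impl Scheduler {
    pub fn execute(&mut self, key: AwaitedActionHashKey) -> ClientOperationId {
        let op = match self.by_key.get(&key) {
            Some(existing) => existing.clone(), // join the existing operation
            None => {
                self.next += 1;
                let op = OperationId(self.next);
                self.by_key.insert(key, op.clone());
                op
            }
        };
        // Every request gets its own ClientOperationId, even when joined.
        self.next += 1;
        let client_id = ClientOperationId(self.next);
        self.by_client.insert(client_id.clone(), op);
        client_id
    }

    pub fn resolve(&self, id: &ClientOperationId) -> Option<&OperationId> {
        self.by_client.get(id)
    }
}

fn main() {
    let mut s = Scheduler::default();
    let a = s.execute(AwaitedActionHashKey("sha256:abc".into()));
    let b = s.execute(AwaitedActionHashKey("sha256:abc".into()));
    // Two distinct client IDs that resolve to the same internal operation.
    println!("{:?} and {:?} -> {:?}", a, b, s.resolve(&a));
}
```

Note the asymmetry: the fingerprint lookup deduplicates work, while
the client-facing ID stays per-request, which is what lets the
scheduler clean up each client subscription independently.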

Overall these changes pave the way for more robust scheduler
implementations; most notably, distributed schedulers will be easier
to build and will be introduced in follow-up PRs.

This commit was developed on a side branch and consisted of the
following commits with corresponding code reviews:
54ed73c
    Add scheduler metrics back (#1171)
50fdbd7
    fix formatting (#1170)
8926236
    Merge in main and format (#1168)
9c2c7b9
    key as u64 (#1166)
0192051
    Cleanup unused code and comments (#1165)
080df5d
    Add versioning to AwaitedAction (#1163)
73c19c4
    Fix sequence bug in new memory store manager (#1162)
6e50d2c
    New AwaitedActionDb implementation (#1157)
18db991
    Fix test on running_actions_manager_test (#1141)
e50ef3c
    Rename workers to `worker_scheduler`
1fdd505
    SimpleScheduler now uses config for action pruning (#1137)
eaaa872
    Change encoding for items that are cachable (#1136)
d647056
    Errors are now properly handles in subscription (#1135)
7c3e730
    Restructure files to be more appropriate (#1131)
5e98ec9
    ClientAwaitedAction now uses a channel to notify drops happened (#1130)
52beaf9
    Cleanup unused structs (#1128)
e86fe08
    Remove all uses of salt and put under ActionUniqueQualifier (#1126)
3b86036
    Remove all need for workers to know about ActionId (#1125)
5482d7f
    Fix bazel build and test on dev (#1123)
ba52c7f
    Implement get_action_info to all ActionStateResult impls (#1118)
2fa4fee
    Remove MatchingEngineStateManager::remove_operation (#1119)
34dea06
    Remove unused proto field (#1117)
3070a40
    Remove metrics from new scheduler (#1116)
e95adfc
    StateManager will now cleanup actions on client disconnect (#1107)
6f8c001
    Fix worker execution issues (#1114)
d353c30
    rename set_priority to upgrade_priority (#1112)
0d93671
    StateManager can now be notified of noone listeneing (#1093)
cfc0cf6
    ActionScheduler will now use ActionListener instead of tokio::watch (#1091)
d70d31d
    QA fixes for scheduler-v2 (#1092)
f2cea0c
    [Refactor] Complete rewrite of SimpleScheduler
34d93b7
    [Refactor] Move worker notification in SimpleScheduler under Workers
b9d9702
    [Refactor] Moves worker logic back to SimpleScheduler
7a16e2e
    [Refactor] Move scheduler state behind mute
allada committed Jul 16, 2024
1 parent b7ef3b6 commit f103f63
Showing 56 changed files with 4,535 additions and 4,164 deletions.
40 changes: 22 additions & 18 deletions Cargo.lock


2 changes: 1 addition & 1 deletion nativelink-config/src/schedulers.rs
@@ -98,7 +98,7 @@ pub struct SimpleScheduler {
/// a WaitExecution is called after the action has completed.
/// Default: 60 (seconds)
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
pub retain_completed_for_s: u64,
pub retain_completed_for_s: u32,

/// Remove workers from pool once the worker has not responded in this
/// amount of time in seconds.
@@ -99,25 +99,8 @@ message ExecuteResult {
/// that initially sent the job as part of the BRE protocol.
string instance_name = 6;

/// The original execution digest request for this response. The scheduler knows what it
/// should be, but we do safety checks to ensure it really is the request we expected.
build.bazel.remote.execution.v2.Digest action_digest = 2;

/// The salt originally sent along with the StartExecute request. This salt is used
/// as a seed for cases where the execution digest should never be cached or merged
/// with other jobs. This salt is added to the hash function used to compute jobs that
/// are running or cached.
uint64 salt = 3;

// The digest function that was used to compute the action digest
// and all related blobs.
//
// If the digest function used is one of MD5, MURMUR3, SHA1, SHA256,
// SHA384, SHA512, or VSO, the client MAY leave this field unset. In
// that case the server SHOULD infer the digest function using the
// length of the action digest hash and the digest functions announced
// in the server's capabilities.
build.bazel.remote.execution.v2.DigestFunction.Value digest_function = 7;
/// The operation ID that was executed.
string operation_id = 8;

/// The actual response data.
oneof result {
@@ -131,7 +114,7 @@ message ExecuteResult {
google.rpc.Status internal_error = 5;
}

reserved 8; // NextId.
reserved 9; // NextId.
}

/// Result sent back from the server when a node connects.
@@ -141,10 +124,10 @@ message ConnectionResult {
reserved 2; // NextId.
}

/// Request to kill a running action sent from the scheduler to a worker.
message KillActionRequest {
/// The the hex encoded unique qualifier for the action to be killed.
string action_id = 1;
/// Request to kill a running operation sent from the scheduler to a worker.
message KillOperationRequest {
/// The the operation id for the operation to be killed.
string operation_id = 1;
reserved 2; // NextId.
}
/// Communication from the scheduler to the worker.
@@ -169,8 +152,8 @@ message UpdateForWorker {
/// The worker may discard any outstanding work that is being executed.
google.protobuf.Empty disconnect = 4;

/// Instructs the worker to kill a specific running action.
KillActionRequest kill_action_request = 5;
/// Instructs the worker to kill a specific running operation.
KillOperationRequest kill_operation_request = 5;
}
reserved 6; // NextId.
}
@@ -179,14 +162,14 @@ message StartExecute {
/// The action information used to execute job.
build.bazel.remote.execution.v2.ExecuteRequest execute_request = 1;

/// See documentation in ExecuteResult::salt.
uint64 salt = 2;
/// Id of the operation.
string operation_id = 4;

/// The time at which the command was added to the queue to allow population
/// of the ActionResult.
google.protobuf.Timestamp queued_timestamp = 3;

reserved 4; // NextId.
reserved 5; // NextId.
}

/// This is a special message used to save actions into the CAS that can be used
@@ -60,31 +60,9 @@ pub struct ExecuteResult {
/// / that initially sent the job as part of the BRE protocol.
#[prost(string, tag = "6")]
pub instance_name: ::prost::alloc::string::String,
/// / The original execution digest request for this response. The scheduler knows what it
/// / should be, but we do safety checks to ensure it really is the request we expected.
#[prost(message, optional, tag = "2")]
pub action_digest: ::core::option::Option<
super::super::super::super::super::build::bazel::remote::execution::v2::Digest,
>,
/// / The salt originally sent along with the StartExecute request. This salt is used
/// / as a seed for cases where the execution digest should never be cached or merged
/// / with other jobs. This salt is added to the hash function used to compute jobs that
/// / are running or cached.
#[prost(uint64, tag = "3")]
pub salt: u64,
/// The digest function that was used to compute the action digest
/// and all related blobs.
///
/// If the digest function used is one of MD5, MURMUR3, SHA1, SHA256,
/// SHA384, SHA512, or VSO, the client MAY leave this field unset. In
/// that case the server SHOULD infer the digest function using the
/// length of the action digest hash and the digest functions announced
/// in the server's capabilities.
#[prost(
enumeration = "super::super::super::super::super::build::bazel::remote::execution::v2::digest_function::Value",
tag = "7"
)]
pub digest_function: i32,
/// / The operation ID that was executed.
#[prost(string, tag = "8")]
pub operation_id: ::prost::alloc::string::String,
/// / The actual response data.
#[prost(oneof = "execute_result::Result", tags = "4, 5")]
pub result: ::core::option::Option<execute_result::Result>,
@@ -116,13 +94,13 @@ pub struct ConnectionResult {
#[prost(string, tag = "1")]
pub worker_id: ::prost::alloc::string::String,
}
/// / Request to kill a running action sent from the scheduler to a worker.
/// / Request to kill a running operation sent from the scheduler to a worker.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct KillActionRequest {
/// / The the hex encoded unique qualifier for the action to be killed.
pub struct KillOperationRequest {
/// / The the operation id for the operation to be killed.
#[prost(string, tag = "1")]
pub action_id: ::prost::alloc::string::String,
pub operation_id: ::prost::alloc::string::String,
}
/// / Communication from the scheduler to the worker.
#[allow(clippy::derive_partial_eq_without_eq)]
@@ -155,9 +133,9 @@ pub mod update_for_worker {
/// / The worker may discard any outstanding work that is being executed.
#[prost(message, tag = "4")]
Disconnect(()),
/// / Instructs the worker to kill a specific running action.
/// / Instructs the worker to kill a specific running operation.
#[prost(message, tag = "5")]
KillActionRequest(super::KillActionRequest),
KillOperationRequest(super::KillOperationRequest),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
@@ -168,9 +146,9 @@ pub struct StartExecute {
pub execute_request: ::core::option::Option<
super::super::super::super::super::build::bazel::remote::execution::v2::ExecuteRequest,
>,
/// / See documentation in ExecuteResult::salt.
#[prost(uint64, tag = "2")]
pub salt: u64,
/// / Id of the operation.
#[prost(string, tag = "4")]
pub operation_id: ::prost::alloc::string::String,
/// / The time at which the command was added to the queue to allow population
/// / of the ActionResult.
#[prost(message, optional, tag = "3")]
19 changes: 7 additions & 12 deletions nativelink-scheduler/BUILD.bazel
@@ -10,24 +10,19 @@ rust_library(
name = "nativelink-scheduler",
srcs = [
"src/action_scheduler.rs",
"src/api_worker_scheduler.rs",
"src/awaited_action_db/awaited_action.rs",
"src/awaited_action_db/mod.rs",
"src/cache_lookup_scheduler.rs",
"src/default_action_listener.rs",
"src/default_scheduler_factory.rs",
"src/grpc_scheduler.rs",
"src/lib.rs",
"src/operation_state_manager.rs",
"src/memory_awaited_action_db.rs",
"src/platform_property_manager.rs",
"src/property_modifier_scheduler.rs",
"src/redis_action_stage.rs",
"src/redis_operation_state.rs",
"src/scheduler_state/awaited_action.rs",
"src/scheduler_state/client_action_state_result.rs",
"src/scheduler_state/completed_action.rs",
"src/scheduler_state/matching_engine_action_state_result.rs",
"src/scheduler_state/metrics.rs",
"src/scheduler_state/mod.rs",
"src/scheduler_state/state_manager.rs",
"src/scheduler_state/workers.rs",
"src/simple_scheduler.rs",
"src/simple_scheduler_state_manager.rs",
"src/worker.rs",
"src/worker_scheduler.rs",
],
@@ -42,7 +37,6 @@ rust_library(
"//nativelink-store",
"//nativelink-util",
"@crates//:async-lock",
"@crates//:bitflags",
"@crates//:blake3",
"@crates//:futures",
"@crates//:hashbrown",
@@ -55,6 +49,7 @@ rust_library(
"@crates//:scopeguard",
"@crates//:serde",
"@crates//:serde_json",
"@crates//:static_assertions",
"@crates//:tokio",
"@crates//:tokio-stream",
"@crates//:tonic",
2 changes: 1 addition & 1 deletion nativelink-scheduler/Cargo.toml
@@ -28,11 +28,11 @@ tokio = { version = "1.37.0", features = ["sync", "rt", "parking_lot"] }
tokio-stream = { version = "0.1.15", features = ["sync"] }
tonic = { version = "0.11.0", features = ["gzip", "tls"] }
tracing = "0.1.40"
bitflags = "2.5.0"
redis = { version = "0.25.2", features = ["aio", "tokio", "json"] }
serde = "1.0.203"
redis-macros = "0.3.0"
serde_json = "1.0.117"
static_assertions = "1.1.0"

[dev-dependencies]
nativelink-macro = { path = "../nativelink-macro" }