From a6c8c86ba6eab237ace63f3cdacf8b422e3ec63d Mon Sep 17 00:00:00 2001 From: Yuanxin Cao <60498509+xx01cyx@users.noreply.github.com> Date: Mon, 13 Mar 2023 11:59:26 +0800 Subject: [PATCH] feat: user configurable memory control policy (#8475) --- risedev.yml | 7 ++++ src/compute/src/lib.rs | 16 +++++++++ src/compute/src/memory_management/policy.rs | 36 +++++++++++++------ src/compute/src/server.rs | 12 ++++--- src/risedevtool/src/service_config.rs | 2 ++ .../src/task/compute_node_service.rs | 6 +++- 6 files changed, 63 insertions(+), 16 deletions(-) diff --git a/risedev.yml b/risedev.yml index 981525f12f99..c3bcb8b3b972 100644 --- a/risedev.yml +++ b/risedev.yml @@ -737,6 +737,13 @@ template: # Total available memory for the compute node in bytes total-memory-bytes: 8589934592 + # The policy for compute node memory control. + memory-control-policy: streaming-only + + # The proportion of streaming memory to all available memory for computing. Only works when + # `memory_control_policy` is set to "streaming-batch". + streaming-memory-proportion: 0.7 + # Parallelism of tasks per compute node parallelism: 4 diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs index 0a863f9ab546..5a1b95065ad1 100644 --- a/src/compute/src/lib.rs +++ b/src/compute/src/lib.rs @@ -93,6 +93,22 @@ pub struct ComputeNodeOpts { #[clap(long, env = "RW_PARALLELISM", default_value_t = default_parallelism())] pub parallelism: usize, + /// The policy for compute node memory control. Valid values: + /// - streaming-only + /// - streaming-batch + #[clap( + long, + env = "RW_MEMORY_CONTROL_POLICY", + default_value = "streaming-only" + )] + pub memory_control_policy: String, + + /// The proportion of streaming memory to all available memory for computing. Only works when + /// `memory_control_policy` is set to "streaming-batch". Ignored otherwise. See + /// [`FixedProportionPolicy`] for more details. + #[clap(long, env = "RW_STREAMING_MEMORY_PROPORTION", default_value_t = 0.7)] + pub streaming_memory_proportion: f64, + #[clap(flatten)] override_config: OverrideConfigOpts, } diff --git a/src/compute/src/memory_management/policy.rs b/src/compute/src/memory_management/policy.rs index ec5fc1c375b9..56caa279a6f9 100644 --- a/src/compute/src/memory_management/policy.rs +++ b/src/compute/src/memory_management/policy.rs @@ -22,6 +22,8 @@ use risingwave_common::error::Result; use risingwave_common::util::epoch::Epoch; use risingwave_stream::task::LocalStreamManager; +use crate::ComputeNodeOpts; + /// `MemoryControlStats` contains the necessary information for memory control, including both batch /// and streaming. pub struct MemoryControlStats { @@ -49,6 +51,27 @@ pub trait MemoryControl: Send + Sync { fn describe(&self, total_compute_memory_bytes: usize) -> String; } +pub fn memory_control_policy_from_config(opts: &ComputeNodeOpts) -> Result { + let input_policy = &opts.memory_control_policy; + if input_policy == FixedProportionPolicy::CONFIG_STR { + Ok(Box::new(FixedProportionPolicy::new( + opts.streaming_memory_proportion, + )?)) + } else if input_policy == StreamingOnlyPolicy::CONFIG_STR { + Ok(Box::new(StreamingOnlyPolicy {})) + } else { + let valid_values = [ + FixedProportionPolicy::CONFIG_STR, + StreamingOnlyPolicy::CONFIG_STR, + ]; + Err(anyhow!(format!( + "invalid memory control policy in configuration: {}, valid values: {:?}", + input_policy, valid_values, + )) + .into()) + } +} + /// `FixedProportionPolicy` performs memory control by limiting the memory usage of both batch and /// streaming to a fixed proportion. pub struct FixedProportionPolicy { @@ -60,6 +83,7 @@ pub struct FixedProportionPolicy { impl FixedProportionPolicy { const BATCH_KILL_QUERY_THRESHOLD: f64 = 0.8; + const CONFIG_STR: &str = "streaming-batch"; const STREAM_EVICTION_THRESHOLD_AGGRESSIVE: f64 = 0.9; const STREAM_EVICTION_THRESHOLD_GRACEFUL: f64 = 0.7; @@ -73,16 +97,6 @@ impl FixedProportionPolicy { } } -impl Default for FixedProportionPolicy { - fn default() -> Self { - Self { - // The default streaming memory proportion is 70%. That for batch is correspondingly - // 30%. - streaming_memory_proportion: 0.7, - } - } -} - impl MemoryControl for FixedProportionPolicy { fn apply( &self, @@ -158,10 +172,10 @@ impl MemoryControl for FixedProportionPolicy { /// `FixedProportionPolicy` in that it calculates the memory usage based on jemalloc statistics, /// which actually contains system usage other than computing tasks. This is the default memory /// control policy. -#[derive(Default)] pub struct StreamingOnlyPolicy {} impl StreamingOnlyPolicy { + const CONFIG_STR: &str = "streaming-only"; const STREAM_EVICTION_THRESHOLD_AGGRESSIVE: f64 = 0.9; const STREAM_EVICTION_THRESHOLD_GRACEFUL: f64 = 0.7; } diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 12896c6207f0..c972951180ca 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -60,7 +60,7 @@ use tokio::task::JoinHandle; use crate::memory_management::memory_manager::{ GlobalMemoryManager, MIN_COMPUTE_MEMORY_MB, SYSTEM_RESERVED_MEMORY_MB, }; -use crate::memory_management::policy::StreamingOnlyPolicy; +use crate::memory_management::policy::memory_control_policy_from_config; use crate::observer::observer_manager::ComputeObserverNode; use crate::rpc::service::config_service::ConfigServiceImpl; use crate::rpc::service::exchange_metrics::ExchangeServiceMetrics; @@ -79,7 +79,7 @@ pub async fn compute_node_serve( opts: ComputeNodeOpts, ) -> (Vec>, Sender<()>) { // Load the configuration. - let config = load_config(&opts.config_path, Some(opts.override_config)); + let config = load_config(&opts.config_path, Some(opts.override_config.clone())); info!("Starting compute node",); info!("> config: {:?}", config); @@ -105,7 +105,10 @@ pub async fn compute_node_serve( let storage_opts = Arc::new(StorageOpts::from((&config, &system_params))); let state_store_url = { - let from_local = opts.state_store.unwrap_or("hummock+memory".to_string()); + let from_local = opts + .state_store + .clone() + .unwrap_or_else(|| "hummock+memory".to_string()); system_params.state_store(from_local) }; @@ -115,6 +118,7 @@ pub async fn compute_node_serve( total_storage_memory_limit_bytes(&config.storage, embedded_compactor_enabled); let compute_memory_bytes = validate_compute_node_memory_config(opts.total_memory_bytes, storage_memory_bytes); + let memory_control_policy = memory_control_policy_from_config(&opts).unwrap(); let worker_id = meta_client.worker_id(); info!("Assigned worker node id {}", worker_id); @@ -239,7 +243,7 @@ pub async fn compute_node_serve( compute_memory_bytes, system_params.barrier_interval_ms(), streaming_metrics.clone(), - Box::new(StreamingOnlyPolicy {}), + memory_control_policy, ); // Run a background memory monitor tokio::spawn(memory_mgr.clone().run(batch_mgr_clone, stream_mgr_clone)); diff --git a/src/risedevtool/src/service_config.rs b/src/risedevtool/src/service_config.rs index 4c3c613e487e..1095558919d8 100644 --- a/src/risedevtool/src/service_config.rs +++ b/src/risedevtool/src/service_config.rs @@ -42,6 +42,8 @@ pub struct ComputeNodeConfig { pub connector_rpc_endpoint: String, pub total_memory_bytes: usize, + pub memory_control_policy: String, + pub streaming_memory_proportion: f64, pub parallelism: usize, } diff --git a/src/risedevtool/src/task/compute_node_service.rs b/src/risedevtool/src/task/compute_node_service.rs index 1c6a58c1eef7..3d8e8b3dd386 100644 --- a/src/risedevtool/src/task/compute_node_service.rs +++ b/src/risedevtool/src/task/compute_node_service.rs @@ -65,7 +65,11 @@ impl ComputeNodeService { .arg("--parallelism") .arg(&config.parallelism.to_string()) .arg("--total-memory-bytes") - .arg(&config.total_memory_bytes.to_string()); + .arg(&config.total_memory_bytes.to_string()) + .arg("--memory-control-policy") + .arg(&config.memory_control_policy) + .arg("--streaming-memory-proportion") + .arg(&config.streaming_memory_proportion.to_string()); let provide_jaeger = config.provide_jaeger.as_ref().unwrap(); match provide_jaeger.len() {