Skip to content

Commit

Permalink
feat: user configurable memory control policy (risingwavelabs#8475)
Browse files Browse the repository at this point in the history
  • Loading branch information
xx01cyx committed Mar 13, 2023
1 parent 28c539c commit a6c8c86
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 16 deletions.
7 changes: 7 additions & 0 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,13 @@ template:
# Total available memory for the compute node in bytes
total-memory-bytes: 8589934592

# The policy for compute node memory control. Valid values: "streaming-only", "streaming-batch".
memory-control-policy: streaming-only

# The proportion of streaming memory to all available memory for computing. Only takes effect
# when `memory-control-policy` is set to "streaming-batch"; ignored otherwise.
streaming-memory-proportion: 0.7

# Parallelism of tasks per compute node
parallelism: 4

Expand Down
16 changes: 16 additions & 0 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,22 @@ pub struct ComputeNodeOpts {
#[clap(long, env = "RW_PARALLELISM", default_value_t = default_parallelism())]
pub parallelism: usize,

/// The policy for compute node memory control. Valid values:
/// - streaming-only
/// - streaming-batch
#[clap(
long,
env = "RW_MEMORY_CONTROL_POLICY",
default_value = "streaming-only"
)]
pub memory_control_policy: String,

/// The proportion of streaming memory to all available memory for computing. Only works when
/// `memory_control_policy` is set to "streaming-batch". Ignored otherwise. See
/// [`FixedProportionPolicy`] for more details.
#[clap(long, env = "RW_STREAMING_MEMORY_PROPORTION", default_value_t = 0.7)]
pub streaming_memory_proportion: f64,

#[clap(flatten)]
override_config: OverrideConfigOpts,
}
Expand Down
36 changes: 25 additions & 11 deletions src/compute/src/memory_management/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use risingwave_common::error::Result;
use risingwave_common::util::epoch::Epoch;
use risingwave_stream::task::LocalStreamManager;

use crate::ComputeNodeOpts;

/// `MemoryControlStats` contains the necessary information for memory control, including both batch
/// and streaming.
pub struct MemoryControlStats {
Expand Down Expand Up @@ -49,6 +51,27 @@ pub trait MemoryControl: Send + Sync {
fn describe(&self, total_compute_memory_bytes: usize) -> String;
}

/// Builds the memory control policy named by `opts.memory_control_policy`.
///
/// The policy string is compared against each policy's `CONFIG_STR`:
/// - [`FixedProportionPolicy`] is constructed from `opts.streaming_memory_proportion`;
/// - [`StreamingOnlyPolicy`] takes no parameters.
///
/// # Errors
///
/// Returns an error when the policy string matches neither `CONFIG_STR`, and propagates any
/// error returned by `FixedProportionPolicy::new`.
pub fn memory_control_policy_from_config(opts: &ComputeNodeOpts) -> Result<MemoryControlPolicy> {
    match opts.memory_control_policy.as_str() {
        FixedProportionPolicy::CONFIG_STR => {
            let policy = FixedProportionPolicy::new(opts.streaming_memory_proportion)?;
            Ok(Box::new(policy))
        }
        StreamingOnlyPolicy::CONFIG_STR => Ok(Box::new(StreamingOnlyPolicy {})),
        unknown => {
            // List every accepted value so that the user can correct the configuration.
            let valid_values = [
                FixedProportionPolicy::CONFIG_STR,
                StreamingOnlyPolicy::CONFIG_STR,
            ];
            Err(anyhow!(
                "invalid memory control policy in configuration: {}, valid values: {:?}",
                unknown,
                valid_values,
            )
            .into())
        }
    }
}

/// `FixedProportionPolicy` performs memory control by limiting the memory usage of both batch and
/// streaming to a fixed proportion.
pub struct FixedProportionPolicy {
Expand All @@ -60,6 +83,7 @@ pub struct FixedProportionPolicy {

impl FixedProportionPolicy {
const BATCH_KILL_QUERY_THRESHOLD: f64 = 0.8;
const CONFIG_STR: &str = "streaming-batch";
const STREAM_EVICTION_THRESHOLD_AGGRESSIVE: f64 = 0.9;
const STREAM_EVICTION_THRESHOLD_GRACEFUL: f64 = 0.7;

Expand All @@ -73,16 +97,6 @@ impl FixedProportionPolicy {
}
}

impl Default for FixedProportionPolicy {
    /// Creates a policy that assigns 70% of the compute memory to streaming, which leaves the
    /// remaining 30% for batch.
    fn default() -> Self {
        const DEFAULT_STREAMING_MEMORY_PROPORTION: f64 = 0.7;

        Self {
            streaming_memory_proportion: DEFAULT_STREAMING_MEMORY_PROPORTION,
        }
    }
}

impl MemoryControl for FixedProportionPolicy {
fn apply(
&self,
Expand Down Expand Up @@ -158,10 +172,10 @@ impl MemoryControl for FixedProportionPolicy {
/// `FixedProportionPolicy` in that it calculates the memory usage based on jemalloc statistics,
/// which actually contains system usage other than computing tasks. This is the default memory
/// control policy.
#[derive(Default)]
pub struct StreamingOnlyPolicy {}

impl StreamingOnlyPolicy {
    /// Value of the `memory_control_policy` option that selects this policy.
    const CONFIG_STR: &'static str = "streaming-only";
    /// NOTE(review): presumably the memory usage ratio above which streaming eviction becomes
    /// aggressive; the code using it is not visible here, so confirm against `apply`.
    const STREAM_EVICTION_THRESHOLD_AGGRESSIVE: f64 = 0.9;
    /// NOTE(review): presumably the memory usage ratio above which graceful streaming eviction
    /// starts; the code using it is not visible here, so confirm against `apply`.
    const STREAM_EVICTION_THRESHOLD_GRACEFUL: f64 = 0.7;
}
Expand Down
12 changes: 8 additions & 4 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ use tokio::task::JoinHandle;
use crate::memory_management::memory_manager::{
GlobalMemoryManager, MIN_COMPUTE_MEMORY_MB, SYSTEM_RESERVED_MEMORY_MB,
};
use crate::memory_management::policy::StreamingOnlyPolicy;
use crate::memory_management::policy::memory_control_policy_from_config;
use crate::observer::observer_manager::ComputeObserverNode;
use crate::rpc::service::config_service::ConfigServiceImpl;
use crate::rpc::service::exchange_metrics::ExchangeServiceMetrics;
Expand All @@ -79,7 +79,7 @@ pub async fn compute_node_serve(
opts: ComputeNodeOpts,
) -> (Vec<JoinHandle<()>>, Sender<()>) {
// Load the configuration.
let config = load_config(&opts.config_path, Some(opts.override_config));
let config = load_config(&opts.config_path, Some(opts.override_config.clone()));

info!("Starting compute node",);
info!("> config: {:?}", config);
Expand All @@ -105,7 +105,10 @@ pub async fn compute_node_serve(
let storage_opts = Arc::new(StorageOpts::from((&config, &system_params)));

let state_store_url = {
let from_local = opts.state_store.unwrap_or("hummock+memory".to_string());
let from_local = opts
.state_store
.clone()
.unwrap_or_else(|| "hummock+memory".to_string());
system_params.state_store(from_local)
};

Expand All @@ -115,6 +118,7 @@ pub async fn compute_node_serve(
total_storage_memory_limit_bytes(&config.storage, embedded_compactor_enabled);
let compute_memory_bytes =
validate_compute_node_memory_config(opts.total_memory_bytes, storage_memory_bytes);
let memory_control_policy = memory_control_policy_from_config(&opts).unwrap();

let worker_id = meta_client.worker_id();
info!("Assigned worker node id {}", worker_id);
Expand Down Expand Up @@ -239,7 +243,7 @@ pub async fn compute_node_serve(
compute_memory_bytes,
system_params.barrier_interval_ms(),
streaming_metrics.clone(),
Box::new(StreamingOnlyPolicy {}),
memory_control_policy,
);
// Run a background memory monitor
tokio::spawn(memory_mgr.clone().run(batch_mgr_clone, stream_mgr_clone));
Expand Down
2 changes: 2 additions & 0 deletions src/risedevtool/src/service_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ pub struct ComputeNodeConfig {
pub connector_rpc_endpoint: String,

pub total_memory_bytes: usize,
pub memory_control_policy: String,
pub streaming_memory_proportion: f64,
pub parallelism: usize,
}

Expand Down
6 changes: 5 additions & 1 deletion src/risedevtool/src/task/compute_node_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ impl ComputeNodeService {
.arg("--parallelism")
.arg(&config.parallelism.to_string())
.arg("--total-memory-bytes")
.arg(&config.total_memory_bytes.to_string());
.arg(&config.total_memory_bytes.to_string())
.arg("--memory-control-policy")
.arg(&config.memory_control_policy)
.arg("--streaming-memory-proportion")
.arg(&config.streaming_memory_proportion.to_string());

let provide_jaeger = config.provide_jaeger.as_ref().unwrap();
match provide_jaeger.len() {
Expand Down

0 comments on commit a6c8c86

Please sign in to comment.