diff --git a/e2e_test/batch/basic/join.slt.part b/e2e_test/batch/basic/join.slt.part index 1f7daba0f46c5..feeb793ba9e36 100644 --- a/e2e_test/batch/basic/join.slt.part +++ b/e2e_test/batch/basic/join.slt.part @@ -32,6 +32,25 @@ select * from t1 join t2 using(v1) join t3 using(v2); ---- 2 1 3 3 +statement ok +set batch_parallelism = 1; + +query IIIIII +select * from t1 join t2 using(v1) join t3 using(v2); +---- +2 1 3 3 + +statement ok +set batch_parallelism = 1000; + +query IIIIII +select * from t1 join t2 using(v1) join t3 using(v2); +---- +2 1 3 3 + +statement ok +set batch_parallelism = 0; + statement ok create index i1 on t1(v1) include(v2); diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index 2265e9317499a..d40cb6ae30627 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -17,6 +17,7 @@ mod search_path; mod transaction_isolation_level; mod visibility_mode; +use std::num::NonZeroU64; use std::ops::Deref; use chrono_tz::Tz; @@ -32,7 +33,7 @@ use crate::util::epoch::Epoch; // This is a hack, &'static str is not allowed as a const generics argument. // TODO: refine this using the adt_const_params feature. -const CONFIG_KEYS: [&str; 20] = [ +const CONFIG_KEYS: [&str; 21] = [ "RW_IMPLICIT_FLUSH", "CREATE_COMPACTION_GROUP_FOR_MV", "QUERY_MODE", @@ -53,6 +54,7 @@ const CONFIG_KEYS: [&str; 20] = [ "RW_FORCE_TWO_PHASE_AGG", "RW_ENABLE_SHARE_PLAN", "INTERVALSTYLE", + "BATCH_PARALLELISM", ]; // MUST HAVE 1v1 relationship to CONFIG_KEYS. e.g. CONFIG_KEYS[IMPLICIT_FLUSH] = @@ -77,6 +79,7 @@ const ENABLE_TWO_PHASE_AGG: usize = 16; const FORCE_TWO_PHASE_AGG: usize = 17; const RW_ENABLE_SHARE_PLAN: usize = 18; const INTERVAL_STYLE: usize = 19; +const BATCH_PARALLELISM: usize = 20; trait ConfigEntry: Default + for<'a> TryFrom<&'a [&'a str], Error = RwError> { fn entry_name() -> &'static str; @@ -277,6 +280,7 @@ type EnableTwoPhaseAgg = ConfigBool; type ForceTwoPhaseAgg = ConfigBool; type EnableSharePlan = ConfigBool; type IntervalStyle = ConfigString; +type BatchParallelism = ConfigU64; #[derive(Derivative)] #[derivative(Default)] @@ -353,6 +357,8 @@ pub struct ConfigMap { /// see interval_style: IntervalStyle, + + batch_parallelism: BatchParallelism, } impl ConfigMap { @@ -408,6 +414,8 @@ impl ConfigMap { self.enable_share_plan = val.as_slice().try_into()?; } else if key.eq_ignore_ascii_case(IntervalStyle::entry_name()) { self.interval_style = val.as_slice().try_into()?; + } else if key.eq_ignore_ascii_case(BatchParallelism::entry_name()) { + self.batch_parallelism = val.as_slice().try_into()?; } else { return Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into()); } @@ -456,6 +464,8 @@ impl ConfigMap { Ok(self.enable_share_plan.to_string()) } else if key.eq_ignore_ascii_case(IntervalStyle::entry_name()) { Ok(self.interval_style.to_string()) + } else if key.eq_ignore_ascii_case(BatchParallelism::entry_name()) { + Ok(self.batch_parallelism.to_string()) } else { Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into()) } @@ -558,6 +568,11 @@ impl ConfigMap { setting : self.interval_style.to_string(), description : String::from("It is typically set by an application upon connection to the server.") }, + VariableInfo{ + name : BatchParallelism::entry_name().to_lowercase(), + setting : self.batch_parallelism.to_string(), + description: String::from("Sets the parallelism for batch. If 0, use default value.") + }, ] } @@ -646,4 +661,11 @@ impl ConfigMap { pub fn get_interval_style(&self) -> &str { &self.interval_style } + + pub fn get_batch_parallelism(&self) -> Option { + if self.batch_parallelism.0 != 0 { + return Some(NonZeroU64::new(self.batch_parallelism.0).unwrap()); + } + None + } } diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index 5375a128b3749..c33e52c86d0e2 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -145,6 +145,7 @@ pub async fn handle_explain( plan_fragmenter = Some(BatchPlanFragmenter::new( session.env().worker_node_manager_ref(), session.env().catalog_reader().clone(), + session.config().get_batch_parallelism(), plan, )?); } diff --git a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs index 04bc203846667..4d437b304515b 100644 --- a/src/frontend/src/handler/query.rs +++ b/src/frontend/src/handler/query.rs @@ -162,6 +162,7 @@ pub async fn handle_query( let plan_fragmenter = BatchPlanFragmenter::new( session.env().worker_node_manager_ref(), session.env().catalog_reader().clone(), + session.config().get_batch_parallelism(), plan, )?; context.append_notice(&mut notice); diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index 4689f2bdd8748..e4aa85c08a7eb 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -660,6 +660,7 @@ pub(crate) mod tests { let fragmenter = BatchPlanFragmenter::new( worker_node_manager, catalog_reader, + None, batch_exchange_node.clone(), ) .unwrap(); diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 26f27d912d0ac..b9fddbd7103a4 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::min; use std::collections::{HashMap, HashSet}; use std::fmt::{Debug, Formatter}; +use std::num::NonZeroU64; use std::sync::Arc; use anyhow::anyhow; @@ -120,6 +122,12 @@ pub struct BatchPlanFragmenter { worker_node_manager: WorkerNodeManagerRef, catalog_reader: CatalogReader, + /// if batch_parallelism is None, it means no limit, we will use the available nodes count as + /// parallelism. + /// if batch_parallelism is Some(num), we will use the min(num, the available + /// nodes count) as parallelism. + batch_parallelism: Option, + stage_graph_builder: Option, stage_graph: Option, } @@ -136,6 +144,7 @@ impl BatchPlanFragmenter { pub fn new( worker_node_manager: WorkerNodeManagerRef, catalog_reader: CatalogReader, + batch_parallelism: Option, batch_node: PlanRef, ) -> SchedulerResult { let mut plan_fragmenter = Self { @@ -144,6 +153,7 @@ impl BatchPlanFragmenter { next_stage_id: 0, worker_node_manager, catalog_reader, + batch_parallelism, stage_graph: None, }; plan_fragmenter.split_into_stage(batch_node)?; @@ -751,6 +761,11 @@ impl BatchPlanFragmenter { lookup_join_parallelism } else if source_info.is_some() { 0 + } else if let Some(num) = self.batch_parallelism { + min( + num.get() as usize, + self.worker_node_manager.worker_node_count(), + ) } else { self.worker_node_manager.worker_node_count() }