Skip to content

Commit

Permalink
support batch_parallelism
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Mar 15, 2023
1 parent f907452 commit 0a04441
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 1 deletion.
19 changes: 19 additions & 0 deletions e2e_test/batch/basic/join.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,25 @@ select * from t1 join t2 using(v1) join t3 using(v2);
----
2 1 3 3

statement ok
set batch_parallelism = 1;

query IIIIII
select * from t1 join t2 using(v1) join t3 using(v2);
----
2 1 3 3

statement ok
set batch_parallelism = 1000;

query IIIIII
select * from t1 join t2 using(v1) join t3 using(v2);
----
2 1 3 3

statement ok
set batch_parallelism = 0;

statement ok
create index i1 on t1(v1) include(v2);

Expand Down
24 changes: 23 additions & 1 deletion src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod search_path;
mod transaction_isolation_level;
mod visibility_mode;

use std::num::NonZeroU64;
use std::ops::Deref;

use chrono_tz::Tz;
Expand All @@ -32,7 +33,7 @@ use crate::util::epoch::Epoch;

// This is a hack, &'static str is not allowed as a const generics argument.
// TODO: refine this using the adt_const_params feature.
const CONFIG_KEYS: [&str; 20] = [
const CONFIG_KEYS: [&str; 21] = [
"RW_IMPLICIT_FLUSH",
"CREATE_COMPACTION_GROUP_FOR_MV",
"QUERY_MODE",
Expand All @@ -53,6 +54,7 @@ const CONFIG_KEYS: [&str; 20] = [
"RW_FORCE_TWO_PHASE_AGG",
"RW_ENABLE_SHARE_PLAN",
"INTERVALSTYLE",
"BATCH_PARALLELISM",
];

// MUST HAVE 1v1 relationship to CONFIG_KEYS. e.g. CONFIG_KEYS[IMPLICIT_FLUSH] =
Expand All @@ -77,6 +79,7 @@ const ENABLE_TWO_PHASE_AGG: usize = 16;
const FORCE_TWO_PHASE_AGG: usize = 17;
const RW_ENABLE_SHARE_PLAN: usize = 18;
const INTERVAL_STYLE: usize = 19;
const BATCH_PARALLELISM: usize = 20;

trait ConfigEntry: Default + for<'a> TryFrom<&'a [&'a str], Error = RwError> {
fn entry_name() -> &'static str;
Expand Down Expand Up @@ -277,6 +280,7 @@ type EnableTwoPhaseAgg = ConfigBool<ENABLE_TWO_PHASE_AGG, true>;
type ForceTwoPhaseAgg = ConfigBool<FORCE_TWO_PHASE_AGG, false>;
type EnableSharePlan = ConfigBool<RW_ENABLE_SHARE_PLAN, true>;
type IntervalStyle = ConfigString<INTERVAL_STYLE>;
type BatchParallelism = ConfigU64<BATCH_PARALLELISM, 0>;

#[derive(Derivative)]
#[derivative(Default)]
Expand Down Expand Up @@ -353,6 +357,8 @@ pub struct ConfigMap {

/// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-INTERVALSTYLE>
interval_style: IntervalStyle,

batch_parallelism: BatchParallelism,
}

impl ConfigMap {
Expand Down Expand Up @@ -408,6 +414,8 @@ impl ConfigMap {
self.enable_share_plan = val.as_slice().try_into()?;
} else if key.eq_ignore_ascii_case(IntervalStyle::entry_name()) {
self.interval_style = val.as_slice().try_into()?;
} else if key.eq_ignore_ascii_case(BatchParallelism::entry_name()) {
self.batch_parallelism = val.as_slice().try_into()?;
} else {
return Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into());
}
Expand Down Expand Up @@ -456,6 +464,8 @@ impl ConfigMap {
Ok(self.enable_share_plan.to_string())
} else if key.eq_ignore_ascii_case(IntervalStyle::entry_name()) {
Ok(self.interval_style.to_string())
} else if key.eq_ignore_ascii_case(BatchParallelism::entry_name()) {
Ok(self.batch_parallelism.to_string())
} else {
Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into())
}
Expand Down Expand Up @@ -558,6 +568,11 @@ impl ConfigMap {
setting : self.interval_style.to_string(),
description : String::from("It is typically set by an application upon connection to the server.")
},
VariableInfo{
name : BatchParallelism::entry_name().to_lowercase(),
setting : self.batch_parallelism.to_string(),
description: String::from("Sets the parallelism for batch. If 0, use default value.")
},
]
}

Expand Down Expand Up @@ -646,4 +661,11 @@ impl ConfigMap {
pub fn get_interval_style(&self) -> &str {
&self.interval_style
}

pub fn get_batch_parallelism(&self) -> Option<NonZeroU64> {
if self.batch_parallelism.0 != 0 {
return Some(NonZeroU64::new(self.batch_parallelism.0).unwrap());
}
None
}
}
1 change: 1 addition & 0 deletions src/frontend/src/handler/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ pub async fn handle_explain(
plan_fragmenter = Some(BatchPlanFragmenter::new(
session.env().worker_node_manager_ref(),
session.env().catalog_reader().clone(),
session.config().get_batch_parallelism(),
plan,
)?);
}
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/handler/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ pub async fn handle_query(
let plan_fragmenter = BatchPlanFragmenter::new(
session.env().worker_node_manager_ref(),
session.env().catalog_reader().clone(),
session.config().get_batch_parallelism(),
plan,
)?;
context.append_notice(&mut notice);
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/scheduler/distributed/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ pub(crate) mod tests {
let fragmenter = BatchPlanFragmenter::new(
worker_node_manager,
catalog_reader,
None,
batch_exchange_node.clone(),
)
.unwrap();
Expand Down
15 changes: 15 additions & 0 deletions src/frontend/src/scheduler/plan_fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::min;
use std::collections::{HashMap, HashSet};
use std::fmt::{Debug, Formatter};
use std::num::NonZeroU64;
use std::sync::Arc;

use anyhow::anyhow;
Expand Down Expand Up @@ -120,6 +122,12 @@ pub struct BatchPlanFragmenter {
worker_node_manager: WorkerNodeManagerRef,
catalog_reader: CatalogReader,

/// if batch_parallelism is None, it means no limit, we will use the available nodes count as
/// parallelism.
/// if batch_parallelism is Some(num), we will use the min(num, the available
/// nodes count) as parallelism.
batch_parallelism: Option<NonZeroU64>,

stage_graph_builder: Option<StageGraphBuilder>,
stage_graph: Option<StageGraph>,
}
Expand All @@ -136,6 +144,7 @@ impl BatchPlanFragmenter {
pub fn new(
worker_node_manager: WorkerNodeManagerRef,
catalog_reader: CatalogReader,
batch_parallelism: Option<NonZeroU64>,
batch_node: PlanRef,
) -> SchedulerResult<Self> {
let mut plan_fragmenter = Self {
Expand All @@ -144,6 +153,7 @@ impl BatchPlanFragmenter {
next_stage_id: 0,
worker_node_manager,
catalog_reader,
batch_parallelism,
stage_graph: None,
};
plan_fragmenter.split_into_stage(batch_node)?;
Expand Down Expand Up @@ -751,6 +761,11 @@ impl BatchPlanFragmenter {
lookup_join_parallelism
} else if source_info.is_some() {
0
} else if let Some(num) = self.batch_parallelism {
min(
num.get() as usize,
self.worker_node_manager.worker_node_count(),
)
} else {
self.worker_node_manager.worker_node_count()
}
Expand Down

0 comments on commit 0a04441

Please sign in to comment.