Skip to content

Commit

Permalink
feat(batch): Support index selection for sort aggregation with a desc…
Browse files Browse the repository at this point in the history
…ending ordering (risingwavelabs#8515)
  • Loading branch information
chenzl25 committed Mar 14, 2023
1 parent 97b021d commit 9564db0
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 176 deletions.
20 changes: 20 additions & 0 deletions src/frontend/planner_test/tests/testdata/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1230,3 +1230,23 @@
└─StreamHashAgg { group_key: [idx.col1, $expr1], aggs: [count] }
└─StreamProject { exprs: [idx.col1, idx.id, Vnode(idx.id) as $expr1] }
└─StreamTableScan { table: idx, columns: [idx.col1, idx.id], pk: [idx.id], dist: UpstreamHashShard(idx.id) }
- name: sort agg on an ascending index
sql: |
create table t (a int, b int);
create index idx_asc on t(a asc);
create index idx_desc on t(a desc);
select a, count(*) cnt from t group by a order by a asc;
batch_plan: |
BatchExchange { order: [idx_asc.a ASC], dist: Single }
└─BatchSortAgg { group_key: [idx_asc.a], aggs: [count] }
└─BatchScan { table: idx_asc, columns: [idx_asc.a], distribution: UpstreamHashShard(idx_asc.a) }
- name: sort agg on a descending index
sql: |
create table t (a int, b int);
create index idx_asc on t(a asc);
create index idx_desc on t(a desc);
select a, count(*) cnt from t group by a order by a desc;
batch_plan: |
BatchExchange { order: [idx_desc.a DESC], dist: Single }
└─BatchSortAgg { group_key: [idx_desc.a], aggs: [count] }
└─BatchScan { table: idx_desc, columns: [idx_desc.a], distribution: UpstreamHashShard(idx_desc.a) }
6 changes: 4 additions & 2 deletions src/frontend/src/catalog/index_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,14 @@ impl IndexCatalog {
self.index_table.columns.len() == self.primary_table.columns.len()
}

/// Returns the mapping from a column index in the secondary index to the
/// corresponding column index in the primary table.
pub fn secondary_to_primary_mapping(&self) -> &BTreeMap<usize, usize> {
&self.secondary_to_primary_mapping
}

/// Returns the mapping from a column index in the primary table to the
/// corresponding column index in the secondary index.
pub fn primary_to_secondary_mapping(&self) -> &BTreeMap<usize, usize> {
&self.primary_to_secondary_mapping
}
Expand Down
91 changes: 66 additions & 25 deletions src/frontend/src/optimizer/plan_node/logical_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_common::catalog::{ColumnDesc, Field, Schema, TableDesc};
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::sort_util::ColumnOrder;

use super::generic::{GenericPlanNode, GenericPlanRef};
use super::{
Expand Down Expand Up @@ -267,6 +267,65 @@ impl LogicalScan {
self.i2o_col_mapping().rewrite_bitset(watermark_columns)
}

/// Returns every index of this scan's table whose primary-key ordering
/// satisfies `required_order` when expressed over the scan's output columns.
pub fn indexes_satisfy_order(&self, required_order: &Order) -> Vec<&Rc<IndexCatalog>> {
    // Position of each primary-table column within the scan's output row.
    let output_col_map: BTreeMap<usize, usize> = self
        .output_col_idx()
        .iter()
        .enumerate()
        .map(|(pos, col)| (*col, pos))
        .collect();
    // Sentinel for index key columns that the scan does not output; it lies
    // outside every valid output position, so it can never satisfy a
    // required order column.
    let unmatched_idx = output_col_map.len();
    self.indexes()
        .iter()
        .filter(|index| {
            let s2p_mapping = index.secondary_to_primary_mapping();
            // Translate the index's sort key into output-column positions,
            // keeping each key column's own order type (ASC or DESC).
            let index_order = Order {
                column_orders: index
                    .index_table
                    .pk()
                    .iter()
                    .map(|pk_col| {
                        let primary_col = s2p_mapping
                            .get(&pk_col.column_index)
                            .expect("should be in s2p mapping");
                        let output_pos = output_col_map
                            .get(primary_col)
                            .copied()
                            .unwrap_or(unmatched_idx);
                        ColumnOrder::new(output_pos, pk_col.order_type)
                    })
                    .collect(),
            };
            index_order.satisfies(required_order)
        })
        .collect()
}

/// Converts this scan into a scan over `index`, but only when the index
/// covers every column the scan requires; returns `None` otherwise.
pub fn to_index_scan_if_index_covered(&self, index: &Rc<IndexCatalog>) -> Option<LogicalScan> {
    let p2s_mapping = index.primary_to_secondary_mapping();
    // The index is usable only if each required primary-table column has a
    // counterpart in the index.
    let covered = self
        .required_col_idx()
        .iter()
        .all(|col| p2s_mapping.contains_key(col));
    covered.then(|| {
        self.to_index_scan(
            &index.name,
            index.index_table.table_desc().into(),
            p2s_mapping,
        )
    })
}

/// Prerequisite: the caller should guarantee that `primary_to_secondary_mapping` must cover the
/// scan.
pub fn to_index_scan(
&self,
index_name: &str,
Expand Down Expand Up @@ -581,32 +640,14 @@ impl LogicalScan {
return None;
}

let index = self.indexes().iter().find(|idx| {
Order {
column_orders: idx
.index_item
.iter()
.map(|idx_item| ColumnOrder::new(idx_item.index, OrderType::ascending()))
.collect(),
let order_satisfied_index = self.indexes_satisfy_order(required_order);
for index in order_satisfied_index {
if let Some(index_scan) = self.to_index_scan_if_index_covered(index) {
return Some(index_scan.to_batch());
}
.satisfies(required_order)
})?;

let p2s_mapping = index.primary_to_secondary_mapping();
if self
.required_col_idx()
.iter()
.all(|x| p2s_mapping.contains_key(x))
{
let index_scan = self.to_index_scan(
&index.name,
index.index_table.table_desc().into(),
p2s_mapping,
);
Some(index_scan.to_batch())
} else {
None
}

None
}
}

Expand Down
136 changes: 49 additions & 87 deletions src/frontend/src/optimizer/rule/min_max_on_index_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,6 @@ impl Rule for MinMaxOnIndexRule {
if !logical_scan.predicate().always_true() {
return None;
}
let output_col_map = logical_scan
.output_col_idx()
.iter()
.cloned()
.enumerate()
.map(|(id, col)| (col, id))
.collect::<BTreeMap<_, _>>();
let order = Order {
column_orders: vec![ColumnOrder::new(
calls.first()?.inputs.first()?.index(),
Expand All @@ -74,12 +67,10 @@ impl Rule for MinMaxOnIndexRule {
},
)],
};
if let Some(p) =
self.try_on_index(logical_agg, logical_scan.clone(), &order, &output_col_map)
{
if let Some(p) = self.try_on_index(logical_agg, logical_scan.clone(), &order) {
Some(p)
} else {
self.try_on_pk(logical_agg, logical_scan, &order, &output_col_map)
self.try_on_pk(logical_agg, logical_scan, &order)
}
} else {
None
Expand All @@ -96,93 +87,64 @@ impl MinMaxOnIndexRule {
&self,
logical_agg: &LogicalAgg,
logical_scan: LogicalScan,
order: &Order,
output_col_map: &BTreeMap<usize, usize>,
required_order: &Order,
) -> Option<PlanRef> {
let unmatched_idx = output_col_map.len();
let index = logical_scan.indexes().iter().find(|idx| {
let s2p_mapping = idx.secondary_to_primary_mapping();
Order {
column_orders: idx
.index_table
.pk()
.iter()
.map(|idx_item| {
ColumnOrder::new(
*output_col_map
.get(
s2p_mapping
.get(&idx_item.column_index)
.expect("should be in s2p mapping"),
)
.unwrap_or(&unmatched_idx),
idx_item.order_type,
)
})
.collect(),
let order_satisfied_index = logical_scan.indexes_satisfy_order(required_order);
for index in order_satisfied_index {
if let Some(index_scan) = logical_scan.to_index_scan_if_index_covered(index) {
let non_null_filter = LogicalFilter::create_with_expr(
index_scan.into(),
FunctionCall::new_unchecked(
ExprType::IsNotNull,
vec![ExprImpl::InputRef(Box::new(InputRef::new(
0,
logical_agg.schema().fields[0].data_type.clone(),
)))],
DataType::Boolean,
)
.into(),
);

let limit = LogicalLimit::create(non_null_filter, 1, 0);

let formatting_agg = LogicalAgg::new(
vec![PlanAggCall {
agg_kind: logical_agg.agg_calls().first()?.agg_kind,
return_type: logical_agg.schema().fields[0].data_type.clone(),
inputs: vec![InputRef::new(
0,
logical_agg.schema().fields[0].data_type.clone(),
)],
order_by: vec![],
distinct: false,
filter: Condition {
conjunctions: vec![],
},
}],
vec![],
limit,
);

return Some(formatting_agg.into());
}
.satisfies(order)
})?;

let p2s_mapping = index.primary_to_secondary_mapping();
}

let index_scan = if logical_scan
.required_col_idx()
.iter()
.all(|x| p2s_mapping.contains_key(x))
{
Some(logical_scan.to_index_scan(
&index.name,
index.index_table.table_desc().into(),
p2s_mapping,
))
} else {
None
}?;

let non_null_filter = LogicalFilter::create_with_expr(
index_scan.into(),
FunctionCall::new_unchecked(
ExprType::IsNotNull,
vec![ExprImpl::InputRef(Box::new(InputRef::new(
0,
logical_agg.schema().fields[0].data_type.clone(),
)))],
DataType::Boolean,
)
.into(),
);

let limit = LogicalLimit::create(non_null_filter, 1, 0);

let formatting_agg = LogicalAgg::new(
vec![PlanAggCall {
agg_kind: logical_agg.agg_calls().first()?.agg_kind,
return_type: logical_agg.schema().fields[0].data_type.clone(),
inputs: vec![InputRef::new(
0,
logical_agg.schema().fields[0].data_type.clone(),
)],
order_by: vec![],
distinct: false,
filter: Condition {
conjunctions: vec![],
},
}],
vec![],
limit,
);

Some(formatting_agg.into())
None
}

fn try_on_pk(
&self,
logical_agg: &LogicalAgg,
logical_scan: LogicalScan,
order: &Order,
output_col_map: &BTreeMap<usize, usize>,
) -> Option<PlanRef> {
let output_col_map = logical_scan
.output_col_idx()
.iter()
.cloned()
.enumerate()
.map(|(id, col)| (col, id))
.collect::<BTreeMap<_, _>>();
let unmatched_idx = output_col_map.len();
let primary_key = logical_scan.primary_key();
let primary_key_order = Order {
Expand Down
Loading

0 comments on commit 9564db0

Please sign in to comment.