Skip to content

Commit

Permalink
fix(optimizer): projectSet && overAgg should call input's predicate p…
Browse files Browse the repository at this point in the history
…ush down && prune col (risingwavelabs#8588)
  • Loading branch information
st1page committed Mar 16, 2023
1 parent 14bfc62 commit 9b89bb0
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@
└─StreamHashAgg { group_key: [$expr1, $expr2, bid.supplier_id], aggs: [sum(bid.price), count] }
└─StreamExchange { dist: HashShard($expr1, $expr2, bid.supplier_id) }
└─StreamProject { exprs: [TumbleStart(bid.bidtime, '00:10:00':Interval) as $expr1, (TumbleStart(bid.bidtime, '00:10:00':Interval) + '00:10:00':Interval) as $expr2, bid.supplier_id, bid.price, bid._row_id] }
└─StreamTableScan { table: bid, columns: [bid.bidtime, bid.price, bid.item, bid.supplier_id, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
└─StreamTableScan { table: bid, columns: [bid.bidtime, bid.price, bid.supplier_id, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
- before:
- create_bid
sql: |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@
└─LogicalAgg { group_key: [t.v1, t.v2, t.v3], aggs: [count, count(1:Int32)] }
└─LogicalProject { exprs: [t.v1, t.v2, t.v3, 1:Int32] }
└─LogicalScan { table: t, columns: [t.v1, t.v2, t.v3], predicate: (t.v1 = 10:Int32) AND (t.v2 = 20:Int32) AND (t.v3 = 30:Int32) AND (t.v2 > t.v3) }
- name: filter project set transpose
- name: filter project set transpose TODO(https://github.com/risingwavelabs/risingwave/issues/8591)
sql: |
create table t(v1 int, v2 int, v3 int, arr int[]);
with cte as (select v1, v2, v3, unnest(arr) as arr_unnested from t) select * from cte where v1=10 AND v2=20 AND v3=30 AND arr_unnested=30;
Expand Down
9 changes: 7 additions & 2 deletions src/frontend/src/optimizer/plan_node/logical_over_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,14 @@ impl fmt::Display for LogicalOverAgg {
}

impl ColPrunable for LogicalOverAgg {
fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
let mapping = ColIndexMapping::with_remaining_columns(required_cols, self.schema().len());
LogicalProject::with_mapping(self.clone().into(), mapping).into()
let new_input = {
let input = self.input();
let required = (0..input.schema().len()).collect_vec();
input.prune_col(&required, ctx)
};
LogicalProject::with_mapping(self.clone_with_input(new_input).into(), mapping).into()
}
}

Expand Down
22 changes: 14 additions & 8 deletions src/frontend/src/optimizer/plan_node/logical_project_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@

use std::fmt;

use itertools::Itertools;
use risingwave_common::error::Result;

use super::{
generic, BatchProjectSet, ColPrunable, ExprRewritable, LogicalFilter, LogicalProject, PlanBase,
PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamProjectSet, ToBatch, ToStream,
gen_filter_and_pushdown, generic, BatchProjectSet, ColPrunable, ExprRewritable, LogicalProject,
PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamProjectSet, ToBatch, ToStream,
};
use crate::expr::{Expr, ExprImpl, ExprRewriter, FunctionCall, InputRef, TableFunction};
use crate::optimizer::plan_node::{
Expand Down Expand Up @@ -237,10 +238,15 @@ impl fmt::Display for LogicalProjectSet {
}

impl ColPrunable for LogicalProjectSet {
fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
// TODO: column pruning for ProjectSet
fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
// TODO: column pruning for ProjectSet https://github.com/risingwavelabs/risingwave/issues/8593
let mapping = ColIndexMapping::with_remaining_columns(required_cols, self.schema().len());
LogicalProject::with_mapping(self.clone().into(), mapping).into()
let new_input = {
let input = self.input();
let required = (0..input.schema().len()).collect_vec();
input.prune_col(&required, ctx)
};
LogicalProject::with_mapping(self.clone_with_input(new_input).into(), mapping).into()
}
}

Expand All @@ -264,10 +270,10 @@ impl PredicatePushdown for LogicalProjectSet {
fn predicate_pushdown(
&self,
predicate: Condition,
_ctx: &mut PredicatePushdownContext,
ctx: &mut PredicatePushdownContext,
) -> PlanRef {
// TODO: predicate pushdown for ProjectSet
LogicalFilter::create(self.clone().into(), predicate)
// TODO: predicate pushdown https://github.com/risingwavelabs/risingwave/issues/8591
gen_filter_and_pushdown(self, predicate, Condition::true_cond(), ctx)
}
}

Expand Down

0 comments on commit 9b89bb0

Please sign in to comment.