Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refractor(optimizer): replace StreamIndexScan by StreamTableScan on logical index scan #8567

Merged
merged 2 commits into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/frontend/planner_test/tests/testdata/delta_join.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
StreamMaterialize { columns: [a1, a2, b1, b2, i_a1.a._row_id(hidden), i_b1.b._row_id(hidden)], pk_columns: [i_a1.a._row_id, i_b1.b._row_id, a1], pk_conflict: "no check" }
└─StreamExchange { dist: HashShard(i_a1.a1, i_a1.a._row_id, i_b1.b._row_id) }
└─StreamDeltaJoin { type: Inner, predicate: i_a1.a1 = i_b1.b1, output: [i_a1.a1, i_a1.a2, i_b1.b1, i_b1.b2, i_a1.a._row_id, i_b1.b._row_id] }
├─StreamIndexScan { index: i_a1, columns: [i_a1.a1, i_a1.a2, i_a1.a._row_id], pk: [i_a1.a._row_id], dist: UpstreamHashShard(i_a1.a1) }
└─StreamIndexScan { index: i_b1, columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], pk: [i_b1.b._row_id], dist: UpstreamHashShard(i_b1.b1) }
├─StreamTableScan { table: i_a1, columns: [i_a1.a1, i_a1.a2, i_a1.a._row_id], pk: [i_a1.a._row_id], dist: UpstreamHashShard(i_a1.a1) }
└─StreamTableScan { table: i_b1, columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], pk: [i_b1.b._row_id], dist: UpstreamHashShard(i_b1.b1) }
- sql: |
set rw_streaming_enable_delta_join = true;
create table a (a1 int primary key, a2 int);
Expand All @@ -25,7 +25,7 @@
└─StreamExchange { dist: HashShard(a.a1, i_b1.b._row_id) }
└─StreamDeltaJoin { type: Inner, predicate: a.a1 = i_b1.b1, output: all }
├─StreamTableScan { table: a, columns: [a.a1, a.a2], pk: [a.a1], dist: UpstreamHashShard(a.a1) }
└─StreamIndexScan { index: i_b1, columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], pk: [i_b1.b._row_id], dist: UpstreamHashShard(i_b1.b1) }
└─StreamTableScan { table: i_b1, columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], pk: [i_b1.b._row_id], dist: UpstreamHashShard(i_b1.b1) }
- sql: |
set rw_streaming_enable_delta_join = true;
create table a (a1 int primary key, a2 int);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1], pk_conflict: "no check" }
└─StreamExchange { dist: HashShard(ak1.a._row_id, ak1.k1, bk1.b._row_id) }
└─StreamDeltaJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id] }
├─StreamIndexScan { index: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
└─StreamIndexScan { index: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
├─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
└─StreamTableScan { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
stream_dist_plan: |
Fragment 0
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1], pk_conflict: "no check" }
Expand All @@ -34,12 +34,12 @@
StreamExchange Hash([2, 4, 3]) from 5

Fragment 2
StreamIndexScan { index: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
Chain { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
Upstream
BatchPlanNode

Fragment 3
StreamIndexScan { index: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
Chain { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
Upstream
BatchPlanNode

Expand Down Expand Up @@ -68,7 +68,7 @@
└─StreamExchange { dist: HashShard(ak1.a._row_id, ak1.k1, bk1.b._row_id) }
└─StreamDeltaJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id] }
├─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
└─StreamIndexScan { index: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
└─StreamTableScan { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
stream_dist_plan: |
Fragment 0
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1], pk_conflict: "no check" }
Expand All @@ -86,7 +86,7 @@
BatchPlanNode

Fragment 3
StreamIndexScan { index: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
Chain { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
Upstream
BatchPlanNode

Expand Down Expand Up @@ -114,7 +114,7 @@
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1], pk_conflict: "no check" }
└─StreamExchange { dist: HashShard(ak1.a._row_id, ak1.k1, bk1.b._row_id) }
└─StreamDeltaJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id] }
├─StreamIndexScan { index: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
├─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
└─StreamTableScan { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
stream_dist_plan: |
Fragment 0
Expand All @@ -128,7 +128,7 @@
StreamExchange Hash([2, 4, 3]) from 5

Fragment 2
StreamIndexScan { index: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
Chain { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
Upstream
BatchPlanNode

Expand Down
7 changes: 0 additions & 7 deletions src/frontend/src/optimizer/plan_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,9 +524,6 @@ impl dyn PlanNode {
if let Some(stream_table_scan) = self.as_stream_table_scan() {
return stream_table_scan.adhoc_to_stream_prost();
}
if let Some(stream_index_scan) = self.as_stream_index_scan() {
return stream_index_scan.adhoc_to_stream_prost();
}
if let Some(stream_share) = self.as_stream_share() {
return stream_share.adhoc_to_stream_prost(state);
}
Expand Down Expand Up @@ -660,7 +657,6 @@ mod stream_group_topn;
mod stream_hash_agg;
mod stream_hash_join;
mod stream_hop_window;
mod stream_index_scan;
mod stream_local_simple_agg;
mod stream_materialize;
mod stream_now;
Expand Down Expand Up @@ -736,7 +732,6 @@ pub use stream_group_topn::StreamGroupTopN;
pub use stream_hash_agg::StreamHashAgg;
pub use stream_hash_join::StreamHashJoin;
pub use stream_hop_window::StreamHopWindow;
pub use stream_index_scan::StreamIndexScan;
pub use stream_local_simple_agg::StreamLocalSimpleAgg;
pub use stream_materialize::StreamMaterialize;
pub use stream_now::StreamNow;
Expand Down Expand Up @@ -835,7 +830,6 @@ macro_rules! for_all_plan_nodes {
, { Stream, TopN }
, { Stream, HopWindow }
, { Stream, DeltaJoin }
, { Stream, IndexScan }
, { Stream, Expand }
, { Stream, DynamicFilter }
, { Stream, ProjectSet }
Expand Down Expand Up @@ -936,7 +930,6 @@ macro_rules! for_stream_plan_nodes {
, { Stream, TopN }
, { Stream, HopWindow }
, { Stream, DeltaJoin }
, { Stream, IndexScan }
, { Stream, Expand }
, { Stream, DynamicFilter }
, { Stream, ProjectSet }
Expand Down
8 changes: 2 additions & 6 deletions src/frontend/src/optimizer/plan_node/stream_delta_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,13 @@ impl StreamNode for StreamDeltaJoin {
let left = self.left();
let right = self.right();

let left_table = if let Some(stream_index_scan) = left.as_stream_index_scan() {
stream_index_scan.logical()
} else if let Some(stream_table_scan) = left.as_stream_table_scan() {
let left_table = if let Some(stream_table_scan) = left.as_stream_table_scan() {
stream_table_scan.logical()
} else {
unreachable!();
};
let left_table_desc = left_table.table_desc();
let right_table = if let Some(stream_index_scan) = right.as_stream_index_scan() {
stream_index_scan.logical()
} else if let Some(stream_table_scan) = right.as_stream_table_scan() {
let right_table = if let Some(stream_table_scan) = right.as_stream_table_scan() {
stream_table_scan.logical()
} else {
unreachable!();
Expand Down
233 changes: 0 additions & 233 deletions src/frontend/src/optimizer/plan_node/stream_index_scan.rs

This file was deleted.

Loading