From b2296056bb996d9bfdbecb4e8f41655ef8d31e76 Mon Sep 17 00:00:00 2001 From: Clearlove Date: Wed, 15 Mar 2023 08:35:26 -0400 Subject: [PATCH] replace StreamIndexScan by StreamTableScan Signed-off-by: Clearlove --- .../tests/testdata/delta_join.yaml | 6 +- .../tests/testdata/distribution_derive.yaml | 16 +- src/frontend/src/optimizer/plan_node/mod.rs | 7 - .../optimizer/plan_node/stream_delta_join.rs | 8 +- .../optimizer/plan_node/stream_index_scan.rs | 225 ------------------ .../optimizer/plan_node/stream_table_scan.rs | 16 +- 6 files changed, 21 insertions(+), 257 deletions(-) delete mode 100644 src/frontend/src/optimizer/plan_node/stream_index_scan.rs diff --git a/src/frontend/planner_test/tests/testdata/delta_join.yaml b/src/frontend/planner_test/tests/testdata/delta_join.yaml index 3ec13cd49063..8e13a1359fc0 100644 --- a/src/frontend/planner_test/tests/testdata/delta_join.yaml +++ b/src/frontend/planner_test/tests/testdata/delta_join.yaml @@ -11,8 +11,8 @@ StreamMaterialize { columns: [a1, a2, b1, b2, i_a1.a._row_id(hidden), i_b1.b._row_id(hidden)], pk_columns: [i_a1.a._row_id, i_b1.b._row_id, a1], pk_conflict: "no check" } └─StreamExchange { dist: HashShard(i_a1.a1, i_a1.a._row_id, i_b1.b._row_id) } └─StreamDeltaJoin { type: Inner, predicate: i_a1.a1 = i_b1.b1, output: [i_a1.a1, i_a1.a2, i_b1.b1, i_b1.b2, i_a1.a._row_id, i_b1.b._row_id] } - ├─StreamIndexScan { index: i_a1, columns: [i_a1.a1, i_a1.a2, i_a1.a._row_id], pk: [i_a1.a._row_id], dist: UpstreamHashShard(i_a1.a1) } - └─StreamIndexScan { index: i_b1, columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], pk: [i_b1.b._row_id], dist: UpstreamHashShard(i_b1.b1) } + ├─StreamTableScan { table: i_a1, columns: [i_a1.a1, i_a1.a2, i_a1.a._row_id], pk: [i_a1.a._row_id], dist: UpstreamHashShard(i_a1.a1) } + └─StreamTableScan { table: i_b1, columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], pk: [i_b1.b._row_id], dist: UpstreamHashShard(i_b1.b1) } - sql: | set rw_streaming_enable_delta_join = true; create table a (a1 int primary key, a2 int); @@ -25,7 +25,7 @@
└─StreamExchange { dist: HashShard(a.a1, i_b1.b._row_id) } └─StreamDeltaJoin { type: Inner, predicate: a.a1 = i_b1.b1, output: all } ├─StreamTableScan { table: a, columns: [a.a1, a.a2], pk: [a.a1], dist: UpstreamHashShard(a.a1) } - └─StreamIndexScan { index: i_b1, columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], pk: [i_b1.b._row_id], dist: UpstreamHashShard(i_b1.b1) } + └─StreamTableScan { table: i_b1, columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], pk: [i_b1.b._row_id], dist: UpstreamHashShard(i_b1.b1) } - sql: | set rw_streaming_enable_delta_join = true; create table a (a1 int primary key, a2 int); diff --git a/src/frontend/planner_test/tests/testdata/distribution_derive.yaml b/src/frontend/planner_test/tests/testdata/distribution_derive.yaml index ed77a351ab24..45da95af2c17 100644 --- a/src/frontend/planner_test/tests/testdata/distribution_derive.yaml +++ b/src/frontend/planner_test/tests/testdata/distribution_derive.yaml @@ -20,8 +20,8 @@ StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1], pk_conflict: "no check" } └─StreamExchange { dist: HashShard(ak1.a._row_id, ak1.k1, bk1.b._row_id) } └─StreamDeltaJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id] } - ├─StreamIndexScan { index: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) } - └─StreamIndexScan { index: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) } + ├─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) } + └─StreamTableScan { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) } stream_dist_plan: | Fragment 0 StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden)], pk_columns: 
[ak1.a._row_id, bk1.b._row_id, ak1.k1], pk_conflict: "no check" } @@ -34,12 +34,12 @@ StreamExchange Hash([2, 4, 3]) from 5 Fragment 2 - StreamIndexScan { index: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) } + Chain { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) } Upstream BatchPlanNode Fragment 3 - StreamIndexScan { index: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) } + Chain { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) } Upstream BatchPlanNode @@ -68,7 +68,7 @@ └─StreamExchange { dist: HashShard(ak1.a._row_id, ak1.k1, bk1.b._row_id) } └─StreamDeltaJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id] } ├─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) } - └─StreamIndexScan { index: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) } + └─StreamTableScan { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) } stream_dist_plan: | Fragment 0 StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1], pk_conflict: "no check" } @@ -86,7 +86,7 @@ BatchPlanNode Fragment 3 - StreamIndexScan { index: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) } + Chain { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) } Upstream BatchPlanNode @@ -114,7 +114,7 @@ StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1], pk_conflict: "no 
check" } └─StreamExchange { dist: HashShard(ak1.a._row_id, ak1.k1, bk1.b._row_id) } └─StreamDeltaJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id] } - ├─StreamIndexScan { index: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) } + ├─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) } └─StreamTableScan { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) } stream_dist_plan: | Fragment 0 @@ -128,7 +128,7 @@ StreamExchange Hash([2, 4, 3]) from 5 Fragment 2 - StreamIndexScan { index: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) } + Chain { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) } Upstream BatchPlanNode diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index 4e391e7c26ff..b363ba29919f 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -520,9 +520,6 @@ impl dyn PlanNode { if let Some(stream_table_scan) = self.as_stream_table_scan() { return stream_table_scan.adhoc_to_stream_prost(); } - if let Some(stream_index_scan) = self.as_stream_index_scan() { - return stream_index_scan.adhoc_to_stream_prost(); - } if let Some(stream_share) = self.as_stream_share() { return stream_share.adhoc_to_stream_prost(state); } @@ -656,7 +653,6 @@ mod stream_group_topn; mod stream_hash_agg; mod stream_hash_join; mod stream_hop_window; -mod stream_index_scan; mod stream_local_simple_agg; mod stream_materialize; mod stream_now; @@ -731,7 +727,6 @@ pub use stream_group_topn::StreamGroupTopN; pub use stream_hash_agg::StreamHashAgg; pub use stream_hash_join::StreamHashJoin; pub use stream_hop_window::StreamHopWindow; -pub use 
stream_index_scan::StreamIndexScan; pub use stream_local_simple_agg::StreamLocalSimpleAgg; pub use stream_materialize::StreamMaterialize; pub use stream_now::StreamNow; @@ -829,7 +824,6 @@ macro_rules! for_all_plan_nodes { , { Stream, TopN } , { Stream, HopWindow } , { Stream, DeltaJoin } - , { Stream, IndexScan } , { Stream, Expand } , { Stream, DynamicFilter } , { Stream, ProjectSet } @@ -929,7 +923,6 @@ macro_rules! for_stream_plan_nodes { , { Stream, TopN } , { Stream, HopWindow } , { Stream, DeltaJoin } - , { Stream, IndexScan } , { Stream, Expand } , { Stream, DynamicFilter } , { Stream, ProjectSet } diff --git a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs index 698ae0c3346d..667f0f1428b0 100644 --- a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs @@ -156,17 +156,13 @@ impl StreamNode for StreamDeltaJoin { let left = self.left(); let right = self.right(); - let left_table = if let Some(stream_index_scan) = left.as_stream_index_scan() { - stream_index_scan.logical() - } else if let Some(stream_table_scan) = left.as_stream_table_scan() { + let left_table = if let Some(stream_table_scan) = left.as_stream_table_scan() { stream_table_scan.logical() } else { unreachable!(); }; let left_table_desc = left_table.table_desc(); - let right_table = if let Some(stream_index_scan) = right.as_stream_index_scan() { - stream_index_scan.logical() - } else if let Some(stream_table_scan) = right.as_stream_table_scan() { + let right_table = if let Some(stream_table_scan) = right.as_stream_table_scan() { stream_table_scan.logical() } else { unreachable!(); diff --git a/src/frontend/src/optimizer/plan_node/stream_index_scan.rs b/src/frontend/src/optimizer/plan_node/stream_index_scan.rs deleted file mode 100644 index a16551e66d1e..000000000000 --- a/src/frontend/src/optimizer/plan_node/stream_index_scan.rs +++ /dev/null @@ -1,225 +0,0 
@@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::fmt; - -use itertools::Itertools; -use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; -use risingwave_pb::stream_plan::{ChainType, StreamNode as ProstStreamPlan}; - -use super::{ExprRewritable, LogicalScan, PlanBase, PlanNodeId, PlanRef, StreamNode}; -use crate::catalog::ColumnId; -use crate::expr::ExprRewriter; -use crate::optimizer::plan_node::utils::IndicesDisplay; -use crate::optimizer::property::{Distribution, DistributionDisplay}; -use crate::stream_fragmenter::BuildFragmentGraphState; - -/// `StreamIndexScan` is a virtual plan node to represent a stream table scan. It will be converted -/// to chain + merge node (for upstream materialize) + batch table scan when converting to `MView` -/// creation request. Compared with `StreamTableScan`, it will reorder columns, and the chain node -/// doesn't allow rearrange. 
-#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct StreamIndexScan { - pub base: PlanBase, - logical: LogicalScan, - batch_plan_id: PlanNodeId, - chain_type: ChainType, -} - -impl StreamIndexScan { - pub fn new(logical: LogicalScan, chain_type: ChainType) -> Self { - let ctx = logical.base.ctx.clone(); - - let distribution = { - let distribution_key = logical - .distribution_key() - .expect("distribution key of stream chain must exist in output columns"); - if distribution_key.is_empty() { - Distribution::Single - } else { - // See also `BatchSeqScan::clone_with_dist`. - Distribution::UpstreamHashShard(distribution_key, logical.table_desc().table_id) - } - }; - - let batch_plan_id = ctx.next_plan_node_id(); - // TODO: derive from input - let base = PlanBase::new_stream( - ctx, - logical.schema().clone(), - logical.base.logical_pk.clone(), - logical.functional_dependency().clone(), - distribution, - false, // TODO: determine the `append-only` field of table scan - logical.watermark_columns(), - ); - Self { - base, - logical, - batch_plan_id, - chain_type, - } - } - - pub fn table_name(&self) -> &str { - self.logical.table_name() - } - - pub fn logical(&self) -> &LogicalScan { - &self.logical - } -} - -impl_plan_tree_node_for_leaf! 
{ StreamIndexScan } - -impl fmt::Display for StreamIndexScan { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let verbose = self.base.ctx.is_explain_verbose(); - let mut builder = f.debug_struct("StreamIndexScan"); - - let v = match verbose { - false => self.logical.column_names(), - true => self.logical.column_names_with_table_prefix(), - } - .join(", "); - builder - .field("index", &format_args!("{}", self.logical.table_name())) - .field("columns", &format_args!("[{}]", v)); - - if verbose { - builder.field( - "pk", - &IndicesDisplay { - indices: self.logical_pk(), - input_schema: &self.base.schema, - }, - ); - builder.field( - "dist", - &DistributionDisplay { - distribution: self.distribution(), - input_schema: &self.base.schema, - }, - ); - } - - builder.finish() - } -} - -impl StreamNode for StreamIndexScan { - fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> ProstStreamNode { - unreachable!("stream index scan cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead.") - } -} - -impl StreamIndexScan { - pub fn adhoc_to_stream_prost(&self) -> ProstStreamPlan { - use risingwave_pb::plan_common::*; - use risingwave_pb::stream_plan::*; - - let batch_plan_node = BatchPlanNode { - table_desc: Some(self.logical.table_desc().to_protobuf()), - column_ids: self - .logical - .output_column_ids() - .iter() - .map(ColumnId::get_id) - .collect(), - }; - - let stream_key = self.base.logical_pk.iter().map(|x| *x as u32).collect_vec(); - - ProstStreamPlan { - fields: self.schema().to_prost(), - input: vec![ - // The merge node should be empty - ProstStreamPlan { - node_body: Some(ProstStreamNode::Merge(Default::default())), - identity: "Upstream".into(), - fields: self - .logical - .table_desc() - .columns - .iter() - .map(|c| risingwave_common::catalog::Field::from(c).to_prost()) - .collect(), - stream_key: self - .logical - .table_desc() - .stream_key - .iter() - .map(|i| *i as _) - .collect(), - 
..Default::default() - }, - ProstStreamPlan { - node_body: Some(ProstStreamNode::BatchPlan(batch_plan_node)), - operator_id: self.batch_plan_id.0 as u64, - identity: "BatchPlanNode".into(), - stream_key: stream_key.clone(), - input: vec![], - fields: vec![], // TODO: fill this later - append_only: true, - }, - ], - node_body: Some(ProstStreamNode::Chain(ChainNode { - table_id: self.logical.table_desc().table_id.table_id, - chain_type: self.chain_type as i32, - // The fields from upstream - upstream_fields: self - .logical - .table_desc() - .columns - .iter() - .map(|x| Field { - data_type: Some(x.data_type.to_protobuf()), - name: x.name.clone(), - }) - .collect(), - // The column idxs need to be forwarded to the downstream - upstream_column_indices: self - .logical - .output_column_indices() - .iter() - .map(|&i| i as _) - .collect(), - is_singleton: false, - table_desc: Some(self.logical.table_desc().to_protobuf()), - })), - stream_key, - operator_id: self.base.id.0 as u64, - identity: format!("{}", self), - append_only: self.append_only(), - } - } -} - -impl ExprRewritable for StreamIndexScan { - fn has_rewritable_expr(&self) -> bool { - true - } - - fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - Self::new( - self.logical - .rewrite_exprs(r) - .as_logical_scan() - .unwrap() - .clone(), - self.chain_type, - ) - .into() - } -} diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs index 0b2d2bc93ca6..bf10ccdcc395 100644 --- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -21,9 +21,7 @@ use risingwave_common::catalog::{Field, TableDesc}; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use risingwave_pb::stream_plan::{ChainType, StreamNode as ProstStreamPlan}; -use super::{ - ExprRewritable, LogicalScan, PlanBase, PlanNodeId, PlanRef, StreamIndexScan, StreamNode, -}; +use 
super::{ExprRewritable, LogicalScan, PlanBase, PlanNodeId, PlanRef, StreamNode}; use crate::catalog::ColumnId; use crate::expr::ExprRewriter; use crate::optimizer::plan_node::utils::IndicesDisplay; @@ -98,12 +96,14 @@ impl StreamTableScan { index_table_desc: Rc, primary_to_secondary_mapping: &BTreeMap, chain_type: ChainType, - ) -> StreamIndexScan { - StreamIndexScan::new( + ) -> StreamTableScan { + let logical_index_scan = self.logical - .to_index_scan(index_name, index_table_desc, primary_to_secondary_mapping), - chain_type, - ) + .to_index_scan(index_name, index_table_desc, primary_to_secondary_mapping); + logical_index_scan + .distribution_key() + .expect("distribution key of stream chain must exist in output columns"); + StreamTableScan::new_with_chain_type(logical_index_scan, chain_type) } pub fn chain_type(&self) -> ChainType {