From 25a4809a0a6099743a4af9a418ae019ca9edb5ad Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 16 Mar 2023 16:16:25 +0800 Subject: [PATCH] feat(streaming): only output required columns for MV-on-MV (#8555) Signed-off-by: Bugen Zhao --- Cargo.lock | 1 + dashboard/proto/gen/catalog.ts | 43 ++++----- dashboard/proto/gen/expr.ts | 19 ++-- dashboard/proto/gen/stream_plan.ts | 46 ++++----- e2e_test/ddl/alter_table_column.slt | 15 ++- proto/stream_plan.proto | 16 ++-- .../optimizer/plan_node/stream_index_scan.rs | 90 +++++++++--------- .../optimizer/plan_node/stream_table_scan.rs | 94 +++++++++---------- src/meta/src/stream/stream_graph/actor.rs | 15 +-- src/meta/src/stream/stream_graph/fragment.rs | 65 +++++++++++-- src/prost/Cargo.toml | 1 + src/prost/build.rs | 1 + src/stream/src/executor/backfill.rs | 36 ++----- src/stream/src/executor/chain.rs | 48 ++-------- src/stream/src/executor/rearranged_chain.rs | 52 +--------- src/stream/src/from_proto/chain.rs | 66 ++++++------- 16 files changed, 267 insertions(+), 341 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7a0176a2a87e..cfaab8a21fa7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6339,6 +6339,7 @@ dependencies = [ name = "risingwave_pb" version = "0.2.0-alpha" dependencies = [ + "enum-as-inner", "madsim-tonic", "madsim-tonic-build", "pbjson", diff --git a/dashboard/proto/gen/catalog.ts b/dashboard/proto/gen/catalog.ts index 3b37fa9cc1f3..bcd7c1f140ab 100644 --- a/dashboard/proto/gen/catalog.ts +++ b/dashboard/proto/gen/catalog.ts @@ -204,6 +204,7 @@ export interface Function { name: string; owner: number; argTypes: DataType[]; + returnType: DataType | undefined; language: string; link: string; identifier: string; @@ -214,11 +215,9 @@ export interface Function { } export interface Function_ScalarFunction { - returnType: DataType | undefined; } export interface Function_TableFunction { - returnTypes: DataType[]; } export interface Function_AggregateFunction { @@ -828,6 +827,7 @@ function createBaseFunction(): Function { name: "", owner: 0, argTypes: [], + returnType: undefined, language: "", link: "", identifier: "", @@ -846,6 +846,7 @@ export const Function = { argTypes: Array.isArray(object?.argTypes) ? object.argTypes.map((e: any) => DataType.fromJSON(e)) : [], + returnType: isSet(object.returnType) ? DataType.fromJSON(object.returnType) : undefined, language: isSet(object.language) ? String(object.language) : "", link: isSet(object.link) ? String(object.link) : "", identifier: isSet(object.identifier) ? String(object.identifier) : "", @@ -871,6 +872,8 @@ export const Function = { } else { obj.argTypes = []; } + message.returnType !== undefined && + (obj.returnType = message.returnType ? DataType.toJSON(message.returnType) : undefined); message.language !== undefined && (obj.language = message.language); message.link !== undefined && (obj.link = message.link); message.identifier !== undefined && (obj.identifier = message.identifier); @@ -893,6 +896,9 @@ export const Function = { message.name = object.name ?? ""; message.owner = object.owner ?? 0; message.argTypes = object.argTypes?.map((e) => DataType.fromPartial(e)) || []; + message.returnType = (object.returnType !== undefined && object.returnType !== null) + ? DataType.fromPartial(object.returnType) + : undefined; message.language = object.language ?? ""; message.link = object.link ?? ""; message.identifier = object.identifier ?? 
""; @@ -910,54 +916,41 @@ export const Function = { }; function createBaseFunction_ScalarFunction(): Function_ScalarFunction { - return { returnType: undefined }; + return {}; } export const Function_ScalarFunction = { - fromJSON(object: any): Function_ScalarFunction { - return { returnType: isSet(object.returnType) ? DataType.fromJSON(object.returnType) : undefined }; + fromJSON(_: any): Function_ScalarFunction { + return {}; }, - toJSON(message: Function_ScalarFunction): unknown { + toJSON(_: Function_ScalarFunction): unknown { const obj: any = {}; - message.returnType !== undefined && - (obj.returnType = message.returnType ? DataType.toJSON(message.returnType) : undefined); return obj; }, - fromPartial, I>>(object: I): Function_ScalarFunction { + fromPartial, I>>(_: I): Function_ScalarFunction { const message = createBaseFunction_ScalarFunction(); - message.returnType = (object.returnType !== undefined && object.returnType !== null) - ? DataType.fromPartial(object.returnType) - : undefined; return message; }, }; function createBaseFunction_TableFunction(): Function_TableFunction { - return { returnTypes: [] }; + return {}; } export const Function_TableFunction = { - fromJSON(object: any): Function_TableFunction { - return { - returnTypes: Array.isArray(object?.returnTypes) ? object.returnTypes.map((e: any) => DataType.fromJSON(e)) : [], - }; + fromJSON(_: any): Function_TableFunction { + return {}; }, - toJSON(message: Function_TableFunction): unknown { + toJSON(_: Function_TableFunction): unknown { const obj: any = {}; - if (message.returnTypes) { - obj.returnTypes = message.returnTypes.map((e) => e ? DataType.toJSON(e) : undefined); - } else { - obj.returnTypes = []; - } return obj; }, - fromPartial, I>>(object: I): Function_TableFunction { + fromPartial, I>>(_: I): Function_TableFunction { const message = createBaseFunction_TableFunction(); - message.returnTypes = object.returnTypes?.map((e) => DataType.fromPartial(e)) || []; return message; }, }; diff --git a/dashboard/proto/gen/expr.ts b/dashboard/proto/gen/expr.ts index 1b610347b05f..4faa1e414aa7 100644 --- a/dashboard/proto/gen/expr.ts +++ b/dashboard/proto/gen/expr.ts @@ -644,7 +644,9 @@ export function exprNode_TypeToJSON(object: ExprNode_Type): string { export interface TableFunction { functionType: TableFunction_Type; args: ExprNode[]; - returnTypes: DataType[]; + returnType: + | DataType + | undefined; /** optional. only used when the type is UDTF. */ udtf: UserDefinedTableFunction | undefined; } @@ -961,7 +963,7 @@ export const ExprNode = { }; function createBaseTableFunction(): TableFunction { - return { functionType: TableFunction_Type.UNSPECIFIED, args: [], returnTypes: [], udtf: undefined }; + return { functionType: TableFunction_Type.UNSPECIFIED, args: [], returnType: undefined, udtf: undefined }; } export const TableFunction = { @@ -973,7 +975,7 @@ export const TableFunction = { args: Array.isArray(object?.args) ? object.args.map((e: any) => ExprNode.fromJSON(e)) : [], - returnTypes: Array.isArray(object?.returnTypes) ? object.returnTypes.map((e: any) => DataType.fromJSON(e)) : [], + returnType: isSet(object.returnType) ? DataType.fromJSON(object.returnType) : undefined, udtf: isSet(object.udtf) ? UserDefinedTableFunction.fromJSON(object.udtf) : undefined, }; }, @@ -986,11 +988,8 @@ export const TableFunction = { } else { obj.args = []; } - if (message.returnTypes) { - obj.returnTypes = message.returnTypes.map((e) => e ? 
DataType.toJSON(e) : undefined); - } else { - obj.returnTypes = []; - } + message.returnType !== undefined && + (obj.returnType = message.returnType ? DataType.toJSON(message.returnType) : undefined); message.udtf !== undefined && (obj.udtf = message.udtf ? UserDefinedTableFunction.toJSON(message.udtf) : undefined); return obj; }, @@ -999,7 +998,9 @@ export const TableFunction = { const message = createBaseTableFunction(); message.functionType = object.functionType ?? TableFunction_Type.UNSPECIFIED; message.args = object.args?.map((e) => ExprNode.fromPartial(e)) || []; - message.returnTypes = object.returnTypes?.map((e) => DataType.fromPartial(e)) || []; + message.returnType = (object.returnType !== undefined && object.returnType !== null) + ? DataType.fromPartial(object.returnType) + : undefined; message.udtf = (object.udtf !== undefined && object.udtf !== null) ? UserDefinedTableFunction.fromPartial(object.udtf) : undefined; diff --git a/dashboard/proto/gen/stream_plan.ts b/dashboard/proto/gen/stream_plan.ts index a3e887a2eeb9..e6b43c3d7d8e 100644 --- a/dashboard/proto/gen/stream_plan.ts +++ b/dashboard/proto/gen/stream_plan.ts @@ -691,18 +691,18 @@ export interface ExchangeNode { */ export interface ChainNode { tableId: number; - /** The schema of input stream, which will be used to build a MergeNode */ - upstreamFields: Field[]; - /** - * The columns from the upstream table to output. - * TODO: rename this field. - */ - upstreamColumnIndices: number[]; /** * The columns from the upstream table that'll be internally required by this chain node. - * TODO: This is currently only used by backfill table scan. We should also apply it to the upstream dispatcher (#4529). + * - For non-backfill chain node, it's the same as the output columns. + * - For backfill chain node, there're additionally primary key columns. */ upstreamColumnIds: number[]; + /** + * The columns to be output by this chain node. The index is based on the internal required columns. + * - For non-backfill chain node, it's simply all the columns. + * - For backfill chain node, this strips the primary key columns if they're unnecessary. + */ + outputIndices: number[]; /** * Generally, the barrier needs to be rearranged during the MV creation process, so that data can * be flushed to shared buffer periodically, instead of making the first epoch from batch query extra @@ -3115,9 +3115,8 @@ export const ExchangeNode = { function createBaseChainNode(): ChainNode { return { tableId: 0, - upstreamFields: [], - upstreamColumnIndices: [], upstreamColumnIds: [], + outputIndices: [], chainType: ChainType.CHAIN_UNSPECIFIED, tableDesc: undefined, }; @@ -3127,15 +3126,12 @@ export const ChainNode = { fromJSON(object: any): ChainNode { return { tableId: isSet(object.tableId) ? Number(object.tableId) : 0, - upstreamFields: Array.isArray(object?.upstreamFields) - ? object.upstreamFields.map((e: any) => Field.fromJSON(e)) - : [], - upstreamColumnIndices: Array.isArray(object?.upstreamColumnIndices) - ? object.upstreamColumnIndices.map((e: any) => Number(e)) - : [], upstreamColumnIds: Array.isArray(object?.upstreamColumnIds) ? object.upstreamColumnIds.map((e: any) => Number(e)) : [], + outputIndices: Array.isArray(object?.outputIndices) + ? object.outputIndices.map((e: any) => Number(e)) + : [], chainType: isSet(object.chainType) ? chainTypeFromJSON(object.chainType) : ChainType.CHAIN_UNSPECIFIED, tableDesc: isSet(object.tableDesc) ? 
StorageTableDesc.fromJSON(object.tableDesc) : undefined, }; @@ -3144,21 +3140,16 @@ export const ChainNode = { toJSON(message: ChainNode): unknown { const obj: any = {}; message.tableId !== undefined && (obj.tableId = Math.round(message.tableId)); - if (message.upstreamFields) { - obj.upstreamFields = message.upstreamFields.map((e) => e ? Field.toJSON(e) : undefined); - } else { - obj.upstreamFields = []; - } - if (message.upstreamColumnIndices) { - obj.upstreamColumnIndices = message.upstreamColumnIndices.map((e) => Math.round(e)); - } else { - obj.upstreamColumnIndices = []; - } if (message.upstreamColumnIds) { obj.upstreamColumnIds = message.upstreamColumnIds.map((e) => Math.round(e)); } else { obj.upstreamColumnIds = []; } + if (message.outputIndices) { + obj.outputIndices = message.outputIndices.map((e) => Math.round(e)); + } else { + obj.outputIndices = []; + } message.chainType !== undefined && (obj.chainType = chainTypeToJSON(message.chainType)); message.tableDesc !== undefined && (obj.tableDesc = message.tableDesc ? StorageTableDesc.toJSON(message.tableDesc) : undefined); @@ -3168,9 +3159,8 @@ export const ChainNode = { fromPartial, I>>(object: I): ChainNode { const message = createBaseChainNode(); message.tableId = object.tableId ?? 0; - message.upstreamFields = object.upstreamFields?.map((e) => Field.fromPartial(e)) || []; - message.upstreamColumnIndices = object.upstreamColumnIndices?.map((e) => e) || []; message.upstreamColumnIds = object.upstreamColumnIds?.map((e) => e) || []; + message.outputIndices = object.outputIndices?.map((e) => e) || []; message.chainType = object.chainType ?? ChainType.CHAIN_UNSPECIFIED; message.tableDesc = (object.tableDesc !== undefined && object.tableDesc !== null) ? StorageTableDesc.fromPartial(object.tableDesc) diff --git a/e2e_test/ddl/alter_table_column.slt b/e2e_test/ddl/alter_table_column.slt index 35ffcf205453..7cad432aa319 100644 --- a/e2e_test/ddl/alter_table_column.slt +++ b/e2e_test/ddl/alter_table_column.slt @@ -127,7 +127,6 @@ select * from mv3; 3 3.3 3-3 # Drop column -# TODO(#4529): create mview on partial columns and test whether dropping the unrefereced column works. statement error being referenced alter table t drop column s; @@ -137,6 +136,10 @@ drop materialized view mv2; statement ok drop materialized view mv3; +statement ok +create materialized view mv5 as select v, s from t; + +# This should succeed as there's no materialized view referencing the column, including `mv5`. (#4529) statement ok alter table t drop column r; @@ -152,6 +155,13 @@ select v, s from t; 2 NULL 3 3-3 +query IR rowsort +select v, s from mv5; +---- +1 1-1 +2 NULL +3 3-3 + # Add column after dropping column, to test that the column ID is not reused. statement ok alter table t add column r real; @@ -191,6 +201,9 @@ select v, s, r from t; 4 4-4 4.4 # Clean up +statement ok +drop materialized view mv5; + statement ok drop materialized view mv; diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 99d53b3fc42d..f865ab992e01 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -384,14 +384,16 @@ enum ChainType { // 2. BatchPlanNode for snapshot read. message ChainNode { uint32 table_id = 1; - // The schema of input stream, which will be used to build a MergeNode - repeated plan_common.Field upstream_fields = 2; - // The columns from the upstream table to output. - // TODO: rename this field. - repeated uint32 upstream_column_indices = 3; + // The columns from the upstream table that'll be internally required by this chain node. 
- // TODO: This is currently only used by backfill table scan. We should also apply it to the upstream dispatcher (#4529). - repeated int32 upstream_column_ids = 8; + // - For non-backfill chain node, it's the same as the output columns. + // - For backfill chain node, there're additionally primary key columns. + repeated int32 upstream_column_ids = 2; + // The columns to be output by this chain node. The index is based on the internal required columns. + // - For non-backfill chain node, it's simply all the columns. + // - For backfill chain node, this strips the primary key columns if they're unnecessary. + repeated uint32 output_indices = 3; + // Generally, the barrier needs to be rearranged during the MV creation process, so that data can // be flushed to shared buffer periodically, instead of making the first epoch from batch query extra // large. However, in some cases, e.g., shared state, the barrier cannot be rearranged in ChainNode. diff --git a/src/frontend/src/optimizer/plan_node/stream_index_scan.rs b/src/frontend/src/optimizer/plan_node/stream_index_scan.rs index fc9380e90ae1..a495e5fb3ebf 100644 --- a/src/frontend/src/optimizer/plan_node/stream_index_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_index_scan.rs @@ -15,6 +15,7 @@ use std::fmt; use itertools::Itertools; +use risingwave_common::catalog::Field; use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use risingwave_pb::stream_plan::{ChainType, StreamNode as ProstStreamPlan}; @@ -128,27 +129,53 @@ impl StreamIndexScan { // TODO: this method is almost the same as `StreamTableScan::adhoc_to_stream_prost`, we should // avoid duplication. pub fn adhoc_to_stream_prost(&self) -> ProstStreamPlan { - use risingwave_pb::plan_common::*; use risingwave_pb::stream_plan::*; - let batch_plan_node = BatchPlanNode { - table_desc: Some(self.logical.table_desc().to_protobuf()), - column_ids: self - .logical - .output_column_ids() - .iter() - .map(ColumnId::get_id) - .collect(), - }; - let stream_key = self.base.logical_pk.iter().map(|x| *x as u32).collect_vec(); + // The required columns from the table (both scan and upstream). let upstream_column_ids = match self.chain_type { + // For backfill, we additionally need the primary key columns. ChainType::Backfill => self.logical.output_and_pk_column_ids(), ChainType::Chain | ChainType::Rearrange | ChainType::UpstreamOnly => { self.logical.output_column_ids() } ChainType::ChainUnspecified => unreachable!(), + } + .iter() + .map(ColumnId::get_id) + .collect_vec(); + + // The schema of the upstream table (both scan and upstream). 
+ let upstream_schema = upstream_column_ids + .iter() + .map(|&id| { + let col = self + .logical + .table_desc() + .columns + .iter() + .find(|c| c.column_id.get_id() == id) + .unwrap(); + Field::from(col).to_prost() + }) + .collect_vec(); + + let output_indices = self + .logical + .output_column_ids() + .iter() + .map(|i| { + upstream_column_ids + .iter() + .position(|&x| x == i.get_id()) + .unwrap() as u32 + }) + .collect_vec(); + + let batch_plan_node = BatchPlanNode { + table_desc: Some(self.logical.table_desc().to_protobuf()), + column_ids: upstream_column_ids.clone(), }; ProstStreamPlan { @@ -158,54 +185,25 @@ impl StreamIndexScan { ProstStreamPlan { node_body: Some(ProstStreamNode::Merge(Default::default())), identity: "Upstream".into(), - fields: self - .logical - .table_desc() - .columns - .iter() - .map(|c| risingwave_common::catalog::Field::from(c).to_prost()) - .collect(), - stream_key: self - .logical - .table_desc() - .stream_key - .iter() - .map(|i| *i as _) - .collect(), + fields: upstream_schema.clone(), + stream_key: vec![], // not used ..Default::default() }, ProstStreamPlan { node_body: Some(ProstStreamNode::BatchPlan(batch_plan_node)), operator_id: self.batch_plan_id.0 as u64, identity: "BatchPlanNode".into(), - stream_key: stream_key.clone(), input: vec![], - fields: vec![], // TODO: fill this later + fields: upstream_schema, + stream_key: vec![], // not used append_only: true, }, ], node_body: Some(ProstStreamNode::Chain(ChainNode { table_id: self.logical.table_desc().table_id.table_id, chain_type: self.chain_type as i32, - // The fields from upstream - upstream_fields: self - .logical - .table_desc() - .columns - .iter() - .map(|x| Field { - data_type: Some(x.data_type.to_protobuf()), - name: x.name.clone(), - }) - .collect(), - // The column idxs need to be forwarded to the downstream - upstream_column_indices: self - .logical - .output_column_indices() - .iter() - .map(|&i| i as _) - .collect(), - upstream_column_ids: upstream_column_ids.iter().map(|i| i.get_id()).collect(), + output_indices, + upstream_column_ids, table_desc: Some(self.logical.table_desc().to_protobuf()), })), stream_key, diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs index ae0f39a350a4..545ba673fecb 100644 --- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -156,60 +156,72 @@ impl StreamNode for StreamTableScan { impl StreamTableScan { pub fn adhoc_to_stream_prost(&self) -> ProstStreamPlan { - use risingwave_pb::plan_common::Field as ProstField; use risingwave_pb::stream_plan::*; - let batch_plan_node = BatchPlanNode { - table_desc: Some(self.logical.table_desc().to_protobuf()), - column_ids: self - .logical - .output_column_ids() - .iter() - .map(ColumnId::get_id) - .collect(), - }; - - let stream_key = self.logical_pk().iter().map(|x| *x as u32).collect_vec(); + let stream_key = self.base.logical_pk.iter().map(|x| *x as u32).collect_vec(); // The required columns from the table (both scan and upstream). let upstream_column_ids = match self.chain_type { + // For backfill, we additionally need the primary key columns. + ChainType::Backfill => self.logical.output_and_pk_column_ids(), ChainType::Chain | ChainType::Rearrange | ChainType::UpstreamOnly => { self.logical.output_column_ids() } - // For backfill, we additionally need the primary key columns. 
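// --- Illustrative sketch, not part of this patch ---
// How `upstream_column_ids` and `output_indices` relate for a backfill chain,
// mirroring the `position`-based mapping in the planner code above. The column
// IDs below are hypothetical; the real values come from the table catalog.
fn sketch_backfill_column_mapping() {
    // Suppose the MV reads columns with IDs [2, 4] and the upstream table's
    // primary key is column ID 0 (e.g. a `_row_id` column).
    let output_column_ids: Vec<i32> = vec![2, 4];
    let pk_column_ids: Vec<i32> = vec![0];

    // The chain internally requires the output columns plus the primary key,
    // so the backfill executor can track its progress by pk.
    let mut upstream_column_ids = output_column_ids.clone();
    upstream_column_ids.extend(pk_column_ids);
    assert_eq!(upstream_column_ids, vec![2, 4, 0]);

    // `output_indices` maps each output column to its position among the
    // internally required columns; the extra pk column is stripped at output.
    let output_indices: Vec<u32> = output_column_ids
        .iter()
        .map(|id| upstream_column_ids.iter().position(|x| x == id).unwrap() as u32)
        .collect();
    assert_eq!(output_indices, vec![0, 1]);
    // For a non-backfill chain, `upstream_column_ids` equals the output columns
    // and `output_indices` is simply the identity mapping.
}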
- ChainType::Backfill => self.logical.output_and_pk_column_ids(), ChainType::ChainUnspecified => unreachable!(), + } + .iter() + .map(ColumnId::get_id) + .collect_vec(); + + // The schema of the upstream table (both scan and upstream). + let upstream_schema = upstream_column_ids + .iter() + .map(|&id| { + let col = self + .logical + .table_desc() + .columns + .iter() + .find(|c| c.column_id.get_id() == id) + .unwrap(); + Field::from(col).to_prost() + }) + .collect_vec(); + + let output_indices = self + .logical + .output_column_ids() + .iter() + .map(|i| { + upstream_column_ids + .iter() + .position(|&x| x == i.get_id()) + .unwrap() as u32 + }) + .collect_vec(); + + let batch_plan_node = BatchPlanNode { + table_desc: Some(self.logical.table_desc().to_protobuf()), + column_ids: upstream_column_ids.clone(), }; ProstStreamPlan { fields: self.schema().to_prost(), input: vec![ - // The merge node should be empty + // The merge node body will be filled by the `ActorBuilder` on the meta service. ProstStreamPlan { node_body: Some(ProstStreamNode::Merge(Default::default())), identity: "Upstream".into(), - fields: self - .logical - .table_desc() - .columns - .iter() - .map(|c| Field::from(c).to_prost()) - .collect(), - stream_key: self - .logical - .table_desc() - .stream_key - .iter() - .map(|i| *i as _) - .collect(), + fields: upstream_schema.clone(), + stream_key: vec![], // not used ..Default::default() }, ProstStreamPlan { node_body: Some(ProstStreamNode::BatchPlan(batch_plan_node)), operator_id: self.batch_plan_id.0 as u64, identity: "BatchPlanNode".into(), - stream_key: stream_key.clone(), - fields: self.schema().to_prost(), + fields: upstream_schema, + stream_key: vec![], // not used input: vec![], append_only: true, }, @@ -217,25 +229,9 @@ impl StreamTableScan { node_body: Some(ProstStreamNode::Chain(ChainNode { table_id: self.logical.table_desc().table_id.table_id, chain_type: self.chain_type as i32, - // The fields from upstream - upstream_fields: self - .logical - .table_desc() - .columns - .iter() - .map(|x| ProstField { - data_type: Some(x.data_type.to_protobuf()), - name: x.name.clone(), - }) - .collect(), // The column indices need to be forwarded to the downstream - upstream_column_indices: self - .logical - .output_column_indices() - .iter() - .map(|&i| i as _) - .collect(), - upstream_column_ids: upstream_column_ids.iter().map(|i| i.get_id()).collect(), + output_indices, + upstream_column_ids, // The table desc used by backfill executor table_desc: Some(self.logical.table_desc().to_protobuf()), })), diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs index 41315fb78060..5443c3ec0a67 100644 --- a/src/meta/src/stream/stream_graph/actor.rs +++ b/src/meta/src/stream/stream_graph/actor.rs @@ -170,7 +170,7 @@ impl ActorBuilder { // Index the upstreams by the an external edge ID. let upstreams = &self.upstreams[&EdgeId::UpstreamExternal { - upstream_table_id: chain_node.table_id, + upstream_table_id: chain_node.table_id.into(), downstream_fragment_id: self.fragment_id, }]; @@ -180,28 +180,21 @@ impl ActorBuilder { assert_eq!(upstream_actor_id.len(), 1); let chain_input = vec![ - // Fill the merge node with correct upstream info. + // Fill the merge node body with correct upstream info. 
StreamNode { - input: vec![], - stream_key: merge_node.stream_key.clone(), node_body: Some(NodeBody::Merge(MergeNode { upstream_actor_id, upstream_fragment_id: upstreams.fragment_id.as_global_id(), upstream_dispatcher_type: DispatcherType::NoShuffle as _, - fields: chain_node.upstream_fields.clone(), + fields: merge_node.fields.clone(), })), - fields: chain_node.upstream_fields.clone(), - operator_id: merge_node.operator_id, - identity: "MergeExecutor".to_string(), - append_only: stream_node.append_only, + ..merge_node.clone() }, batch_plan_node.clone(), ]; Ok(StreamNode { input: chain_input, - identity: "ChainExecutor".to_string(), - fields: chain_node.upstream_fields.clone(), ..stream_node.clone() }) } diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index 331be4ca046f..6580bdf13117 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -52,6 +52,9 @@ pub(super) struct BuildingFragment { /// The ID of the job if it's materialized in this fragment. table_id: Option, + + /// The required columns of each upstream table. + upstream_table_columns: HashMap>, } impl BuildingFragment { @@ -69,11 +72,13 @@ impl BuildingFragment { }; let internal_tables = Self::fill_internal_tables(&mut fragment, job, table_id_gen); let table_id = Self::fill_job(&mut fragment, job).then(|| job.id()); + let upstream_table_columns = Self::extract_upstream_table_columns(&mut fragment); Self { inner: fragment, internal_tables, table_id, + upstream_table_columns, } } @@ -138,6 +143,28 @@ impl BuildingFragment { has_table } + + /// Extract the required columns (in IDs) of each upstream table. + fn extract_upstream_table_columns( + // TODO: no need to take `&mut` here + fragment: &mut StreamFragment, + ) -> HashMap> { + let mut table_columns = HashMap::new(); + + visit::visit_fragment(fragment, |node_body| { + if let NodeBody::Chain(chain_node) = node_body { + let table_id = chain_node.table_id.into(); + let column_ids = chain_node.upstream_column_ids.clone(); + table_columns + .try_insert(table_id, column_ids) + .expect("currently there should be no two same upstream tables in a fragment"); + } + }); + + assert_eq!(table_columns.len(), fragment.upstream_table_ids.len()); + + table_columns + } } impl Deref for BuildingFragment { @@ -163,7 +190,7 @@ pub(super) enum EdgeId { /// MV on MV. UpstreamExternal { /// The ID of the upstream table or materialized view. - upstream_table_id: u32, + upstream_table_id: TableId, /// The ID of the downstream fragment. downstream_fragment_id: GlobalFragmentId, }, @@ -411,17 +438,34 @@ impl CompleteStreamFragmentGraph { // Build the extra edges between the upstream `Materialize` and the downstream `Chain` of // the new materialized view. for (&id, fragment) in &graph.fragments { - for &upstream_table_id in &fragment.upstream_table_ids { + for (&upstream_table_id, output_columns) in &fragment.upstream_table_columns { let mview_fragment = upstream_mview_fragments - .get(&TableId::new(upstream_table_id)) + .get(&upstream_table_id) .context("upstream materialized view fragment not found")?; let mview_id = GlobalFragmentId::new(mview_fragment.fragment_id); - // TODO: only output the fields that are used by the downstream `Chain`. 
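// --- Illustrative sketch, not part of this patch ---
// The hunk below replaces this TODO: the required column IDs of the downstream
// chain are resolved against the upstream `Materialize` node's column list to
// build the `output_indices` of the NoShuffle dispatcher. IDs are hypothetical.
fn sketch_resolve_dispatcher_output_indices() -> Option<Vec<u32>> {
    // All column IDs of the upstream materialized view, in stored order.
    let all_column_ids: Vec<i32> = vec![0, 1, 2, 3];
    // The columns required by the downstream chain (`upstream_column_ids`).
    let required_column_ids: Vec<i32> = vec![2, 0];

    // Every required ID must exist upstream; a `None` here corresponds to the
    // "column not found in the upstream materialized view" error in the real code.
    required_column_ids
        .iter()
        .map(|c| all_column_ids.iter().position(|id| id == c).map(|i| i as u32))
        .collect::<Option<Vec<u32>>>()
    // Here this yields `Some(vec![2, 0])`.
}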
- // https://github.com/risingwavelabs/risingwave/issues/4529 - let mview_output_indices = { - let nodes = mview_fragment.actors[0].nodes.as_ref().unwrap(); - (0..nodes.fields.len() as u32).collect() + // Resolve the required output columns from the upstream materialized view. + let output_indices = { + let nodes = mview_fragment.actors[0].get_nodes().unwrap(); + let mview_node = nodes.get_node_body().unwrap().as_materialize().unwrap(); + let all_column_ids = mview_node + .get_table() + .unwrap() + .columns + .iter() + .map(|c| c.column_desc.as_ref().unwrap().column_id) + .collect_vec(); + + output_columns + .iter() + .map(|c| { + all_column_ids + .iter() + .position(|&id| id == *c) + .map(|i| i as u32) + }) + .collect::>>() + .context("column not found in the upstream materialized view")? }; let edge = StreamFragmentEdge { @@ -433,8 +477,8 @@ impl CompleteStreamFragmentGraph { // and the downstream `Chain` of the new materialized view. dispatch_strategy: DispatchStrategy { r#type: DispatcherType::NoShuffle as _, - dist_key_indices: vec![], // not used - output_indices: mview_output_indices, + dist_key_indices: vec![], // not used for `NoShuffle` + output_indices, }, }; @@ -601,6 +645,7 @@ impl CompleteStreamFragmentGraph { inner, internal_tables, table_id, + upstream_table_columns: _, } = self.get_fragment(id).into_building().unwrap(); let distribution_type = distribution.to_distribution_type() as i32; diff --git a/src/prost/Cargo.toml b/src/prost/Cargo.toml index 1b1c98d5ca28..4547a0decf2d 100644 --- a/src/prost/Cargo.toml +++ b/src/prost/Cargo.toml @@ -8,6 +8,7 @@ license = { workspace = true } repository = { workspace = true } [dependencies] +enum-as-inner = "0.5" pbjson = "0.5" prost = "0.11" prost-helpers = { path = "helpers" } diff --git a/src/prost/build.rs b/src/prost/build.rs index bdf1702f5671..8e832846eee4 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -55,6 +55,7 @@ fn main() -> Result<(), Box> { .compile_well_known_types(true) .protoc_arg("--experimental_allow_proto3_optional") .type_attribute(".", "#[derive(prost_helpers::AnyPB)]") + .type_attribute("node_body", "#[derive(::enum_as_inner::EnumAsInner)]") .type_attribute("catalog.WatermarkDesc", "#[derive(Eq, Hash)]") .type_attribute("expr.ExprNode", "#[derive(Eq, Hash)]") .type_attribute("data.DataType", "#[derive(Eq, Hash)]") diff --git a/src/stream/src/executor/backfill.rs b/src/stream/src/executor/backfill.rs index e7b157de7d58..1bbf6a5d2bb3 100644 --- a/src/stream/src/executor/backfill.rs +++ b/src/stream/src/executor/backfill.rs @@ -20,7 +20,6 @@ use either::Either; use futures::stream::select_with_strategy; use futures::{pin_mut, stream, StreamExt, TryStreamExt}; use futures_async_stream::try_stream; -use itertools::Itertools; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::BitmapBuilder; use risingwave_common::catalog::Schema; @@ -65,8 +64,8 @@ pub struct BackfillExecutor { /// Upstream with the same schema with the upstream table. upstream: BoxedExecutor, - /// The column indices need to be forwarded to the downstream. - upstream_indices: Vec, + /// The column indices need to be forwarded to the downstream from the upstream and table scan. 
+ output_indices: Vec, progress: CreateMviewProgress, @@ -84,7 +83,7 @@ where pub fn new( table: StorageTable, upstream: BoxedExecutor, - upstream_indices: Vec, + output_indices: Vec, progress: CreateMviewProgress, schema: Schema, pk_indices: PkIndices, @@ -97,7 +96,7 @@ where }, table, upstream, - upstream_indices, + output_indices, actor_id: progress.actor_id(), progress, } @@ -109,21 +108,6 @@ where let pk_in_output_indices = self.table.pk_in_output_indices().unwrap(); let pk_order = self.table.pk_serializer().get_order_types(); - // TODO: unify these two mappings if we make the upstream and table output the same. - // The columns to be forwarded to the downstream, in the upstream columns. - let downstream_in_upstream_indices = self.upstream_indices; - // The columns to be forwarded to the downstream, in the output columns of the table scan. - let downstream_in_output_indices = downstream_in_upstream_indices - .iter() - .map(|&i| { - self.table - .output_indices() - .iter() - .position(|&j| i == j) - .unwrap() - }) - .collect_vec(); - let mut upstream = self.upstream.execute(); // Poll the upstream to get the first barrier. @@ -154,9 +138,7 @@ where // Forward messages directly to the downstream. #[for_await] for message in upstream { - if let Some(message) = - Self::mapping_message(message?, &downstream_in_upstream_indices) - { + if let Some(message) = Self::mapping_message(message?, &self.output_indices) { yield message; } } @@ -233,7 +215,7 @@ where &pk_in_output_indices, pk_order, ), - &downstream_in_upstream_indices, + &self.output_indices, )); } } @@ -272,7 +254,7 @@ where processed_rows += chunk.cardinality() as u64; yield Message::Chunk(Self::mapping_chunk( chunk, - &downstream_in_upstream_indices, + &self.output_indices, )); } @@ -295,7 +277,7 @@ where processed_rows += chunk.cardinality() as u64; yield Message::Chunk(Self::mapping_chunk( chunk, - &downstream_in_output_indices, + &self.output_indices, )); } } @@ -313,7 +295,7 @@ where // Forward messages directly to the downstream. 
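// --- Illustrative sketch, not part of this patch ---
// A simplified view of what per-message mapping with `output_indices` involves.
// The body of `mapping_message` is not shown in this diff, so this is an
// assumption based on the mapping logic removed from `chain.rs` further below;
// `SketchMessage` and its fields are stand-ins for the real stream message type.
enum SketchMessage {
    Chunk(Vec<Vec<i64>>),         // columns of a data chunk
    Watermark { col_idx: usize }, // watermark on one column
    Barrier(u64),                 // barrier carrying an epoch
}

fn sketch_map_message(msg: SketchMessage, output_indices: &[usize]) -> Option<SketchMessage> {
    match msg {
        // Project the chunk columns into the output layout.
        SketchMessage::Chunk(cols) => Some(SketchMessage::Chunk(
            output_indices.iter().map(|&i| cols[i].clone()).collect(),
        )),
        // A watermark survives only if its column is still part of the output.
        SketchMessage::Watermark { col_idx } => output_indices
            .iter()
            .position(|&i| i == col_idx)
            .map(|new_idx| SketchMessage::Watermark { col_idx: new_idx }),
        // Barriers pass through unchanged.
        SketchMessage::Barrier(epoch) => Some(SketchMessage::Barrier(epoch)),
    }
}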
#[for_await] for msg in upstream { - if let Some(msg) = Self::mapping_message(msg?, &downstream_in_upstream_indices) { + if let Some(msg) = Self::mapping_message(msg?, &self.output_indices) { if let Some(barrier) = msg.as_barrier() { self.progress.finish(barrier.epoch.curr); } diff --git a/src/stream/src/executor/chain.rs b/src/stream/src/executor/chain.rs index 1311e1b927df..f4081492928a 100644 --- a/src/stream/src/executor/chain.rs +++ b/src/stream/src/executor/chain.rs @@ -14,12 +14,11 @@ use futures::StreamExt; use futures_async_stream::try_stream; -use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; use super::error::StreamExecutorError; use super::{expect_first_barrier, BoxedExecutor, Executor, ExecutorInfo, Message}; -use crate::executor::{PkIndices, Watermark}; +use crate::executor::PkIndices; use crate::task::{ActorId, CreateMviewProgress}; /// [`ChainExecutor`] is an executor that enables synchronization between the existing stream and @@ -31,8 +30,6 @@ pub struct ChainExecutor { upstream: BoxedExecutor, - upstream_indices: Vec, - progress: CreateMviewProgress, actor_id: ActorId, @@ -43,24 +40,10 @@ pub struct ChainExecutor { upstream_only: bool, } -fn mapping_chunk(chunk: StreamChunk, upstream_indices: &[usize]) -> StreamChunk { - let (ops, columns, visibility) = chunk.into_inner(); - let mapped_columns = upstream_indices - .iter() - .map(|&i| columns[i].clone()) - .collect(); - StreamChunk::new(ops, mapped_columns, visibility) -} - -fn mapping_watermark(watermark: Watermark, upstream_indices: &[usize]) -> Option { - watermark.transform_with_indices(upstream_indices) -} - impl ChainExecutor { pub fn new( snapshot: BoxedExecutor, upstream: BoxedExecutor, - upstream_indices: Vec, progress: CreateMviewProgress, schema: Schema, pk_indices: PkIndices, @@ -74,7 +57,6 @@ impl ChainExecutor { }, snapshot, upstream, - upstream_indices, actor_id: progress.actor_id(), progress, upstream_only, @@ -117,21 +99,11 @@ impl ChainExecutor { // first barrier. #[for_await] for msg in upstream { - match msg? { - Message::Watermark(watermark) => { - match mapping_watermark(watermark, &self.upstream_indices) { - Some(mapped_watermark) => yield Message::Watermark(mapped_watermark), - None => continue, - } - } - Message::Chunk(chunk) => { - yield Message::Chunk(mapping_chunk(chunk, &self.upstream_indices)); - } - Message::Barrier(barrier) => { - self.progress.finish(barrier.epoch.curr); - yield Message::Barrier(barrier); - } + let msg = msg?; + if let Message::Barrier(barrier) = &msg { + self.progress.finish(barrier.epoch.curr); } + yield msg; } } } @@ -209,15 +181,7 @@ mod test { ], )); - let chain = ChainExecutor::new( - first, - second, - vec![0], - progress, - schema, - PkIndices::new(), - false, - ); + let chain = ChainExecutor::new(first, second, progress, schema, PkIndices::new(), false); let mut chain = Box::new(chain).execute(); chain.next().await; diff --git a/src/stream/src/executor/rearranged_chain.rs b/src/stream/src/executor/rearranged_chain.rs index c307c0d43d99..c713eb231189 100644 --- a/src/stream/src/executor/rearranged_chain.rs +++ b/src/stream/src/executor/rearranged_chain.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::sync::Arc; - use futures::channel::{mpsc, oneshot}; use futures::stream::select_with_strategy; use futures::{stream, StreamExt}; @@ -25,7 +23,7 @@ use super::error::StreamExecutorError; use super::{ expect_first_barrier, Barrier, BoxedExecutor, Executor, ExecutorInfo, Message, MessageStream, }; -use crate::executor::{BoxedMessageStream, PkIndices, Watermark}; +use crate::executor::PkIndices; use crate::task::{ActorId, CreateMviewProgress}; /// `ChainExecutor` is an executor that enables synchronization between the existing stream and @@ -40,8 +38,6 @@ pub struct RearrangedChainExecutor { upstream: BoxedExecutor, - upstream_indices: Arc<[usize]>, - progress: CreateMviewProgress, actor_id: ActorId, @@ -49,31 +45,6 @@ pub struct RearrangedChainExecutor { info: ExecutorInfo, } -fn mapping(upstream_indices: &[usize], msg: Message) -> Option { - match msg { - Message::Watermark(watermark) => { - mapping_watermark(watermark, upstream_indices).map(Message::Watermark) - } - Message::Chunk(chunk) => { - let (ops, columns, visibility) = chunk.into_inner(); - let mapped_columns = upstream_indices - .iter() - .map(|&i| columns[i].clone()) - .collect(); - Some(Message::Chunk(StreamChunk::new( - ops, - mapped_columns, - visibility, - ))) - } - Message::Barrier(_) => Some(msg), - } -} - -fn mapping_watermark(watermark: Watermark, upstream_indices: &[usize]) -> Option { - watermark.transform_with_indices(upstream_indices) -} - #[derive(Debug)] enum RearrangedMessage { RearrangedBarrier(Barrier), @@ -115,7 +86,6 @@ impl RearrangedChainExecutor { pub fn new( snapshot: BoxedExecutor, upstream: BoxedExecutor, - upstream_indices: Vec, progress: CreateMviewProgress, schema: Schema, pk_indices: PkIndices, @@ -128,7 +98,6 @@ impl RearrangedChainExecutor { }, snapshot, upstream, - upstream_indices: upstream_indices.into(), actor_id: progress.actor_id(), progress, } @@ -136,13 +105,7 @@ impl RearrangedChainExecutor { #[try_stream(ok = Message, error = StreamExecutorError)] async fn execute_inner(mut self) { - // 0. Project the upstream with `upstream_indices`. - let upstream_indices = self.upstream_indices.clone(); - - let mut upstream = Box::pin(Self::mapping_stream( - self.upstream.execute(), - &upstream_indices, - )); + let mut upstream = Box::pin(self.upstream.execute()); // 1. Poll the upstream to get the first barrier. let first_barrier = expect_first_barrier(&mut upstream).await?; @@ -324,17 +287,6 @@ impl RearrangedChainExecutor { } } } - - #[try_stream(ok = Message, error = StreamExecutorError)] - async fn mapping_stream(stream: BoxedMessageStream, upstream_indices: &[usize]) { - #[for_await] - for msg in stream { - match mapping(upstream_indices, msg?) { - Some(msg) => yield msg, - None => continue, - } - } - } } impl Executor for RearrangedChainExecutor { diff --git a/src/stream/src/from_proto/chain.rs b/src/stream/src/from_proto/chain.rs index 7b007534327d..6f7dcc81a01b 100644 --- a/src/stream/src/from_proto/chain.rs +++ b/src/stream/src/from_proto/chain.rs @@ -36,12 +36,6 @@ impl ExecutorBuilder for ChainExecutorBuilder { ) -> StreamResult { let [mview, snapshot]: [_; 2] = params.input.try_into().unwrap(); - let upstream_indices: Vec = node - .upstream_column_indices - .iter() - .map(|&i| i as usize) - .collect(); - // For reporting the progress. let progress = stream .context @@ -51,36 +45,36 @@ impl ExecutorBuilder for ChainExecutorBuilder { // its schema. 
let schema = snapshot.schema().clone(); + let output_indices = node + .output_indices + .iter() + .map(|&i| i as usize) + .collect_vec(); + + // For `Chain`s other than `Backfill`, there should be no extra mapping required. We can + // directly output the columns received from the upstream or snapshot. + if !matches!(node.chain_type(), ChainType::Backfill) { + let all_indices = (0..schema.len()).collect_vec(); + assert_eq!(output_indices, all_indices); + } + let executor = match node.chain_type() { - ChainType::Chain => ChainExecutor::new( - snapshot, - mview, - upstream_indices, - progress, - schema, - params.pk_indices, - false, - ) - .boxed(), - ChainType::UpstreamOnly => ChainExecutor::new( - snapshot, - mview, - upstream_indices, - progress, - schema, - params.pk_indices, - true, - ) - .boxed(), - ChainType::Rearrange => RearrangedChainExecutor::new( - snapshot, - mview, - upstream_indices, - progress, - schema, - params.pk_indices, - ) - .boxed(), + ChainType::Chain | ChainType::UpstreamOnly => { + let upstream_only = matches!(node.chain_type(), ChainType::UpstreamOnly); + ChainExecutor::new( + snapshot, + mview, + progress, + schema, + params.pk_indices, + upstream_only, + ) + .boxed() + } + ChainType::Rearrange => { + RearrangedChainExecutor::new(snapshot, mview, progress, schema, params.pk_indices) + .boxed() + } ChainType::Backfill => { let table_desc: &StorageTableDesc = node.get_table_desc()?; let table_id = TableId { @@ -156,7 +150,7 @@ impl ExecutorBuilder for ChainExecutorBuilder { BackfillExecutor::new( table, mview, - upstream_indices, + output_indices, progress, schema, params.pk_indices,