From aa7362ba1823a378418d61e196ecdf0ce11729d6 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 6 Sep 2024 17:21:52 +0800 Subject: [PATCH] add vnode count to table catalog Signed-off-by: Bugen Zhao --- proto/catalog.proto | 2 ++ proto/plan_common.proto | 1 + src/common/src/catalog/physical_table.rs | 7 +++++++ src/common/src/hash/consistent_hash/vnode.rs | 18 ++++++++++++++++++ src/frontend/src/catalog/table_catalog.rs | 11 +++++++++++ .../optimizer/plan_node/stream_materialize.rs | 2 ++ src/frontend/src/optimizer/plan_node/utils.rs | 2 ++ .../src/scheduler/distributed/query.rs | 3 ++- src/frontend/src/scheduler/plan_fragmenter.rs | 2 +- src/meta/model_v2/src/table.rs | 4 ++++ src/meta/src/controller/mod.rs | 1 + .../sink_coordination/coordinator_worker.rs | 2 +- src/storage/src/filter_key_extractor.rs | 1 + .../log_store_impl/kv_log_store/writer.rs | 2 +- .../compaction_test/src/delete_range_runner.rs | 1 + 15 files changed, 55 insertions(+), 4 deletions(-) diff --git a/proto/catalog.proto b/proto/catalog.proto index 3c3ee374a9e6e..e5dcb3943d773 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -417,6 +417,8 @@ message Table { // upstream table. optional string cdc_table_id = 39; + optional uint32 maybe_vnode_count = 40; + // Per-table catalog version, used by schema change. `None` for internal // tables and tests. Not to be confused with the global catalog version for // notification service. diff --git a/proto/plan_common.proto b/proto/plan_common.proto index bc2e60503f103..5c64f9e9e22f3 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -96,6 +96,7 @@ message StorageTableDesc { repeated uint32 stream_key = 9; optional uint32 vnode_col_idx_in_pk = 10; optional uint32 retention_seconds = 11; + optional uint32 maybe_vnode_count = 12; } message AsOf { diff --git a/src/common/src/catalog/physical_table.rs b/src/common/src/catalog/physical_table.rs index 5ed66b98de5c6..cb4abba7b8775 100644 --- a/src/common/src/catalog/physical_table.rs +++ b/src/common/src/catalog/physical_table.rs @@ -21,6 +21,7 @@ use risingwave_pb::plan_common::StorageTableDesc; use super::{ColumnDesc, ColumnId, TableId}; use crate::catalog::get_dist_key_in_pk_indices; +use crate::hash::VnodeCountCompat; use crate::util::sort_util::ColumnOrder; /// Includes necessary information for compute node to access data of the table. @@ -57,6 +58,8 @@ pub struct TableDesc { /// the column indices which could receive watermarks. pub watermark_columns: FixedBitSet, + pub vnode_count: usize, + /// Whether the table is versioned. If `true`, column-aware row encoding will be used /// to be compatible with schema changes. /// @@ -113,6 +116,7 @@ impl TableDesc { versioned: self.versioned, stream_key: self.stream_key.iter().map(|&x| x as u32).collect(), vnode_col_idx_in_pk, + maybe_vnode_count: Some(self.vnode_count as u32), }) } @@ -126,6 +130,8 @@ impl TableDesc { } pub fn from_pb_table(table: &Table) -> Self { + let vnode_count = table.vnode_count(); + Self { table_id: TableId::new(table.id), pk: table.pk.iter().map(ColumnOrder::from_protobuf).collect(), @@ -143,6 +149,7 @@ impl TableDesc { read_prefix_len_hint: table.read_prefix_len_hint as _, watermark_columns: table.watermark_indices.iter().map(|i| *i as _).collect(), versioned: table.version.is_some(), + vnode_count, } } } diff --git a/src/common/src/hash/consistent_hash/vnode.rs b/src/common/src/hash/consistent_hash/vnode.rs index 177c104cf5f48..257c7f5db330d 100644 --- a/src/common/src/hash/consistent_hash/vnode.rs +++ b/src/common/src/hash/consistent_hash/vnode.rs @@ -193,6 +193,24 @@ impl VirtualNode { } } +pub trait VnodeCountCompat { + fn vnode_count(&self) -> usize; +} + +impl VnodeCountCompat for risingwave_pb::plan_common::StorageTableDesc { + fn vnode_count(&self) -> usize { + self.maybe_vnode_count + .map_or(VirtualNode::COUNT, |v| v as _) + } +} + +impl VnodeCountCompat for risingwave_pb::catalog::Table { + fn vnode_count(&self) -> usize { + self.maybe_vnode_count + .map_or(VirtualNode::COUNT, |v| v as _) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index a97db5646fe2a..008f9c4255100 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -20,6 +20,7 @@ use risingwave_common::catalog::{ ColumnCatalog, ConflictBehavior, CreateType, Field, Schema, StreamJobStatus, TableDesc, TableId, TableVersionId, }; +use risingwave_common::hash::VnodeCountCompat; use risingwave_common::util::epoch::Epoch; use risingwave_common::util::sort_util::ColumnOrder; use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType, PbTableVersion}; @@ -170,6 +171,8 @@ pub struct TableCatalog { pub initialized_at_cluster_version: Option, pub cdc_table_id: Option, + + pub vnode_count: usize, } #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] @@ -348,6 +351,7 @@ impl TableCatalog { watermark_columns: self.watermark_columns.clone(), versioned: self.version.is_some(), vnode_col_index: self.vnode_col_index, + vnode_count: self.vnode_count, } } @@ -428,6 +432,7 @@ impl TableCatalog { initialized_at_cluster_version: self.initialized_at_cluster_version.clone(), retention_seconds: self.retention_seconds, cdc_table_id: self.cdc_table_id.clone(), + maybe_vnode_count: Some(self.vnode_count as _), } } @@ -533,6 +538,8 @@ impl From for TableCatalog { OptionalAssociatedSourceId::AssociatedSourceId(id) => id, }); let name = tb.name.clone(); + let vnode_count = tb.vnode_count(); + let mut col_names = HashSet::new(); let mut col_index: HashMap = HashMap::new(); @@ -602,6 +609,7 @@ impl From for TableCatalog { .map(TableId::from) .collect_vec(), cdc_table_id: tb.cdc_table_id, + vnode_count, } } } @@ -622,6 +630,7 @@ impl OwnedByUserCatalog for TableCatalog { mod tests { use risingwave_common::catalog::{row_id_column_desc, ColumnDesc, ColumnId}; + use risingwave_common::hash::VirtualNode; use risingwave_common::test_prelude::*; use risingwave_common::types::*; use risingwave_common::util::sort_util::OrderType; @@ -692,6 +701,7 @@ mod tests { initialized_at_cluster_version: None, version_column_index: None, cdc_table_id: None, + maybe_vnode_count: None, } .into(); @@ -755,6 +765,7 @@ mod tests { dependent_relations: vec![], version_column_index: None, cdc_table_id: None, + vnode_count: VirtualNode::COUNT, } ); assert_eq!(table, TableCatalog::from(table.to_prost(0, 0))); diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index 57d4454ac254a..1b0ee06f13c7b 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -21,6 +21,7 @@ use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::catalog::{ ColumnCatalog, ConflictBehavior, CreateType, StreamJobStatus, TableId, OBJECT_ID_PLACEHOLDER, }; +use risingwave_common::hash::VirtualNode; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_pb::stream_plan::stream_node::PbNodeBody; @@ -283,6 +284,7 @@ impl StreamMaterialize { created_at_cluster_version: None, retention_seconds: retention_seconds.map(|i| i.into()), cdc_table_id: None, + vnode_count: VirtualNode::COUNT, // TODO(var-vnode): use vnode count from session config }) } diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index bc3c223c615e6..a2f79c8766fc4 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -27,6 +27,7 @@ use risingwave_common::catalog::{ use risingwave_common::constants::log_store::v2::{ KV_LOG_STORE_PREDEFINED_COLUMNS, PK_ORDERING, VNODE_COLUMN_INDEX, }; +use risingwave_common::hash::VirtualNode; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use crate::catalog::table_catalog::TableType; @@ -179,6 +180,7 @@ impl TableCatalogBuilder { created_at_cluster_version: None, retention_seconds: None, cdc_table_id: None, + vnode_count: VirtualNode::COUNT, /* TODO(var-vnode): use vnode count from session config, */ } } diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index b991d86eca2b7..ac48b503ea0ec 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -476,7 +476,7 @@ pub(crate) mod tests { ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, StreamJobStatus, DEFAULT_SUPER_USER_ID, }; - use risingwave_common::hash::{WorkerSlotId, WorkerSlotMapping}; + use risingwave_common::hash::{VirtualNode, WorkerSlotId, WorkerSlotMapping}; use risingwave_common::types::DataType; use risingwave_pb::common::worker_node::Property; use risingwave_pb::common::{HostAddress, WorkerNode, WorkerType}; @@ -597,6 +597,7 @@ pub(crate) mod tests { initialized_at_cluster_version: None, created_at_cluster_version: None, cdc_table_id: None, + vnode_count: VirtualNode::COUNT_FOR_TEST, }; let batch_plan_node: PlanRef = LogicalScan::create( "".to_string(), diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index ee07ffecf37de..6777f9373b841 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -30,7 +30,7 @@ use risingwave_common::bail; use risingwave_common::bitmap::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::{Schema, TableDesc}; use risingwave_common::hash::table_distribution::TableDistribution; -use risingwave_common::hash::{VirtualNode, WorkerSlotId, WorkerSlotMapping}; +use risingwave_common::hash::{WorkerSlotId, WorkerSlotMapping}; use risingwave_common::util::scan_range::ScanRange; use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator; use risingwave_connector::source::filesystem::opendal_source::{ diff --git a/src/meta/model_v2/src/table.rs b/src/meta/model_v2/src/table.rs index 44cebfa6f70ba..0a47208ff7351 100644 --- a/src/meta/model_v2/src/table.rs +++ b/src/meta/model_v2/src/table.rs @@ -13,6 +13,7 @@ // limitations under the License. use risingwave_common::catalog::OBJECT_ID_PLACEHOLDER; +use risingwave_common::hash::VnodeCountCompat; use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType}; use risingwave_pb::catalog::{PbHandleConflictBehavior, PbTable}; use sea_orm::entity::prelude::*; @@ -133,6 +134,7 @@ pub struct Model { pub retention_seconds: Option, pub incoming_sinks: I32Array, pub cdc_table_id: Option, + pub vnode_count: i32, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -207,6 +209,7 @@ impl From for ActiveModel { fn from(pb_table: PbTable) -> Self { let table_type = pb_table.table_type(); let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior(); + let vnode_count = pb_table.vnode_count(); let fragment_id = if pb_table.fragment_id == OBJECT_ID_PLACEHOLDER { NotSet @@ -255,6 +258,7 @@ impl From for ActiveModel { retention_seconds: Set(pb_table.retention_seconds.map(|i| i as _)), incoming_sinks: Set(pb_table.incoming_sinks.into()), cdc_table_id: Set(pb_table.cdc_table_id), + vnode_count: Set(vnode_count as _), } } } diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index e22b0f20ee86e..2d8449d1921f8 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -163,6 +163,7 @@ impl From> for PbTable { created_at_cluster_version: value.1.created_at_cluster_version, retention_seconds: value.0.retention_seconds.map(|id| id as u32), cdc_table_id: value.0.cdc_table_id, + maybe_vnode_count: Some(value.0.vnode_count as _), } } } diff --git a/src/meta/src/manager/sink_coordination/coordinator_worker.rs b/src/meta/src/manager/sink_coordination/coordinator_worker.rs index ebfac14f88139..9fb04dc69448b 100644 --- a/src/meta/src/manager/sink_coordination/coordinator_worker.rs +++ b/src/meta/src/manager/sink_coordination/coordinator_worker.rs @@ -20,7 +20,7 @@ use futures::future::{select, Either}; use futures::stream::FuturesUnordered; use futures::{StreamExt, TryStreamExt}; use risingwave_common::bitmap::Bitmap; -use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; +use risingwave_common::hash::VnodeBitmapExt; use risingwave_connector::dispatch_sink; use risingwave_connector::sink::{build_sink, Sink, SinkCommitCoordinator, SinkParam}; use risingwave_pb::connector_service::coordinate_request::CommitRequest; diff --git a/src/storage/src/filter_key_extractor.rs b/src/storage/src/filter_key_extractor.rs index baf5c4070c74a..e9326d37dcd8c 100644 --- a/src/storage/src/filter_key_extractor.rs +++ b/src/storage/src/filter_key_extractor.rs @@ -554,6 +554,7 @@ mod tests { initialized_at_cluster_version: None, created_at_cluster_version: None, cdc_table_id: None, + maybe_vnode_count: None, } } diff --git a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs index 10dbc7ff39de8..93e4c1211fe61 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs @@ -19,7 +19,7 @@ use itertools::Itertools; use risingwave_common::array::StreamChunk; use risingwave_common::bitmap::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::TableId; -use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; +use risingwave_common::hash::VnodeBitmapExt; use risingwave_common::util::epoch::EpochPair; use risingwave_common_estimate_size::EstimateSize; use risingwave_connector::sink::log_store::{LogStoreResult, LogWriter}; diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 13df85bf25d97..60b71fe7d835f 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -159,6 +159,7 @@ async fn compaction_test( initialized_at_cluster_version: None, created_at_cluster_version: None, cdc_table_id: None, + maybe_vnode_count: None, }; let mut delete_range_table = delete_key_table.clone(); delete_range_table.id = 2;