Skip to content

Commit

Permalink
add vnode count to table catalog
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
BugenZhao committed Sep 6, 2024
1 parent d509ed6 commit aa7362b
Show file tree
Hide file tree
Showing 15 changed files with 55 additions and 4 deletions.
2 changes: 2 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,8 @@ message Table {
// upstream table.
optional string cdc_table_id = 39;

optional uint32 maybe_vnode_count = 40;

// Per-table catalog version, used by schema change. `None` for internal
// tables and tests. Not to be confused with the global catalog version for
// notification service.
Expand Down
1 change: 1 addition & 0 deletions proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ message StorageTableDesc {
repeated uint32 stream_key = 9;
optional uint32 vnode_col_idx_in_pk = 10;
optional uint32 retention_seconds = 11;
optional uint32 maybe_vnode_count = 12;
}

message AsOf {
Expand Down
7 changes: 7 additions & 0 deletions src/common/src/catalog/physical_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use risingwave_pb::plan_common::StorageTableDesc;

use super::{ColumnDesc, ColumnId, TableId};
use crate::catalog::get_dist_key_in_pk_indices;
use crate::hash::VnodeCountCompat;
use crate::util::sort_util::ColumnOrder;

/// Includes necessary information for compute node to access data of the table.
Expand Down Expand Up @@ -57,6 +58,8 @@ pub struct TableDesc {
/// the column indices which could receive watermarks.
pub watermark_columns: FixedBitSet,

pub vnode_count: usize,

/// Whether the table is versioned. If `true`, column-aware row encoding will be used
/// to be compatible with schema changes.
///
Expand Down Expand Up @@ -113,6 +116,7 @@ impl TableDesc {
versioned: self.versioned,
stream_key: self.stream_key.iter().map(|&x| x as u32).collect(),
vnode_col_idx_in_pk,
maybe_vnode_count: Some(self.vnode_count as u32),
})
}

Expand All @@ -126,6 +130,8 @@ impl TableDesc {
}

pub fn from_pb_table(table: &Table) -> Self {
let vnode_count = table.vnode_count();

Self {
table_id: TableId::new(table.id),
pk: table.pk.iter().map(ColumnOrder::from_protobuf).collect(),
Expand All @@ -143,6 +149,7 @@ impl TableDesc {
read_prefix_len_hint: table.read_prefix_len_hint as _,
watermark_columns: table.watermark_indices.iter().map(|i| *i as _).collect(),
versioned: table.version.is_some(),
vnode_count,
}
}
}
18 changes: 18 additions & 0 deletions src/common/src/hash/consistent_hash/vnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,24 @@ impl VirtualNode {
}
}

pub trait VnodeCountCompat {
fn vnode_count(&self) -> usize;
}

impl VnodeCountCompat for risingwave_pb::plan_common::StorageTableDesc {
fn vnode_count(&self) -> usize {
self.maybe_vnode_count
.map_or(VirtualNode::COUNT, |v| v as _)
}
}

impl VnodeCountCompat for risingwave_pb::catalog::Table {
fn vnode_count(&self) -> usize {
self.maybe_vnode_count
.map_or(VirtualNode::COUNT, |v| v as _)
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
11 changes: 11 additions & 0 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use risingwave_common::catalog::{
ColumnCatalog, ConflictBehavior, CreateType, Field, Schema, StreamJobStatus, TableDesc,
TableId, TableVersionId,
};
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType, PbTableVersion};
Expand Down Expand Up @@ -170,6 +171,8 @@ pub struct TableCatalog {
pub initialized_at_cluster_version: Option<String>,

pub cdc_table_id: Option<String>,

pub vnode_count: usize,
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -348,6 +351,7 @@ impl TableCatalog {
watermark_columns: self.watermark_columns.clone(),
versioned: self.version.is_some(),
vnode_col_index: self.vnode_col_index,
vnode_count: self.vnode_count,
}
}

Expand Down Expand Up @@ -428,6 +432,7 @@ impl TableCatalog {
initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
retention_seconds: self.retention_seconds,
cdc_table_id: self.cdc_table_id.clone(),
maybe_vnode_count: Some(self.vnode_count as _),
}
}

Expand Down Expand Up @@ -533,6 +538,8 @@ impl From<PbTable> for TableCatalog {
OptionalAssociatedSourceId::AssociatedSourceId(id) => id,
});
let name = tb.name.clone();
let vnode_count = tb.vnode_count();

let mut col_names = HashSet::new();
let mut col_index: HashMap<i32, usize> = HashMap::new();

Expand Down Expand Up @@ -602,6 +609,7 @@ impl From<PbTable> for TableCatalog {
.map(TableId::from)
.collect_vec(),
cdc_table_id: tb.cdc_table_id,
vnode_count,
}
}
}
Expand All @@ -622,6 +630,7 @@ impl OwnedByUserCatalog for TableCatalog {
mod tests {

use risingwave_common::catalog::{row_id_column_desc, ColumnDesc, ColumnId};
use risingwave_common::hash::VirtualNode;
use risingwave_common::test_prelude::*;
use risingwave_common::types::*;
use risingwave_common::util::sort_util::OrderType;
Expand Down Expand Up @@ -692,6 +701,7 @@ mod tests {
initialized_at_cluster_version: None,
version_column_index: None,
cdc_table_id: None,
maybe_vnode_count: None,
}
.into();

Expand Down Expand Up @@ -755,6 +765,7 @@ mod tests {
dependent_relations: vec![],
version_column_index: None,
cdc_table_id: None,
vnode_count: VirtualNode::COUNT,
}
);
assert_eq!(table, TableCatalog::from(table.to_prost(0, 0)));
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{
ColumnCatalog, ConflictBehavior, CreateType, StreamJobStatus, TableId, OBJECT_ID_PLACEHOLDER,
};
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
Expand Down Expand Up @@ -283,6 +284,7 @@ impl StreamMaterialize {
created_at_cluster_version: None,
retention_seconds: retention_seconds.map(|i| i.into()),
cdc_table_id: None,
vnode_count: VirtualNode::COUNT, // TODO(var-vnode): use vnode count from session config
})
}

Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use risingwave_common::catalog::{
use risingwave_common::constants::log_store::v2::{
KV_LOG_STORE_PREDEFINED_COLUMNS, PK_ORDERING, VNODE_COLUMN_INDEX,
};
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};

use crate::catalog::table_catalog::TableType;
Expand Down Expand Up @@ -179,6 +180,7 @@ impl TableCatalogBuilder {
created_at_cluster_version: None,
retention_seconds: None,
cdc_table_id: None,
vnode_count: VirtualNode::COUNT, /* TODO(var-vnode): use vnode count from session config, */
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/scheduler/distributed/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ pub(crate) mod tests {
ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, StreamJobStatus,
DEFAULT_SUPER_USER_ID,
};
use risingwave_common::hash::{WorkerSlotId, WorkerSlotMapping};
use risingwave_common::hash::{VirtualNode, WorkerSlotId, WorkerSlotMapping};
use risingwave_common::types::DataType;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::{HostAddress, WorkerNode, WorkerType};
Expand Down Expand Up @@ -597,6 +597,7 @@ pub(crate) mod tests {
initialized_at_cluster_version: None,
created_at_cluster_version: None,
cdc_table_id: None,
vnode_count: VirtualNode::COUNT_FOR_TEST,
};
let batch_plan_node: PlanRef = LogicalScan::create(
"".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/scheduler/plan_fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use risingwave_common::bail;
use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::{Schema, TableDesc};
use risingwave_common::hash::table_distribution::TableDistribution;
use risingwave_common::hash::{VirtualNode, WorkerSlotId, WorkerSlotMapping};
use risingwave_common::hash::{WorkerSlotId, WorkerSlotMapping};
use risingwave_common::util::scan_range::ScanRange;
use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
use risingwave_connector::source::filesystem::opendal_source::{
Expand Down
4 changes: 4 additions & 0 deletions src/meta/model_v2/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use risingwave_common::catalog::OBJECT_ID_PLACEHOLDER;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType};
use risingwave_pb::catalog::{PbHandleConflictBehavior, PbTable};
use sea_orm::entity::prelude::*;
Expand Down Expand Up @@ -133,6 +134,7 @@ pub struct Model {
pub retention_seconds: Option<i32>,
pub incoming_sinks: I32Array,
pub cdc_table_id: Option<String>,
pub vnode_count: i32,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down Expand Up @@ -207,6 +209,7 @@ impl From<PbTable> for ActiveModel {
fn from(pb_table: PbTable) -> Self {
let table_type = pb_table.table_type();
let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior();
let vnode_count = pb_table.vnode_count();

let fragment_id = if pb_table.fragment_id == OBJECT_ID_PLACEHOLDER {
NotSet
Expand Down Expand Up @@ -255,6 +258,7 @@ impl From<PbTable> for ActiveModel {
retention_seconds: Set(pb_table.retention_seconds.map(|i| i as _)),
incoming_sinks: Set(pb_table.incoming_sinks.into()),
cdc_table_id: Set(pb_table.cdc_table_id),
vnode_count: Set(vnode_count as _),
}
}
}
1 change: 1 addition & 0 deletions src/meta/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ impl From<ObjectModel<table::Model>> for PbTable {
created_at_cluster_version: value.1.created_at_cluster_version,
retention_seconds: value.0.retention_seconds.map(|id| id as u32),
cdc_table_id: value.0.cdc_table_id,
maybe_vnode_count: Some(value.0.vnode_count as _),
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use futures::future::{select, Either};
use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryStreamExt};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_connector::dispatch_sink;
use risingwave_connector::sink::{build_sink, Sink, SinkCommitCoordinator, SinkParam};
use risingwave_pb::connector_service::coordinate_request::CommitRequest;
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/filter_key_extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,7 @@ mod tests {
initialized_at_cluster_version: None,
created_at_cluster_version: None,
cdc_table_id: None,
maybe_vnode_count: None,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::TableId;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_connector::sink::log_store::{LogStoreResult, LogWriter};
Expand Down
1 change: 1 addition & 0 deletions src/tests/compaction_test/src/delete_range_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ async fn compaction_test(
initialized_at_cluster_version: None,
created_at_cluster_version: None,
cdc_table_id: None,
maybe_vnode_count: None,
};
let mut delete_range_table = delete_key_table.clone();
delete_range_table.id = 2;
Expand Down

0 comments on commit aa7362b

Please sign in to comment.