diff --git a/proto/hummock.proto b/proto/hummock.proto index 92c1494707fb..452ef9fe3be5 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -227,15 +227,6 @@ message HummockVersionArchive { repeated HummockVersionDelta version_deltas = 2; } -// We will have two epoch after decouple -message HummockSnapshot { - // Epoch with checkpoint, we will read durable data with it. - uint64 committed_epoch = 1; - // Epoch without checkpoint, we will read real-time data with it. But it may be rolled back. - reserved 2; - reserved "current_epoch"; -} - message VersionUpdatePayload { oneof payload { HummockVersionDeltas version_deltas = 1; @@ -273,13 +264,6 @@ message GetAssignedCompactTaskNumResponse { uint32 num_tasks = 1; } -message GetEpochRequest {} - -message GetEpochResponse { - common.Status status = 1; - HummockSnapshot snapshot = 2; -} - // When right_exclusive=false, it represents [left, right], of which both boundary are open. When right_exclusive=true, // it represents [left, right), of which right is exclusive. message KeyRange { @@ -811,7 +795,6 @@ service HummockManagerService { rpc GetAssignedCompactTaskNum(GetAssignedCompactTaskNumRequest) returns (GetAssignedCompactTaskNumResponse); rpc TriggerCompactionDeterministic(TriggerCompactionDeterministicRequest) returns (TriggerCompactionDeterministicResponse); rpc DisableCommitEpoch(DisableCommitEpochRequest) returns (DisableCommitEpochResponse); - rpc GetEpoch(GetEpochRequest) returns (GetEpochResponse); rpc GetNewSstIds(GetNewSstIdsRequest) returns (GetNewSstIdsResponse); rpc ReportVacuumTask(ReportVacuumTaskRequest) returns (ReportVacuumTaskResponse); rpc TriggerManualCompaction(TriggerManualCompactionRequest) returns (TriggerManualCompactionResponse); diff --git a/src/ctl/src/cmd_impl/hummock/list_version.rs b/src/ctl/src/cmd_impl/hummock/list_version.rs index ac8ba6982316..36ef3051a890 100644 --- a/src/ctl/src/cmd_impl/hummock/list_version.rs +++ b/src/ctl/src/cmd_impl/hummock/list_version.rs @@ -49,11 +49,7 @@ pub async fn list_version( println!("{:#?}", version); } else { - println!( - "Version {} max_committed_epoch {}", - version.id, - version.visible_table_committed_epoch() - ); + println!("Version {}", version.id); for (cg, levels) in &version.levels { println!("CompactionGroup {}", cg); diff --git a/src/ctl/src/cmd_impl/hummock/pause_resume.rs b/src/ctl/src/cmd_impl/hummock/pause_resume.rs index 9fdb9bc0cab3..8e40af55163c 100644 --- a/src/ctl/src/cmd_impl/hummock/pause_resume.rs +++ b/src/ctl/src/cmd_impl/hummock/pause_resume.rs @@ -22,9 +22,8 @@ pub async fn disable_commit_epoch(context: &CtlContext) -> anyhow::Result<()> { let version = meta_client.disable_commit_epoch().await?; println!( "Disabled.\ - Current version: id {}, max_committed_epoch {}", + Current version: id {}", version.id, - version.visible_table_committed_epoch() ); Ok(()) } diff --git a/src/ctl/src/cmd_impl/hummock/sst_dump.rs b/src/ctl/src/cmd_impl/hummock/sst_dump.rs index 51b776ad1b2c..9ce4000784ef 100644 --- a/src/ctl/src/cmd_impl/hummock/sst_dump.rs +++ b/src/ctl/src/cmd_impl/hummock/sst_dump.rs @@ -79,7 +79,7 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result args.use_new_object_prefix_strategy, )?) .await?; - let version = hummock.inner().get_pinned_version().version().clone(); + let version = hummock.inner().get_pinned_version().clone(); let sstable_store = hummock.sstable_store(); for level in version.get_combined_levels() { for sstable_info in &level.table_infos { diff --git a/src/ctl/src/cmd_impl/hummock/validate_version.rs b/src/ctl/src/cmd_impl/hummock/validate_version.rs index 3537a645c30e..b6ab7f111aaa 100644 --- a/src/ctl/src/cmd_impl/hummock/validate_version.rs +++ b/src/ctl/src/cmd_impl/hummock/validate_version.rs @@ -191,11 +191,8 @@ pub async fn print_version_delta_in_archive( if is_first { is_first = false; println!( - "delta: id {}, prev_id {}, max_committed_epoch {}, trivial_move {}", - delta.id, - delta.prev_id, - delta.max_committed_epoch, - delta.trivial_move + "delta: id {}, prev_id {}, trivial_move {}", + delta.id, delta.prev_id, delta.trivial_move ); } println!("compaction group id {cg_id}"); diff --git a/src/ctl/src/cmd_impl/meta/migration.rs b/src/ctl/src/cmd_impl/meta/migration.rs index 3e8da33e044f..fc735104dac6 100644 --- a/src/ctl/src/cmd_impl/meta/migration.rs +++ b/src/ctl/src/cmd_impl/meta/migration.rs @@ -744,7 +744,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an .map(|vd| hummock_version_delta::ActiveModel { id: Set(vd.id.to_u64() as _), prev_id: Set(vd.prev_id.to_u64() as _), - max_committed_epoch: Set(vd.visible_table_committed_epoch() as _), + max_committed_epoch: Set(vd.max_committed_epoch_for_migration() as _), safe_epoch: Set(0 as _), trivial_move: Set(vd.trivial_move), full_version_delta: Set((&vd.to_protobuf()).into()), diff --git a/src/ctl/src/cmd_impl/table/scan.rs b/src/ctl/src/cmd_impl/table/scan.rs index 3002a3585fb4..7517d6b9885b 100644 --- a/src/ctl/src/cmd_impl/table/scan.rs +++ b/src/ctl/src/cmd_impl/table/scan.rs @@ -127,7 +127,14 @@ async fn do_scan(table: TableCatalog, hummock: MonitoredStateStore Vec Result Result; async fn flush(&self, checkpoint: bool) -> Result; @@ -139,10 +137,6 @@ impl FrontendMetaClient for FrontendMetaClientImpl { self.0.try_unregister().await; } - async fn get_snapshot(&self) -> Result { - self.0.get_snapshot().await - } - async fn flush(&self, checkpoint: bool) -> Result { self.0.flush(checkpoint).await } diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 731c7a486df8..ae04546f3fbb 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -49,7 +49,6 @@ use risingwave_pb::ddl_service::{ use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo, - HummockSnapshot, }; use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs; use risingwave_pb::meta::list_actor_states_response::ActorState; @@ -938,10 +937,6 @@ pub struct MockFrontendMetaClient {} impl FrontendMetaClient for MockFrontendMetaClient { async fn try_unregister(&self) {} - async fn get_snapshot(&self) -> RpcResult { - Ok(HummockSnapshot { committed_epoch: 0 }) - } - async fn flush(&self, _checkpoint: bool) -> RpcResult { Ok(INVALID_VERSION_ID) } diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs index 5830951d3e5f..e7ffcfbc67c6 100644 --- a/src/meta/service/src/hummock_service.rs +++ b/src/meta/service/src/hummock_service.rs @@ -146,11 +146,7 @@ impl HummockManagerService for HummockServiceImpl { let req = request.into_inner(); let version_deltas = self .hummock_manager - .list_version_deltas( - HummockVersionId::new(req.start_id), - req.num_limit, - req.committed_epoch_limit, - ) + .list_version_deltas(HummockVersionId::new(req.start_id), req.num_limit) .await?; let resp = ListVersionDeltasResponse { version_deltas: Some(PbHummockVersionDeltas { @@ -249,17 +245,6 @@ impl HummockManagerService for HummockServiceImpl { })) } - async fn get_epoch( - &self, - _request: Request, - ) -> Result, Status> { - let hummock_snapshot = self.hummock_manager.latest_snapshot(); - Ok(Response::new(GetEpochResponse { - status: None, - snapshot: Some(hummock_snapshot), - })) - } - async fn report_full_scan_task( &self, request: Request, diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 1b00c9ddb733..c1c14d2977bc 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -712,16 +712,13 @@ impl GlobalBarrierManager { } { - let latest_snapshot = self.context.hummock_manager.latest_snapshot(); - let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); - // Bootstrap recovery. Here we simply trigger a recovery process to achieve the // consistency. // Even if there's no actor to recover, we still go through the recovery process to // inject the first `Initial` barrier. self.context .set_status(BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap)); - let span = tracing::info_span!("bootstrap_recovery", prev_epoch = prev_epoch.value().0); + let span = tracing::info_span!("bootstrap_recovery"); crate::telemetry::report_event( risingwave_pb::telemetry::TelemetryEventStage::Recovery, "normal_recovery", @@ -1102,12 +1099,9 @@ impl GlobalBarrierManager { .set_status(BarrierManagerStatus::Recovering(RecoveryReason::Failover( err.clone(), ))); - let latest_snapshot = self.context.hummock_manager.latest_snapshot(); - let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // we can only recover from the committed epoch let span = tracing::info_span!( "failure_recovery", error = %err.as_report(), - prev_epoch = prev_epoch.value().0 ); crate::telemetry::report_event( @@ -1134,12 +1128,9 @@ impl GlobalBarrierManager { self.context .set_status(BarrierManagerStatus::Recovering(RecoveryReason::Adhoc)); - let latest_snapshot = self.context.hummock_manager.latest_snapshot(); - let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // we can only recover from the committed epoch let span = tracing::info_span!( "adhoc_recovery", error = %err.as_report(), - prev_epoch = prev_epoch.value().0 ); crate::telemetry::report_event( diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index f74e6a23aa74..efe164af7797 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -326,7 +326,7 @@ impl GlobalBarrierManager { .context .hummock_manager .on_current_version(|version| { - let max_committed_epoch = version.visible_table_committed_epoch(); + let max_committed_epoch = version.max_committed_epoch_for_meta(); for (table_id, info) in version.state_table_info.info() { assert_eq!( info.committed_epoch, max_committed_epoch, diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index f586999e55f9..12419fd0993b 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -28,7 +28,6 @@ use risingwave_hummock_sdk::{ CompactionGroupId, HummockContextId, HummockSstableObjectId, LocalSstableInfo, }; use risingwave_pb::hummock::compact_task::{self}; -use risingwave_pb::hummock::HummockSnapshot; use sea_orm::TransactionTrait; use crate::hummock::error::{Error, Result}; @@ -288,12 +287,6 @@ impl HummockManager { )?; } - if is_visible_table_committed_epoch { - let snapshot = HummockSnapshot { committed_epoch }; - let prev_snapshot = self.latest_snapshot.swap(snapshot.into()); - assert!(prev_snapshot.committed_epoch < committed_epoch); - } - for compaction_group_id in &modified_compaction_groups { trigger_sst_stat( &self.metrics, diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs index e0acb64c4eda..68d903bd680d 100644 --- a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use itertools::Itertools; use risingwave_common::catalog::TableId; +use risingwave_common::util::epoch::INVALID_EPOCH; use risingwave_hummock_sdk::compact_task::ReportTask; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ get_compaction_group_ids, TableGroupInfo, @@ -220,10 +221,7 @@ impl HummockManager { self.env.notification_manager(), &self.metrics, ); - let mut new_version_delta = version.new_delta(); - let epoch = new_version_delta - .latest_version() - .visible_table_committed_epoch(); + let mut new_version_delta = version.new_delta(None); for (table_id, raw_group_id) in pairs { let mut group_id = *raw_group_id; @@ -267,7 +265,7 @@ impl HummockManager { .insert( TableId::new(*table_id), PbStateTableInfoDelta { - committed_epoch: epoch, + committed_epoch: INVALID_EPOCH, compaction_group_id: *raw_group_id, } ) @@ -295,7 +293,7 @@ impl HummockManager { self.env.notification_manager(), &self.metrics, ); - let mut new_version_delta = version.new_delta(); + let mut new_version_delta = version.new_delta(None); let mut modified_groups: HashMap = HashMap::new(); // Remove member tables @@ -483,7 +481,7 @@ impl HummockManager { self.env.notification_manager(), &self.metrics, ); - let mut new_version_delta = version.new_delta(); + let mut new_version_delta = version.new_delta(None); let new_sst_start_id = next_sstable_object_id( &self.env, diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs index ce1cdfb364e7..f6d464590d02 100644 --- a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs @@ -167,7 +167,7 @@ impl HummockManager { self.env.notification_manager(), &self.metrics, ); - let mut new_version_delta = version.new_delta(); + let mut new_version_delta = version.new_delta(None); let target_compaction_group_id = { // merge right_group_id to left_group_id and remove right_group_id diff --git a/src/meta/src/hummock/manager/compaction/mod.rs b/src/meta/src/hummock/manager/compaction/mod.rs index be430dd17499..7c0c27a07921 100644 --- a/src/meta/src/hummock/manager/compaction/mod.rs +++ b/src/meta/src/hummock/manager/compaction/mod.rs @@ -144,7 +144,7 @@ fn init_selectors() -> HashMap HummockVersionTransaction<'a> { fn apply_compact_task(&mut self, compact_task: &CompactTask) { - let mut version_delta = self.new_delta(); + let mut version_delta = self.new_delta(None); let trivial_move = CompactStatus::is_trivial_move_task(compact_task); version_delta.trivial_move = trivial_move; @@ -1334,9 +1334,8 @@ impl HummockManager { ) -> Result<()> { self.on_current_version(|old_version| { tracing::info!( - "Trigger compaction for version {}, epoch {}, groups {:?}", + "Trigger compaction for version {}, groups {:?}", old_version.id, - old_version.visible_table_committed_epoch(), compaction_groups ); }) diff --git a/src/meta/src/hummock/manager/context.rs b/src/meta/src/hummock/manager/context.rs index 8f1a510dc07e..7d381e5f36fb 100644 --- a/src/meta/src/hummock/manager/context.rs +++ b/src/meta/src/hummock/manager/context.rs @@ -216,12 +216,12 @@ impl HummockManager { } if is_visible_table_committed_epoch - && committed_epoch <= current_version.visible_table_committed_epoch() + && committed_epoch <= current_version.max_committed_epoch_for_meta() { return Err(anyhow::anyhow!( "Epoch {} <= max_committed_epoch {}", committed_epoch, - current_version.visible_table_committed_epoch() + current_version.max_committed_epoch_for_meta() ) .into()); } diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 4cf29ca060e1..98a0e84694e6 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -17,12 +17,10 @@ use std::ops::{Deref, DerefMut}; use std::sync::atomic::AtomicBool; use std::sync::Arc; -use arc_swap::ArcSwap; use bytes::Bytes; use itertools::Itertools; use risingwave_common::monitor::MonitoredRwLock; use risingwave_common::system_param::reader::SystemParamsRead; -use risingwave_common::util::epoch::INVALID_EPOCH; use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use risingwave_hummock_sdk::{ version_archive_dir, version_checkpoint_path, CompactionGroupId, HummockCompactionTaskId, @@ -33,8 +31,8 @@ use risingwave_meta_model_v2::{ hummock_version_stats, }; use risingwave_pb::hummock::{ - HummockPinnedVersion, HummockSnapshot, HummockVersionStats, PbCompactTaskAssignment, - PbCompactionGroupInfo, SubscribeCompactionEventRequest, + HummockPinnedVersion, HummockVersionStats, PbCompactTaskAssignment, PbCompactionGroupInfo, + SubscribeCompactionEventRequest, }; use risingwave_pb::meta::subscribe_response::Operation; use tokio::sync::mpsc::UnboundedSender; @@ -71,8 +69,6 @@ use compaction::*; pub use compaction::{check_cg_write_limit, WriteLimitType}; pub(crate) use utils::*; -type Snapshot = ArcSwap; - // Update to states are performed as follow: // - Initialize ValTransaction for the meta state to update // - Make changes on the ValTransaction. @@ -90,7 +86,6 @@ pub struct HummockManager { /// `CompactionGroupManager` manages compaction configs for compaction groups. compaction_group_manager: MonitoredRwLock, context_info: MonitoredRwLock, - latest_snapshot: Snapshot, pub metrics: Arc, @@ -273,9 +268,6 @@ impl HummockManager { metadata_manager, // compaction_request_channel: parking_lot::RwLock::new(None), compactor_manager, - latest_snapshot: ArcSwap::from_pointee(HummockSnapshot { - committed_epoch: INVALID_EPOCH, - }), event_sender: tx, delete_object_tracker: Default::default(), object_store, @@ -427,12 +419,6 @@ impl HummockManager { ..Default::default() }); - self.latest_snapshot.store( - HummockSnapshot { - committed_epoch: redo_state.visible_table_committed_epoch(), - } - .into(), - ); versioning_guard.current_version = redo_state; versioning_guard.hummock_version_deltas = hummock_version_deltas; diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 9fa9b11a026c..6fc6ba8110b0 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -270,7 +270,7 @@ async fn test_hummock_transaction() { // Get tables before committing epoch1. No tables should be returned. let current_version = hummock_manager.get_current_version().await; assert_eq!( - current_version.visible_table_committed_epoch(), + current_version.max_committed_epoch_for_test(), INVALID_EPOCH ); let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); @@ -293,7 +293,7 @@ async fn test_hummock_transaction() { // Get tables after committing epoch1. All tables committed in epoch1 should be returned let current_version = hummock_manager.get_current_version().await; let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); - assert_eq!(current_version.visible_table_committed_epoch(), epoch1); + assert_eq!(current_version.max_committed_epoch_for_test(), epoch1); assert_eq!( get_sorted_object_ids(&committed_tables), get_sorted_committed_object_ids(¤t_version, compaction_group_id) @@ -317,7 +317,7 @@ async fn test_hummock_transaction() { // tables_in_epoch2 should be invisible. let current_version = hummock_manager.get_current_version().await; let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); - assert_eq!(current_version.visible_table_committed_epoch(), epoch1); + assert_eq!(current_version.max_committed_epoch_for_test(), epoch1); assert_eq!( get_sorted_object_ids(&committed_tables), get_sorted_committed_object_ids(¤t_version, compaction_group_id) @@ -341,7 +341,7 @@ async fn test_hummock_transaction() { // returned let current_version = hummock_manager.get_current_version().await; let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); - assert_eq!(current_version.visible_table_committed_epoch(), epoch2); + assert_eq!(current_version.max_committed_epoch_for_test(), epoch2); assert_eq!( get_sorted_object_ids(&committed_tables), get_sorted_committed_object_ids(¤t_version, compaction_group_id) @@ -621,9 +621,11 @@ async fn test_pin_snapshot_response_lost() { // Pin a snapshot with smallest last_pin // [ e0 ] -> [ e0:pinned ] - let mut epoch_recorded_in_frontend = hummock_manager.latest_snapshot(); + let mut epoch_recorded_in_frontend = hummock_manager + .on_current_version(|version| version.max_committed_epoch_for_test()) + .await; let prev_epoch = epoch.prev_epoch(); - assert_eq!(epoch_recorded_in_frontend.committed_epoch, prev_epoch); + assert_eq!(epoch_recorded_in_frontend, prev_epoch); let test_tables = generate_test_tables(epoch, get_sst_ids(&hummock_manager, 2).await); register_sstable_infos_to_compaction_group( @@ -648,17 +650,18 @@ async fn test_pin_snapshot_response_lost() { // Assume the response of the previous rpc is lost. // [ e0:pinned, e1 ] -> [ e0, e1:pinned ] - epoch_recorded_in_frontend = hummock_manager.latest_snapshot(); + epoch_recorded_in_frontend = hummock_manager + .on_current_version(|version| version.max_committed_epoch_for_test()) + .await; let prev_epoch = epoch.prev_epoch(); - assert_eq!(epoch_recorded_in_frontend.committed_epoch, prev_epoch); + assert_eq!(epoch_recorded_in_frontend, prev_epoch); // Assume the response of the previous rpc is lost. // [ e0, e1:pinned ] -> [ e0, e1:pinned ] - epoch_recorded_in_frontend = hummock_manager.latest_snapshot(); - assert_eq!( - epoch_recorded_in_frontend.committed_epoch, - epoch.prev_epoch() - ); + epoch_recorded_in_frontend = hummock_manager + .on_current_version(|version| version.max_committed_epoch_for_test()) + .await; + assert_eq!(epoch_recorded_in_frontend, epoch.prev_epoch()); let test_tables = generate_test_tables(epoch, get_sst_ids(&hummock_manager, 2).await); register_sstable_infos_to_compaction_group( @@ -683,11 +686,10 @@ async fn test_pin_snapshot_response_lost() { // Use correct snapshot id. // [ e0, e1:pinned, e2 ] -> [ e0, e1:pinned, e2:pinned ] - epoch_recorded_in_frontend = hummock_manager.latest_snapshot(); - assert_eq!( - epoch_recorded_in_frontend.committed_epoch, - epoch.prev_epoch() - ); + epoch_recorded_in_frontend = hummock_manager + .on_current_version(|version| version.max_committed_epoch_for_test()) + .await; + assert_eq!(epoch_recorded_in_frontend, epoch.prev_epoch()); let test_tables = generate_test_tables(epoch, get_sst_ids(&hummock_manager, 2).await); register_sstable_infos_to_compaction_group( @@ -712,11 +714,10 @@ async fn test_pin_snapshot_response_lost() { // Use u64::MAX as epoch to pin greatest snapshot // [ e0, e1:pinned, e2:pinned, e3 ] -> [ e0, e1:pinned, e2:pinned, e3::pinned ] - epoch_recorded_in_frontend = hummock_manager.latest_snapshot(); - assert_eq!( - epoch_recorded_in_frontend.committed_epoch, - epoch.prev_epoch() - ); + epoch_recorded_in_frontend = hummock_manager + .on_current_version(|version| version.max_committed_epoch_for_test()) + .await; + assert_eq!(epoch_recorded_in_frontend, epoch.prev_epoch()); } #[tokio::test] @@ -1196,7 +1197,7 @@ async fn test_extend_objects_to_delete() { ); let objects_to_delete = hummock_manager.get_objects_to_delete(); assert_eq!(objects_to_delete.len(), orphan_sst_num as usize); - let new_epoch = pinned_version2.visible_table_committed_epoch().next_epoch(); + let new_epoch = pinned_version2.max_committed_epoch_for_test().next_epoch(); hummock_meta_client .commit_epoch( new_epoch, @@ -1209,7 +1210,7 @@ async fn test_extend_objects_to_delete() { .await .unwrap(); let pinned_version3: HummockVersion = hummock_manager.pin_version(context_id).await.unwrap(); - assert_eq!(new_epoch, pinned_version3.visible_table_committed_epoch()); + assert_eq!(new_epoch, pinned_version3.max_committed_epoch_for_test()); hummock_manager .unpin_version_before(context_id, pinned_version3.id) .await diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index cca4ec727e78..74c2c45bff3e 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -45,7 +45,7 @@ fn trigger_delta_log_stats(metrics: &MetaMetrics, total_number: usize) { fn trigger_version_stat(metrics: &MetaMetrics, current_version: &HummockVersion) { metrics .max_committed_epoch - .set(current_version.visible_table_committed_epoch() as i64); + .set(current_version.max_committed_epoch_for_meta() as i64); metrics .version_size .set(current_version.estimated_encode_len() as i64); @@ -97,8 +97,13 @@ impl<'a> HummockVersionTransaction<'a> { } } - pub(super) fn new_delta<'b>(&'b mut self) -> SingleDeltaTransaction<'a, 'b> { - let delta = self.latest_version().version_delta_after(); + pub(super) fn new_delta<'b>( + &'b mut self, + max_committed_epoch: Option, + ) -> SingleDeltaTransaction<'a, 'b> { + let delta = self + .latest_version() + .version_delta_after(max_committed_epoch); SingleDeltaTransaction { version_txn: self, delta: Some(delta), @@ -125,10 +130,12 @@ impl<'a> HummockVersionTransaction<'a> { new_table_watermarks: HashMap, change_log_delta: HashMap, ) -> HummockVersionDelta { - let mut new_version_delta = self.new_delta(); - if is_visible_table_committed_epoch { - new_version_delta.set_max_committed_epoch(committed_epoch); - } + let new_max_committed_epoch = if is_visible_table_committed_epoch { + Some(committed_epoch) + } else { + None + }; + let mut new_version_delta = self.new_delta(new_max_committed_epoch); new_version_delta.new_table_watermarks = new_table_watermarks; new_version_delta.change_log_delta = change_log_delta; diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index 3a9ae5b098ae..e396a0123b9b 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -25,14 +25,11 @@ use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::table_stats::add_prost_table_stats_map; use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use risingwave_hummock_sdk::{ - CompactionGroupId, HummockContextId, HummockEpoch, HummockSstableId, HummockSstableObjectId, - HummockVersionId, + CompactionGroupId, HummockContextId, HummockSstableId, HummockSstableObjectId, HummockVersionId, }; use risingwave_pb::common::WorkerNode; use risingwave_pb::hummock::write_limits::WriteLimit; -use risingwave_pb::hummock::{ - HummockPinnedVersion, HummockSnapshot, HummockVersionStats, TableStats, -}; +use risingwave_pb::hummock::{HummockPinnedVersion, HummockVersionStats, TableStats}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use super::check_cg_write_limit; @@ -165,14 +162,12 @@ impl HummockManager { &self, start_id: HummockVersionId, num_limit: u32, - committed_epoch_limit: HummockEpoch, ) -> Result> { let versioning = self.versioning.read().await; let version_deltas = versioning .hummock_version_deltas .range(start_id..) .map(|(_id, delta)| delta) - .filter(|delta| delta.visible_table_committed_epoch() <= committed_epoch_limit) .take(num_limit as _) .cloned() .collect(); @@ -260,7 +255,7 @@ impl HummockManager { self.env.notification_manager(), &self.metrics, ); - let mut new_version_delta = version.new_delta(); + let mut new_version_delta = version.new_delta(None); new_version_delta.with_latest_version(|version, delta| { version.may_fill_backward_compatible_state_table_info_delta(delta) }); @@ -270,11 +265,6 @@ impl HummockManager { Ok(()) } - pub fn latest_snapshot(&self) -> HummockSnapshot { - let snapshot = self.latest_snapshot.load(); - HummockSnapshot::clone(&snapshot) - } - pub async fn list_change_log_epochs( &self, table_id: u32, diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index c59fb512090f..3163d283692f 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -38,7 +38,7 @@ use risingwave_pb::hummock::compact_task::TaskStatus; use risingwave_pb::hummock::subscribe_compaction_event_request::{Event, ReportTask}; use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent; use risingwave_pb::hummock::{ - compact_task, HummockSnapshot, PbHummockVersion, SubscribeCompactionEventRequest, + compact_task, PbHummockVersion, SubscribeCompactionEventRequest, SubscribeCompactionEventResponse, VacuumTask, }; use risingwave_rpc_client::error::{Result, RpcError}; @@ -119,10 +119,6 @@ impl HummockMetaClient for MockHummockMetaClient { Ok(self.hummock_manager.get_current_version().await) } - async fn get_snapshot(&self) -> Result { - Ok(self.hummock_manager.latest_snapshot()) - } - async fn get_new_sst_ids(&self, number: u32) -> Result { fail_point!("get_new_sst_ids_err", |_| Err(anyhow!( "failpoint get_new_sst_ids_err" diff --git a/src/meta/src/hummock/model/ext/hummock.rs b/src/meta/src/hummock/model/ext/hummock.rs index 21f51867d5eb..b66672a2057c 100644 --- a/src/meta/src/hummock/model/ext/hummock.rs +++ b/src/meta/src/hummock/model/ext/hummock.rs @@ -223,7 +223,7 @@ impl Transactional for HummockVersionDelta { let m = hummock_version_delta::ActiveModel { id: Set(self.id.to_u64().try_into().unwrap()), prev_id: Set(self.prev_id.to_u64().try_into().unwrap()), - max_committed_epoch: Set(self.visible_table_committed_epoch().try_into().unwrap()), + max_committed_epoch: Set(0.into()), safe_epoch: Set(0.into()), trivial_move: Set(self.trivial_move), full_version_delta: Set(FullVersionDelta::from(&self.into())), diff --git a/src/rpc_client/src/hummock_meta_client.rs b/src/rpc_client/src/hummock_meta_client.rs index 5239d000353c..85e40e171fc2 100644 --- a/src/rpc_client/src/hummock_meta_client.rs +++ b/src/rpc_client/src/hummock_meta_client.rs @@ -19,8 +19,7 @@ use risingwave_hummock_sdk::{ HummockEpoch, HummockSstableObjectId, HummockVersionId, SstObjectIdRange, SyncResult, }; use risingwave_pb::hummock::{ - HummockSnapshot, PbHummockVersion, SubscribeCompactionEventRequest, - SubscribeCompactionEventResponse, VacuumTask, + PbHummockVersion, SubscribeCompactionEventRequest, SubscribeCompactionEventResponse, VacuumTask, }; use tokio::sync::mpsc::UnboundedSender; @@ -32,7 +31,6 @@ use crate::error::Result; pub trait HummockMetaClient: Send + Sync + 'static { async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()>; async fn get_current_version(&self) -> Result; - async fn get_snapshot(&self) -> Result; async fn get_new_sst_ids(&self, number: u32) -> Result; // We keep `commit_epoch` only for test/benchmark. async fn commit_epoch( diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 2648843abe19..5608738d6ae5 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -1509,12 +1509,6 @@ impl HummockMetaClient for MetaClient { )) } - async fn get_snapshot(&self) -> Result { - let req = GetEpochRequest {}; - let resp = self.inner.get_epoch(req).await?; - Ok(resp.snapshot.unwrap()) - } - async fn get_new_sst_ids(&self, number: u32) -> Result { let resp = self .inner @@ -2106,7 +2100,6 @@ macro_rules! for_all_meta_rpc { ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse } ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse } ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse } - ,{ hummock_client, get_epoch, GetEpochRequest, GetEpochResponse } ,{ hummock_client, get_new_sst_ids, GetNewSstIdsRequest, GetNewSstIdsResponse } ,{ hummock_client, report_vacuum_task, ReportVacuumTaskRequest, ReportVacuumTaskResponse } ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse } diff --git a/src/storage/backup/src/lib.rs b/src/storage/backup/src/lib.rs index e2538dbe44a8..18d619c1e504 100644 --- a/src/storage/backup/src/lib.rs +++ b/src/storage/backup/src/lib.rs @@ -73,7 +73,7 @@ impl MetaSnapshotMetadata { id, hummock_version_id: v.id, ssts: v.get_object_ids(), - max_committed_epoch: v.visible_table_committed_epoch(), + max_committed_epoch: v.max_committed_epoch_for_meta(), format_version, remarks, state_table_info: v diff --git a/src/storage/backup/src/meta_snapshot_v2.rs b/src/storage/backup/src/meta_snapshot_v2.rs index e7dbc92eae23..6afb90b1258d 100644 --- a/src/storage/backup/src/meta_snapshot_v2.rs +++ b/src/storage/backup/src/meta_snapshot_v2.rs @@ -127,9 +127,9 @@ impl Display for MetadataV2 { writeln!(f, "clusters: {:#?}", self.clusters)?; writeln!( f, - "Hummock version: id {}, max_committed_epoch: {}", + "Hummock version: id {}, committed_epoch: {:?}", self.hummock_version.id, - self.hummock_version.visible_table_committed_epoch() + self.hummock_version.state_table_info.info(), )?; // optionally dump other metadata Ok(()) diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index dc0b08d45299..5ea75ec87897 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -544,9 +544,7 @@ impl HummockVersion { &version_delta.removed_table_ids, ); - if !is_commit_epoch - && self.visible_table_committed_epoch() < version_delta.visible_table_committed_epoch() - { + if !is_commit_epoch && self.max_committed_epoch < version_delta.max_committed_epoch { is_commit_epoch = true; tracing::trace!("max committed epoch bumped but no table committed epoch is changed"); } @@ -594,17 +592,17 @@ impl HummockVersion { ); self.merge_compaction_group(group_merge.left_group_id, group_merge.right_group_id) } - let visible_table_committed_epoch = self.visible_table_committed_epoch(); + let max_committed_epoch = self.max_committed_epoch; let group_destroy = summary.group_destroy; let levels = self.levels.get_mut(compaction_group_id).unwrap_or_else(|| { panic!("compaction group {} does not exist", compaction_group_id) }); assert!( - visible_table_committed_epoch <= version_delta.visible_table_committed_epoch(), + max_committed_epoch <= version_delta.max_committed_epoch, "new max commit epoch {} is older than the current max commit epoch {}", - version_delta.visible_table_committed_epoch(), - visible_table_committed_epoch + version_delta.max_committed_epoch, + max_committed_epoch ); if is_commit_epoch { // `max_committed_epoch` increases. It must be a `commit_epoch` @@ -655,7 +653,7 @@ impl HummockVersion { } } self.id = version_delta.id; - self.set_max_committed_epoch(version_delta.visible_table_committed_epoch()); + self.max_committed_epoch = version_delta.max_committed_epoch; // apply to table watermark diff --git a/src/storage/hummock_sdk/src/table_watermark.rs b/src/storage/hummock_sdk/src/table_watermark.rs index bddf876240bc..e2d5a675f3e9 100644 --- a/src/storage/hummock_sdk/src/table_watermark.rs +++ b/src/storage/hummock_sdk/src/table_watermark.rs @@ -726,6 +726,7 @@ mod tests { use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::{test_epoch, EpochExt}; + use risingwave_pb::hummock::PbHummockVersion; use crate::key::{is_empty_key_range, prefixed_range_with_vnode, TableKeyRange}; use crate::table_watermark::{ @@ -1183,8 +1184,10 @@ mod tests { watermark3.clone(), ); - let mut version = HummockVersion::default(); - version.set_max_committed_epoch(EPOCH1); + let mut version = HummockVersion::from_rpc_protobuf(&PbHummockVersion { + max_committed_epoch: EPOCH1, + ..Default::default() + }); let test_table_id = TableId::from(233); version.table_watermarks.insert( test_table_id, diff --git a/src/storage/hummock_sdk/src/time_travel.rs b/src/storage/hummock_sdk/src/time_travel.rs index 8859daa572d9..a329eb80a9d8 100644 --- a/src/storage/hummock_sdk/src/time_travel.rs +++ b/src/storage/hummock_sdk/src/time_travel.rs @@ -99,7 +99,7 @@ impl From<(&HummockVersion, &HashSet)> for IncompleteHummockV } }) .collect(), - max_committed_epoch: version.visible_table_committed_epoch(), + max_committed_epoch: version.max_committed_epoch, table_watermarks: version.table_watermarks.clone(), // TODO: optimization: strip table change log based on select_group table_change_log: version @@ -142,7 +142,7 @@ impl From<(&HummockVersionDelta, &HashSet)> for IncompleteHum } }) .collect(), - max_committed_epoch: delta.visible_table_committed_epoch(), + max_committed_epoch: delta.max_committed_epoch, trivial_move: delta.trivial_move, new_table_watermarks: delta.new_table_watermarks.clone(), removed_table_ids: delta.removed_table_ids.clone(), diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 1c627285f934..46807a5138d0 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -413,12 +413,12 @@ impl HummockVersion { } } - pub(crate) fn set_max_committed_epoch(&mut self, max_committed_epoch: u64) { - self.max_committed_epoch = max_committed_epoch; + pub fn max_committed_epoch_for_meta(&self) -> u64 { + self.max_committed_epoch } #[cfg(any(test, feature = "test"))] - pub fn max_committed_epoch(&self) -> u64 { + pub fn max_committed_epoch_for_test(&self) -> u64 { self.max_committed_epoch } @@ -429,10 +429,6 @@ impl HummockVersion { .map(|info| info.committed_epoch) } - pub fn visible_table_committed_epoch(&self) -> u64 { - self.max_committed_epoch - } - pub fn create_init_version(default_compaction_config: Arc) -> HummockVersion { let mut init_version = HummockVersion { id: FIRST_VERSION_ID, @@ -454,12 +450,19 @@ impl HummockVersion { init_version } - pub fn version_delta_after(&self) -> HummockVersionDelta { + pub fn version_delta_after(&self, max_committed_epoch: Option) -> HummockVersionDelta { + let max_committed_epoch = max_committed_epoch.unwrap_or(self.max_committed_epoch); + assert!( + max_committed_epoch >= self.max_committed_epoch, + "new max_committed_epoch {} less than prev max_committed_epoch: {}", + max_committed_epoch, + self.max_committed_epoch + ); HummockVersionDelta { id: self.next_version_id(), prev_id: self.id, trivial_move: false, - max_committed_epoch: self.max_committed_epoch, + max_committed_epoch, group_deltas: Default::default(), new_table_watermarks: HashMap::new(), removed_table_ids: HashSet::new(), @@ -597,13 +600,9 @@ impl HummockVersionDelta { })) } - pub fn visible_table_committed_epoch(&self) -> u64 { + pub fn max_committed_epoch_for_migration(&self) -> HummockEpoch { self.max_committed_epoch } - - pub fn set_max_committed_epoch(&mut self, max_committed_epoch: u64) { - self.max_committed_epoch = max_committed_epoch; - } } impl From<&PbHummockVersionDelta> for HummockVersionDeltaCommon diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 86f578d55bb3..86a5d2545aa8 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -811,7 +811,7 @@ pub(crate) mod tests { compact_task.current_epoch_time = hummock_manager_ref .get_current_version() .await - .max_committed_epoch(); + .max_committed_epoch_for_test(); // assert compact_task assert_eq!( @@ -1017,7 +1017,7 @@ pub(crate) mod tests { compact_task.current_epoch_time = hummock_manager_ref .get_current_version() .await - .max_committed_epoch(); + .max_committed_epoch_for_test(); // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); @@ -2004,7 +2004,7 @@ pub(crate) mod tests { compact_task.current_epoch_time = hummock_manager_ref .get_current_version() .await - .max_committed_epoch(); + .max_committed_epoch_for_test(); // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); @@ -2245,7 +2245,7 @@ pub(crate) mod tests { compact_task.current_epoch_time = hummock_manager_ref .get_current_version() .await - .max_committed_epoch(); + .max_committed_epoch_for_test(); // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); diff --git a/src/storage/hummock_test/src/hummock_read_version_tests.rs b/src/storage/hummock_test/src/hummock_read_version_tests.rs index 9fae89b520bc..0e49b9de872e 100644 --- a/src/storage/hummock_test/src/hummock_read_version_tests.rs +++ b/src/storage/hummock_test/src/hummock_read_version_tests.rs @@ -335,8 +335,11 @@ async fn test_read_filter_basic() { assert_eq!(1, hummock_read_snapshot.0.len()); assert_eq!(0, hummock_read_snapshot.1.len()); assert_eq!( - read_version.read().committed().max_committed_epoch(), - hummock_read_snapshot.2.max_committed_epoch() + read_version + .read() + .committed() + .max_committed_epoch_for_test(), + hummock_read_snapshot.2.max_committed_epoch_for_test() ); } } diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index c477693b114f..9bd257c36b9a 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -471,7 +471,10 @@ async fn test_state_store_sync() { let read_version = hummock_storage.read_version(); - let base_epoch = read_version.read().committed().max_committed_epoch(); + let base_epoch = read_version + .read() + .committed() + .max_committed_epoch_for_test(); let epoch1 = test_epoch(base_epoch.next_epoch()); test_env .storage @@ -842,7 +845,7 @@ async fn test_delete_get() { .read_version() .read() .committed() - .max_committed_epoch(); + .max_committed_epoch_for_test(); let epoch1 = initial_epoch.next_epoch(); test_env @@ -944,7 +947,7 @@ async fn test_multiple_epoch_sync() { .read_version() .read() .committed() - .max_committed_epoch(); + .max_committed_epoch_for_test(); let epoch1 = initial_epoch.next_epoch(); test_env @@ -1655,8 +1658,8 @@ async fn test_hummock_version_reader() { .read_version() .read() .committed() - .max_committed_epoch(), - read_snapshot.2.max_committed_epoch() + .max_committed_epoch_for_test(), + read_snapshot.2.max_committed_epoch_for_test() ); let iter = hummock_version_reader @@ -2406,7 +2409,6 @@ async fn test_table_watermark() { let check_version_table_watermark = |version: PinnedVersion| { let epoch = version - .version() .state_table_info .info() .get(&TEST_TABLE_ID) @@ -2414,12 +2416,11 @@ async fn test_table_watermark() { .committed_epoch; let table_watermarks = TableWatermarksIndex::new_committed( version - .version() .table_watermarks .get(&TEST_TABLE_ID) .unwrap() .clone(), - version.max_committed_epoch(), + version.max_committed_epoch_for_test(), ); assert_eq!(WatermarkDirection::Ascending, table_watermarks.direction()); assert_eq!( @@ -2578,7 +2579,7 @@ async fn test_commit_multi_epoch() { .manager .get_current_version() .await - .max_committed_epoch(); + .max_committed_epoch_for_test(); let commit_epoch = |epoch, sst: SstableInfo, @@ -2654,7 +2655,7 @@ async fn test_commit_multi_epoch() { assert_eq!(sub_level.table_infos.len(), 1); assert_eq!(sub_level.table_infos[0].object_id, sst1_epoch1.object_id); - assert_eq!(version.visible_table_committed_epoch(), epoch1); + assert_eq!(version.max_committed_epoch_for_test(), epoch1); let info = version .state_table_info @@ -2707,7 +2708,7 @@ async fn test_commit_multi_epoch() { assert_eq!(sub_level.table_infos.len(), 1); assert_eq!(sub_level.table_infos[0].object_id, sst1_epoch2.object_id); - assert_eq!(version.visible_table_committed_epoch(), epoch2); + assert_eq!(version.max_committed_epoch_for_test(), epoch2); let info = version .state_table_info @@ -2758,7 +2759,7 @@ async fn test_commit_multi_epoch() { assert_eq!(sub_level1.table_infos.len(), 1); assert_eq!(sub_level1.table_infos[0].object_id, sst2_epoch1.object_id); - assert_eq!(version.visible_table_committed_epoch(), epoch2); + assert_eq!(version.max_committed_epoch_for_test(), epoch2); let info = version.state_table_info.info().get(&new_table_id).unwrap(); assert_eq!(info.committed_epoch, epoch1); @@ -2800,7 +2801,7 @@ async fn test_commit_multi_epoch() { assert_eq!(sub_level2.table_infos.len(), 1); assert_eq!(sub_level2.table_infos[0].object_id, sst2_epoch2.object_id); - assert_eq!(version.visible_table_committed_epoch(), epoch2); + assert_eq!(version.max_committed_epoch_for_test(), epoch2); let info = version.state_table_info.info().get(&new_table_id).unwrap(); assert_eq!(info.committed_epoch, epoch2); @@ -2864,7 +2865,7 @@ async fn test_commit_multi_epoch() { assert_eq!(sub_level3.table_infos.len(), 1); assert_eq!(sub_level3.table_infos[0].object_id, sst2_epoch2.object_id); - assert_eq!(version.visible_table_committed_epoch(), epoch3); + assert_eq!(version.max_committed_epoch_for_test(), epoch3); let info = version.state_table_info.info().get(&new_table_id).unwrap(); assert_eq!(info.committed_epoch, epoch3); diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index a6cbca021ce5..ab38ccf33fb5 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -425,7 +425,7 @@ async fn test_state_store_sync_v2() { let mut epoch = hummock_storage .get_pinned_version() - .max_committed_epoch() + .max_committed_epoch_for_test() .next_epoch(); // ingest 16B batch @@ -1040,7 +1040,9 @@ async fn test_reload_storage() { async fn test_delete_get_v2() { let (hummock_storage, meta_client) = with_hummock_storage_v2(Default::default()).await; - let initial_epoch = hummock_storage.get_pinned_version().max_committed_epoch(); + let initial_epoch = hummock_storage + .get_pinned_version() + .max_committed_epoch_for_test(); let epoch1 = initial_epoch.next_epoch(); hummock_storage.start_epoch(epoch1, HashSet::from_iter([Default::default()])); let batch1 = vec![ @@ -1130,7 +1132,9 @@ async fn test_delete_get_v2() { async fn test_multiple_epoch_sync_v2() { let (hummock_storage, meta_client) = with_hummock_storage_v2(Default::default()).await; - let initial_epoch = hummock_storage.get_pinned_version().max_committed_epoch(); + let initial_epoch = hummock_storage + .get_pinned_version() + .max_committed_epoch_for_test(); let epoch1 = initial_epoch.next_epoch(); let batch1 = vec![ ( @@ -1306,7 +1310,9 @@ async fn test_gc_watermark_and_clear_shared_buffer() { .await; let table_id_set = HashSet::from_iter([local_hummock_storage.table_id()]); - let initial_epoch = hummock_storage.get_pinned_version().max_committed_epoch(); + let initial_epoch = hummock_storage + .get_pinned_version() + .max_committed_epoch_for_test(); let epoch1 = initial_epoch.next_epoch(); hummock_storage.start_epoch(epoch1, HashSet::from_iter([Default::default()])); local_hummock_storage.init_for_test(epoch1).await.unwrap(); @@ -1350,7 +1356,7 @@ async fn test_gc_watermark_and_clear_shared_buffer() { .global_watermark_object_id(), HummockSstableObjectId::MAX ); - let min_object_id = |sync_result: &SyncResult| { + let _min_object_id = |sync_result: &SyncResult| { sync_result .uncommitted_ssts .iter() @@ -1363,24 +1369,12 @@ async fn test_gc_watermark_and_clear_shared_buffer() { .seal_and_sync_epoch(epoch1, table_id_set.clone()) .await .unwrap(); - let min_object_id_epoch1 = min_object_id(&sync_result1); - assert_eq!( - hummock_storage - .sstable_object_id_manager() - .global_watermark_object_id(), - min_object_id_epoch1, - ); - let sync_result2 = hummock_storage + + let _sync_result2 = hummock_storage .seal_and_sync_epoch(epoch2, table_id_set) .await .unwrap(); - let min_object_id_epoch2 = min_object_id(&sync_result2); - assert_eq!( - hummock_storage - .sstable_object_id_manager() - .global_watermark_object_id(), - min_object_id_epoch1, - ); + meta_client .commit_epoch(epoch1, sync_result1, false) .await @@ -1393,25 +1387,11 @@ async fn test_gc_watermark_and_clear_shared_buffer() { .await .unwrap(); - assert_eq!( - hummock_storage - .sstable_object_id_manager() - .global_watermark_object_id(), - min_object_id_epoch2, - ); - drop(local_hummock_storage); hummock_storage .clear_shared_buffer(hummock_storage.get_pinned_version().id()) .await; - - assert_eq!( - hummock_storage - .sstable_object_id_manager() - .global_watermark_object_id(), - HummockSstableObjectId::MAX - ); } /// Test the following behaviours: @@ -1425,7 +1405,7 @@ async fn test_replicated_local_hummock_storage() { let epoch0 = meta_client .hummock_manager_ref() - .on_current_version(|version| version.visible_table_committed_epoch()) + .on_current_version(|version| version.max_committed_epoch_for_test()) .await; let epoch0 = epoch0.next_epoch(); diff --git a/src/storage/src/hummock/backup_reader.rs b/src/storage/src/hummock/backup_reader.rs index e8c8fdb3aa83..7fa2ee9dd905 100644 --- a/src/storage/src/hummock/backup_reader.rs +++ b/src/storage/src/hummock/backup_reader.rs @@ -195,9 +195,6 @@ impl BackupReader { .snapshot_metadata .iter() .find(|v| { - if v.state_table_info.is_empty() { - return epoch == v.max_committed_epoch; - } if let Some(m) = v.state_table_info.get(&table_id) { return epoch == m.committed_epoch; } diff --git a/src/storage/src/hummock/conflict_detector.rs b/src/storage/src/hummock/conflict_detector.rs deleted file mode 100644 index be3d2d112e01..000000000000 --- a/src/storage/src/hummock/conflict_detector.rs +++ /dev/null @@ -1,249 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! This mod implements a `ConflictDetector` that detect write key conflict in each epoch -use std::collections::HashSet; -use std::sync::Arc; - -use bytes::Bytes; -use crossbeam::atomic::AtomicCell; -use dashmap::DashMap; - -use crate::hummock::value::HummockValue; -use crate::hummock::HummockEpoch; -use crate::opts::StorageOpts; - -pub struct ConflictDetector { - // epoch -> key-sets - epoch_history: DashMap>>, - epoch_watermark: AtomicCell, -} - -impl Default for ConflictDetector { - fn default() -> Self { - Self { - epoch_history: DashMap::new(), - epoch_watermark: AtomicCell::new(HummockEpoch::MIN), - } - } -} - -impl ConflictDetector { - pub fn new_from_config(options: &StorageOpts) -> Option> { - if options.write_conflict_detection_enabled { - Some(Arc::new(ConflictDetector::default())) - } else { - None - } - } - - pub fn get_epoch_watermark(&self) -> HummockEpoch { - self.epoch_watermark.load() - } - - // Sets the new watermark with CAS to enable detection in concurrent update - pub fn set_watermark(&self, epoch: HummockEpoch) { - loop { - let current_watermark = self.get_epoch_watermark(); - if epoch <= current_watermark { - return; - } - if self - .epoch_watermark - .compare_exchange(current_watermark, epoch) - .is_ok() - { - self.epoch_history.retain(|x, value| { - if x <= &(epoch) { - assert!(value.is_none(), "epoch is not sync : {}", x); - } - x > &(epoch) - }); - return; - } - } - } - - /// Checks whether there is key conflict for the given `kv_pairs` and adds the key in `kv_pairs` - /// to the tracking history. Besides, whether the `epoch` has been archived will also be checked - /// to avoid writing to a stale epoch - pub fn check_conflict_and_track_write_batch( - &self, - kv_pairs: &[(Bytes, HummockValue)], - epoch: HummockEpoch, - ) { - assert!( - epoch > self.get_epoch_watermark(), - "write to an archived epoch: {}", - epoch - ); - - let mut written_key = self - .epoch_history - .entry(epoch) - .or_insert(Some(HashSet::new())); - - for (key, value) in kv_pairs { - assert!( - written_key - .as_mut() - .unwrap_or_else(|| panic!("write to an archived epoch: {}", epoch)) - .insert(key.clone()), - "key {:?} is written again after previously written, value is {:?}", - key, - value, - ); - } - } - - /// Archives an epoch. An archived epoch cannot be written anymore. - pub fn archive_epoch(&self, epochs: Vec) { - assert!( - epochs.first().gt(&Some(&self.get_epoch_watermark())), - "write to an archived epoch: {:?} , current_epoch :{}", - epochs, - self.get_epoch_watermark(), - ); - epochs.into_iter().for_each(|epoch| { - if let Some(written_key) = self.epoch_history.get(&epoch) { - assert!( - written_key.is_some(), - "epoch has been archived: epoch is {}", - epoch - ); - } - self.epoch_history.insert(epoch, None); - }) - } -} - -#[cfg(test)] -mod test { - use std::iter::once; - - use bytes::Bytes; - use itertools::Itertools; - - use crate::hummock::conflict_detector::ConflictDetector; - use crate::hummock::value::HummockValue; - - #[test] - #[should_panic] - fn test_write_conflict_in_one_batch() { - let detector = ConflictDetector::default(); - detector.check_conflict_and_track_write_batch( - (0..2) - .map(|_| (Bytes::from("conflicted-key"), HummockValue::Delete)) - .collect_vec() - .as_slice(), - 233, - ); - } - - #[test] - #[should_panic] - fn test_write_conflict_in_multi_batch() { - let detector = ConflictDetector::default(); - detector.check_conflict_and_track_write_batch( - once((Bytes::from("conflicted-key"), HummockValue::Delete)) - .collect_vec() - .as_slice(), - 233, - ); - detector.check_conflict_and_track_write_batch( - once((Bytes::from("conflicted-key"), HummockValue::Delete)) - .collect_vec() - .as_slice(), - 233, - ); - } - - #[test] - fn test_valid_write_in_multi_batch() { - let detector = ConflictDetector::default(); - detector.check_conflict_and_track_write_batch( - once((Bytes::from("key1"), HummockValue::Delete)) - .collect_vec() - .as_slice(), - 233, - ); - detector.check_conflict_and_track_write_batch( - once((Bytes::from("key2"), HummockValue::Delete)) - .collect_vec() - .as_slice(), - 233, - ); - detector.archive_epoch(vec![233]); - detector.check_conflict_and_track_write_batch( - once((Bytes::from("key1"), HummockValue::Delete)) - .collect_vec() - .as_slice(), - 234, - ); - } - - #[test] - #[should_panic] - fn test_write_to_archived_epoch() { - let detector = ConflictDetector::default(); - detector.check_conflict_and_track_write_batch( - once((Bytes::from("key1"), HummockValue::Delete)) - .collect_vec() - .as_slice(), - 233, - ); - detector.archive_epoch(vec![233]); - detector.check_conflict_and_track_write_batch( - once((Bytes::from("key1"), HummockValue::Delete)) - .collect_vec() - .as_slice(), - 233, - ); - } - - #[test] - fn test_clear_key_after_epoch_archive() { - let detector = ConflictDetector::default(); - detector.check_conflict_and_track_write_batch( - once((Bytes::from("key1"), HummockValue::Delete)) - .collect_vec() - .as_slice(), - 233, - ); - assert!(detector.epoch_history.get(&233).unwrap().is_some()); - detector.archive_epoch(vec![233]); - assert!(detector.epoch_history.get(&233).unwrap().is_none()); - detector.set_watermark(233); - assert!(detector.epoch_history.get(&233).is_none()); - } - - #[test] - #[should_panic] - fn test_write_below_epoch_watermark() { - let detector = ConflictDetector::default(); - detector.check_conflict_and_track_write_batch( - once((Bytes::from("key1"), HummockValue::Delete)) - .collect_vec() - .as_slice(), - 233, - ); - detector.set_watermark(233); - detector.check_conflict_and_track_write_batch( - once((Bytes::from("key1"), HummockValue::Delete)) - .collect_vec() - .as_slice(), - 232, - ); - } -} diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 80e16dba1dd4..0f8143faba50 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -29,7 +29,6 @@ use prometheus::{Histogram, IntGauge}; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo; use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId, SyncResult}; -use thiserror_ext::AsReport; use tokio::spawn; use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; @@ -40,10 +39,9 @@ use super::refiller::{CacheRefillConfig, CacheRefiller}; use super::{LocalInstanceGuard, LocalInstanceId, ReadVersionMappingType}; use crate::filter_key_extractor::FilterKeyExtractorManager; use crate::hummock::compactor::{await_tree_key, compact, CompactorContext}; -use crate::hummock::conflict_detector::ConflictDetector; use crate::hummock::event_handler::refiller::{CacheRefillerEvent, SpawnRefillTask}; use crate::hummock::event_handler::uploader::{ - HummockUploader, SpawnUploadTask, SyncedData, UploadTaskInfo, UploadTaskOutput, + HummockUploader, SpawnUploadTask, SyncedData, UploadTaskOutput, }; use crate::hummock::event_handler::{ HummockEvent, HummockReadVersionRef, HummockVersionUpdate, ReadOnlyReadVersionMapping, @@ -54,9 +52,7 @@ use crate::hummock::local_version::recent_versions::RecentVersions; use crate::hummock::store::version::{ HummockReadVersion, StagingData, StagingSstableInfo, VersionUpdate, }; -use crate::hummock::{ - HummockResult, MemoryLimiter, SstableObjectIdManager, SstableStoreRef, TrackerId, -}; +use crate::hummock::{HummockResult, MemoryLimiter, SstableObjectIdManager, SstableStoreRef}; use crate::mem_table::ImmutableMemtable; use crate::monitor::HummockStateStoreMetrics; use crate::opts::StorageOpts; @@ -199,32 +195,21 @@ pub struct HummockEventHandler { version_update_notifier_tx: Arc>, recent_versions: Arc>, - write_conflict_detector: Option>, uploader: HummockUploader, refiller: CacheRefiller, last_instance_id: LocalInstanceId, - sstable_object_id_manager: Option>, metrics: HummockEventHandlerMetrics, } async fn flush_imms( payload: Vec, - task_info: UploadTaskInfo, compactor_context: CompactorContext, filter_key_extractor_manager: FilterKeyExtractorManager, sstable_object_id_manager: Arc, ) -> HummockResult { - for epoch in &task_info.epochs { - let _ = sstable_object_id_manager - .add_watermark_object_id(Some(*epoch)) - .await - .inspect_err(|e| { - error!(epoch, error = %e.as_report(), "unable to set watermark sst id"); - }); - } compact( compactor_context, sstable_object_id_manager, @@ -245,13 +230,11 @@ impl HummockEventHandler { state_store_metrics: Arc, ) -> Self { let upload_compactor_context = compactor_context.clone(); - let cloned_sstable_object_id_manager = sstable_object_id_manager.clone(); let upload_task_latency = state_store_metrics.uploader_upload_task_latency.clone(); let wait_poll_latency = state_store_metrics.uploader_wait_poll_latency.clone(); Self::new_inner( version_update_rx, pinned_version, - Some(sstable_object_id_manager), compactor_context.sstable_store.clone(), state_store_metrics, &compactor_context.storage_opts, @@ -269,7 +252,7 @@ impl HummockEventHandler { let wait_poll_latency = wait_poll_latency.clone(); let upload_compactor_context = upload_compactor_context.clone(); let filter_key_extractor_manager = filter_key_extractor_manager.clone(); - let sstable_object_id_manager = cloned_sstable_object_id_manager.clone(); + let sstable_object_id_manager = sstable_object_id_manager.clone(); spawn({ let future = async move { let _timer = upload_task_latency.start_timer(); @@ -278,7 +261,6 @@ impl HummockEventHandler { .into_values() .flat_map(|imms| imms.into_iter()) .collect(), - task_info, upload_compactor_context.clone(), filter_key_extractor_manager.clone(), sstable_object_id_manager.clone(), @@ -307,7 +289,6 @@ impl HummockEventHandler { fn new_inner( version_update_rx: UnboundedReceiver, pinned_version: PinnedVersion, - sstable_object_id_manager: Option>, sstable_store: SstableStoreRef, state_store_metrics: Arc, storage_opts: &StorageOpts, @@ -323,7 +304,6 @@ impl HummockEventHandler { storage_opts, state_store_metrics.uploader_uploading_task_size.clone(), ); - let write_conflict_detector = ConflictDetector::new_from_config(storage_opts); let metrics = HummockEventHandlerMetrics { event_handler_on_upload_finish_latency: state_store_metrics @@ -359,13 +339,11 @@ impl HummockEventHandler { pinned_version, storage_opts.max_cached_recent_versions_number, ))), - write_conflict_detector, read_version_mapping, local_read_version_mapping: Default::default(), uploader, refiller, last_instance_id: 0, - sstable_object_id_manager, metrics, } } @@ -497,7 +475,7 @@ impl HummockEventHandler { let current_version = self.uploader.hummock_version(); - if current_version.version().id < version_id { + if current_version.id < version_id { let mut latest_version = if let Some(CacheRefillerEvent { pinned_version, new_pinned_version, @@ -507,16 +485,11 @@ impl HummockEventHandler { current_version.id(), pinned_version.id(), "refiller earliest version {:?} not equal to current version {:?}", - pinned_version.version(), - current_version.version() + *pinned_version, + **current_version ); - info!( - ?version_id, - current_mce = current_version.visible_table_committed_epoch(), - refiller_mce = new_pinned_version.visible_table_committed_epoch(), - "refiller is clear in recovery" - ); + info!(?version_id, "refiller is clear in recovery"); Some(new_pinned_version) } else { @@ -524,7 +497,7 @@ impl HummockEventHandler { }; while let latest_version_ref = latest_version.as_ref().unwrap_or(current_version) - && latest_version_ref.version().id < version_id + && latest_version_ref.id < version_id { let version_update = self .version_update_rx @@ -561,11 +534,6 @@ impl HummockEventHandler { .collect_vec() ); - if let Some(sstable_object_id_manager) = &self.sstable_object_id_manager { - sstable_object_id_manager - .remove_watermark_object_id(TrackerId::Epoch(HummockEpoch::MAX)); - } - // Notify completion of the Clear event. let _ = notifier.send(()).inspect_err(|e| { error!("failed to notify completion of clear event: {:?}", e); @@ -603,7 +571,7 @@ impl HummockEventHandler { ) -> Option { let newly_pinned_version = match version_payload { HummockVersionUpdate::VersionDeltas(version_deltas) => { - let mut version_to_apply = pinned_version.version().clone(); + let mut version_to_apply = (*pinned_version).clone(); for version_delta in &version_deltas { assert_eq!(version_to_apply.id, version_delta.prev_id); if let Some(sst_delta_infos) = &mut sst_delta_infos { @@ -647,8 +615,6 @@ impl HummockEventHandler { ); } - let max_committed_epoch = new_pinned_version.visible_table_committed_epoch(); - self.version_update_notifier_tx.send_if_modified(|state| { assert_eq!(pinned_version.id(), state.id()); if state.id() == new_pinned_version.id() { @@ -659,25 +625,7 @@ impl HummockEventHandler { true }); - if let Some(conflict_detector) = self.write_conflict_detector.as_ref() { - conflict_detector.set_watermark(max_committed_epoch); - } - - // TODO: should we change the logic when supporting partial ckpt? - if let Some(sstable_object_id_manager) = &self.sstable_object_id_manager { - sstable_object_id_manager.remove_watermark_object_id(TrackerId::Epoch( - self.recent_versions - .load() - .latest_version() - .visible_table_committed_epoch(), - )); - } - - debug!( - "update to hummock version: {}, epoch: {}", - new_pinned_version.id(), - new_pinned_version.visible_table_committed_epoch() - ); + debug!("update to hummock version: {}", new_pinned_version.id(),); self.uploader.update_pinned_version(new_pinned_version); } @@ -981,7 +929,6 @@ mod tests { let event_handler = HummockEventHandler::new_inner( version_update_rx, initial_version.clone(), - None, mock_sstable_store().await, Arc::new(HummockStateStoreMetrics::unused()), &default_opts_for_test(), @@ -1022,17 +969,14 @@ mod tests { ))) .unwrap(); let (old_version, new_version, refill_finish_tx) = refill_task_rx.recv().await.unwrap(); - assert_eq!(old_version.version(), initial_version.version()); - assert_eq!(new_version.version(), &version1); - assert_eq!( - latest_version.load().latest_version().version(), - initial_version.version() - ); + assert_eq!(*old_version, *initial_version); + assert_eq!(*new_version, version1); + assert_eq!(**latest_version.load().latest_version(), *initial_version); let mut changed = latest_version_update_tx.subscribe(); refill_finish_tx.send(()).unwrap(); changed.changed().await.unwrap(); - assert_eq!(latest_version.load().latest_version().version(), &version1); + assert_eq!(**latest_version.load().latest_version(), version1); } // test recovery with pending refill task @@ -1053,17 +997,17 @@ mod tests { .unwrap(); let (old_version2, new_version2, _refill_finish_tx2) = refill_task_rx.recv().await.unwrap(); - assert_eq!(old_version2.version(), &version1); - assert_eq!(new_version2.version(), &version2); + assert_eq!(*old_version2, version1); + assert_eq!(*new_version2, version2); let (old_version3, new_version3, _refill_finish_tx3) = refill_task_rx.recv().await.unwrap(); - assert_eq!(old_version3.version(), &version2); - assert_eq!(new_version3.version(), &version3); - assert_eq!(latest_version.load().latest_version().version(), &version1); + assert_eq!(*old_version3, version2); + assert_eq!(*new_version3, version3); + assert_eq!(**latest_version.load().latest_version(), version1); let rx = send_clear(version3.id); rx.await.unwrap(); - assert_eq!(latest_version.load().latest_version().version(), &version3); + assert_eq!(**latest_version.load().latest_version(), version3); } async fn assert_pending(fut: &mut (impl Future + Unpin)) { @@ -1090,7 +1034,7 @@ mod tests { ))) .unwrap(); rx.await.unwrap(); - assert_eq!(latest_version.load().latest_version().version(), &version5); + assert_eq!(**latest_version.load().latest_version(), version5); } } @@ -1117,7 +1061,6 @@ mod tests { let event_handler = HummockEventHandler::new_inner( version_update_rx, initial_version.clone(), - None, mock_sstable_store().await, Arc::new(HummockStateStoreMetrics::unused()), &default_opts_for_test(), diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs index 9331d63340b4..964c5f641520 100644 --- a/src/storage/src/hummock/event_handler/uploader/mod.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -1143,11 +1143,6 @@ impl HummockUploader { &self.context.buffer_tracker } - #[cfg(test)] - pub(super) fn max_committed_epoch(&self) -> HummockEpoch { - self.context.pinned_version.max_committed_epoch() - } - pub(super) fn hummock_version(&self) -> &PinnedVersion { &self.context.pinned_version } @@ -1199,10 +1194,7 @@ impl HummockUploader { .or_insert_with(|| { TableUnsyncData::new( *table_id, - self.context - .pinned_version - .version() - .table_committed_epoch(*table_id), + self.context.pinned_version.table_committed_epoch(*table_id), ) }); table_data.new_epoch(epoch); @@ -1251,7 +1243,7 @@ impl HummockUploader { pub(crate) fn update_pinned_version(&mut self, pinned_version: PinnedVersion) { if let UploaderState::Working(data) = &mut self.state { // TODO: may only `ack_committed` on table whose `committed_epoch` is changed. - for (table_id, info) in pinned_version.version().state_table_info.info() { + for (table_id, info) in pinned_version.state_table_info.info() { if let Some(table_data) = data.unsync_data.table_data.get_mut(table_id) { table_data.ack_committed(info.committed_epoch); } @@ -1650,7 +1642,13 @@ pub(crate) mod tests { .new_pin_version(test_hummock_version(epoch1)) .unwrap(); uploader.update_pinned_version(new_pinned_version); - assert_eq!(epoch1, uploader.max_committed_epoch()); + assert_eq!( + epoch1, + uploader + .context + .pinned_version + .max_committed_epoch_for_test() + ); } #[tokio::test] @@ -1681,7 +1679,13 @@ pub(crate) mod tests { .unwrap(); uploader.update_pinned_version(new_pinned_version); assert!(uploader.data().syncing_data.is_empty()); - assert_eq!(epoch1, uploader.max_committed_epoch()); + assert_eq!( + epoch1, + uploader + .context + .pinned_version + .max_committed_epoch_for_test() + ); } #[tokio::test] @@ -1716,7 +1720,13 @@ pub(crate) mod tests { .unwrap(); uploader.update_pinned_version(new_pinned_version); assert!(uploader.data().syncing_data.is_empty()); - assert_eq!(epoch1, uploader.max_committed_epoch()); + assert_eq!( + epoch1, + uploader + .context + .pinned_version + .max_committed_epoch_for_test() + ); } #[tokio::test] diff --git a/src/storage/src/hummock/hummock_meta_client.rs b/src/storage/src/hummock/hummock_meta_client.rs index 53f15ebc1fb3..36c98c581c62 100644 --- a/src/storage/src/hummock/hummock_meta_client.rs +++ b/src/storage/src/hummock/hummock_meta_client.rs @@ -18,9 +18,7 @@ use async_trait::async_trait; use futures::stream::BoxStream; use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{HummockSstableObjectId, SstObjectIdRange, SyncResult}; -use risingwave_pb::hummock::{ - HummockSnapshot, PbHummockVersion, SubscribeCompactionEventRequest, VacuumTask, -}; +use risingwave_pb::hummock::{PbHummockVersion, SubscribeCompactionEventRequest, VacuumTask}; use risingwave_rpc_client::error::Result; use risingwave_rpc_client::{CompactionEventItem, HummockMetaClient, MetaClient}; use tokio::sync::mpsc::UnboundedSender; @@ -56,10 +54,6 @@ impl HummockMetaClient for MonitoredHummockMetaClient { self.meta_client.get_current_version().await } - async fn get_snapshot(&self) -> Result { - self.meta_client.get_snapshot().await - } - async fn get_new_sst_ids(&self, number: u32) -> Result { self.stats.get_new_sst_ids_counts.inc(); let timer = self.stats.get_new_sst_ids_latency.start_timer(); diff --git a/src/storage/src/hummock/local_version/pinned_version.rs b/src/storage/src/hummock/local_version/pinned_version.rs index afaafdf7cbe8..b8aeace6c839 100644 --- a/src/storage/src/hummock/local_version/pinned_version.rs +++ b/src/storage/src/hummock/local_version/pinned_version.rs @@ -14,6 +14,7 @@ use std::collections::BTreeMap; use std::iter::empty; +use std::ops::Deref; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -77,6 +78,14 @@ pub struct PinnedVersion { guard: Arc, } +impl Deref for PinnedVersion { + type Target = HummockVersion; + + fn deref(&self) -> &Self::Target { + &self.version + } +} + impl PinnedVersion { pub fn new( version: HummockVersion, @@ -141,20 +150,6 @@ impl PinnedVersion { None => empty(), } } - - #[cfg(any(test, feature = "test"))] - pub fn max_committed_epoch(&self) -> u64 { - self.version.max_committed_epoch() - } - - pub fn visible_table_committed_epoch(&self) -> u64 { - self.version.visible_table_committed_epoch() - } - - /// ret value can't be used as `HummockVersion`. it must be modified with delta - pub fn version(&self) -> &HummockVersion { - &self.version - } } pub(crate) async fn start_pinned_version_worker( diff --git a/src/storage/src/hummock/local_version/recent_versions.rs b/src/storage/src/hummock/local_version/recent_versions.rs index ad711b005331..ed1233a680ea 100644 --- a/src/storage/src/hummock/local_version/recent_versions.rs +++ b/src/storage/src/hummock/local_version/recent_versions.rs @@ -39,14 +39,8 @@ impl RecentVersions { fn has_table_committed(&self, new_version: &PinnedVersion) -> bool { let mut has_table_committed = false; - for (table_id, info) in new_version.version().state_table_info.info() { - if let Some(prev_info) = self - .latest_version - .version() - .state_table_info - .info() - .get(table_id) - { + for (table_id, info) in new_version.state_table_info.info() { + if let Some(prev_info) = self.latest_version.state_table_info.info().get(table_id) { match info.committed_epoch.cmp(&prev_info.committed_epoch) { Ordering::Less => { unreachable!( @@ -68,7 +62,7 @@ impl RecentVersions { #[must_use] pub fn with_new_version(&self, version: PinnedVersion) -> Self { - assert!(version.version().id > self.latest_version.version().id); + assert!(version.id > self.latest_version.id); let is_committed = self.has_table_committed(&version); let recent_versions = if self.is_latest_committed { let prev_recent_versions = if self.recent_versions.len() >= self.max_version_num { @@ -104,13 +98,7 @@ impl RecentVersions { table_id: TableId, epoch: HummockEpoch, ) -> Option { - if let Some(info) = self - .latest_version - .version() - .state_table_info - .info() - .get(&table_id) - { + if let Some(info) = self.latest_version.state_table_info.info().get(&table_id) { if info.committed_epoch <= epoch { Some(self.latest_version.clone()) } else { @@ -131,7 +119,6 @@ impl RecentVersions { epoch < self .latest_version - .version() .state_table_info .info() .get(&table_id) @@ -140,7 +127,7 @@ impl RecentVersions { ); } let result = self.recent_versions.binary_search_by(|version| { - let committed_epoch = version.version().table_committed_epoch(table_id); + let committed_epoch = version.table_committed_epoch(table_id); if let Some(committed_epoch) = committed_epoch { committed_epoch.cmp(&epoch) } else { @@ -164,12 +151,7 @@ impl RecentVersions { self.recent_versions.get(index - 1).cloned() }; version.and_then(|version| { - if version - .version() - .state_table_info - .info() - .contains_key(&table_id) - { + if version.state_table_info.info().contains_key(&table_id) { Some(version) } else { // if the table does not exist in the version, return `None` to try get a time travel version diff --git a/src/storage/src/hummock/mod.rs b/src/storage/src/hummock/mod.rs index 8c5410f5c4cd..bdf81b70c027 100644 --- a/src/storage/src/hummock/mod.rs +++ b/src/storage/src/hummock/mod.rs @@ -29,7 +29,6 @@ pub mod sstable; pub use sstable::*; pub mod compactor; -pub mod conflict_detector; mod error; pub mod hummock_meta_client; pub mod iterator; diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index f665bf3edc15..65258a7ff495 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -125,11 +125,10 @@ pub fn get_committed_read_version_tuple( mut key_range: TableKeyRange, epoch: HummockEpoch, ) -> (TableKeyRange, ReadVersionTuple) { - if let Some(table_watermarks) = version.version().table_watermarks.get(&table_id) { + if let Some(table_watermarks) = version.table_watermarks.get(&table_id) { TableWatermarksIndex::new_committed( table_watermarks.clone(), version - .version() .state_table_info .info() .get(&table_id) @@ -396,11 +395,7 @@ impl HummockStorage { key_range: TableKeyRange, ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> { let pinned_version = self.recent_versions.load().latest_version().clone(); - let info = pinned_version - .version() - .state_table_info - .info() - .get(&table_id); + let info = pinned_version.state_table_info.info().get(&table_id); // check epoch if lower mce let ret = if let Some(info) = info diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 23631ba2d81e..33bc9fc49f75 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -249,22 +249,19 @@ impl HummockReadVersion { Self { table_id, instance_id, - table_watermarks: committed_version - .version() - .table_watermarks - .get(&table_id) - .map(|table_watermarks| { + table_watermarks: committed_version.table_watermarks.get(&table_id).map( + |table_watermarks| { TableWatermarksIndex::new_committed( table_watermarks.clone(), committed_version - .version() .state_table_info .info() .get(&table_id) .expect("should exist") .committed_epoch, ) - }), + }, + ), staging: StagingVersion { imm: VecDeque::default(), sst: VecDeque::default(), @@ -364,7 +361,6 @@ impl HummockReadVersion { VersionUpdate::CommittedSnapshot(committed_version) => { if let Some(info) = committed_version - .version() .state_table_info .info() .get(&self.table_id) @@ -388,11 +384,8 @@ impl HummockReadVersion { sst.epochs.last().expect("epochs not empty") > &committed_epoch })); - if let Some(committed_watermarks) = self - .committed - .version() - .table_watermarks - .get(&self.table_id) + if let Some(committed_watermarks) = + self.committed.table_watermarks.get(&self.table_id) { if let Some(watermark_index) = &mut self.table_watermarks { watermark_index.apply_committed_watermarks( @@ -426,9 +419,7 @@ impl HummockReadVersion { direction, epoch, vnode_watermarks, - self.committed - .version() - .table_committed_epoch(self.table_id), + self.committed.table_committed_epoch(self.table_id), )); } } @@ -965,13 +956,12 @@ impl HummockVersionReader { key_range: TableKeyRange, options: ReadLogOptions, ) -> HummockResult { - let change_log = - if let Some(change_log) = version.version().table_change_log.get(&options.table_id) { - change_log.filter_epoch(epoch_range) - } else { - static EMPTY_VEC: Vec = Vec::new(); - &EMPTY_VEC[..] - }; + let change_log = if let Some(change_log) = version.table_change_log.get(&options.table_id) { + change_log.filter_epoch(epoch_range) + } else { + static EMPTY_VEC: Vec = Vec::new(); + &EMPTY_VEC[..] + }; if let Some(max_epoch_change_log) = change_log.last() { let (_, max_epoch) = epoch_range; if !max_epoch_change_log.epochs.contains(&max_epoch) { diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index 235edc884ae5..37e4cb1c7b7f 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -585,7 +585,6 @@ pub(crate) async fn wait_for_epoch( // avoid unnecessary check in the loop if the value does not change let committed_epoch = receiver .borrow_and_update() - .version() .table_committed_epoch(options.table_id); if let Some(committed_epoch) = committed_epoch && committed_epoch >= wait_epoch @@ -621,10 +620,7 @@ pub(crate) async fn wait_for_epoch( } Ok(Ok(_)) => { // TODO: should handle the corner case of drop table - let new_committed_epoch = receiver - .borrow() - .version() - .table_committed_epoch(options.table_id); + let new_committed_epoch = receiver.borrow().table_committed_epoch(options.table_id); if let Some(committed_epoch) = new_committed_epoch && committed_epoch >= wait_epoch { diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index 578197bee209..0dfdc1aab466 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -507,8 +507,7 @@ mod tests { let epoch1 = test_env .storage .get_pinned_version() - .version() - .max_committed_epoch() + .max_committed_epoch_for_test() .next_epoch(); test_env .storage @@ -617,8 +616,7 @@ mod tests { let epoch1 = test_env .storage .get_pinned_version() - .version() - .max_committed_epoch() + .max_committed_epoch_for_test() .next_epoch(); test_env .storage @@ -810,8 +808,7 @@ mod tests { let epoch1 = test_env .storage .get_pinned_version() - .version() - .max_committed_epoch() + .max_committed_epoch_for_test() .next_epoch(); test_env .storage @@ -1042,8 +1039,7 @@ mod tests { let epoch1 = test_env .storage .get_pinned_version() - .version() - .max_committed_epoch() + .max_committed_epoch_for_test() .next_epoch(); test_env .storage @@ -1240,8 +1236,7 @@ mod tests { let epoch1 = test_env .storage .get_pinned_version() - .version() - .max_committed_epoch() + .max_committed_epoch_for_test() .next_epoch(); test_env .storage @@ -1380,8 +1375,7 @@ mod tests { let epoch1 = test_env .storage .get_pinned_version() - .version() - .max_committed_epoch() + .max_committed_epoch_for_test() .next_epoch(); test_env .storage @@ -1720,8 +1714,7 @@ mod tests { let epoch1 = test_env .storage .get_pinned_version() - .version() - .max_committed_epoch() + .max_committed_epoch_for_test() .next_epoch(); test_env .storage diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index 4d0e82661fad..462f78233f25 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -750,13 +750,6 @@ impl PartialGraphManagedBarrierState { epoch = prev_epoch, "ignore sealing data for the first barrier" ); - if let Some(hummock) = self.state_store.as_hummock() { - let mce = hummock.get_pinned_version().visible_table_committed_epoch(); - assert_eq!( - mce, prev_epoch, - "first epoch should match with the current version", - ); - } tracing::info!(?prev_epoch, "ignored syncing data for the first barrier"); None } diff --git a/src/tests/compaction_test/src/compaction_test_runner.rs b/src/tests/compaction_test/src/compaction_test_runner.rs index b04c39769e90..593e5513bdc3 100644 --- a/src/tests/compaction_test/src/compaction_test_runner.rs +++ b/src/tests/compaction_test/src/compaction_test_runner.rs @@ -374,12 +374,16 @@ async fn start_replay( for delta in version_delta_logs { let (current_version, compaction_groups) = meta_client.replay_version_delta(delta).await?; - let (version_id, max_committed_epoch) = - (current_version.id, current_version.max_committed_epoch()); + let (version_id, committed_epoch) = ( + current_version.id, + current_version + .table_committed_epoch(table_to_check.into()) + .unwrap_or_default(), + ); tracing::info!( - "Replayed version delta version_id: {}, max_committed_epoch: {}, compaction_groups: {:?}", + "Replayed version delta version_id: {}, committed_epoch: {}, compaction_groups: {:?}", version_id, - max_committed_epoch, + committed_epoch, compaction_groups ); @@ -389,7 +393,7 @@ async fn start_replay( .await; replay_count += 1; - replayed_epochs.push(max_committed_epoch); + replayed_epochs.push(committed_epoch); compaction_groups .into_iter() .map(|c| modified_compaction_groups.insert(c)) @@ -408,7 +412,7 @@ async fn start_replay( // pop the latest epoch replayed_epochs.pop(); - let mut epochs = vec![max_committed_epoch]; + let mut epochs = vec![committed_epoch]; epochs.extend(pin_old_snapshots(&meta_client, &replayed_epochs, 1).into_iter()); tracing::info!("===== Prepare to check snapshots: {:?}", epochs); @@ -417,7 +421,7 @@ async fn start_replay( tracing::info!( "Trigger compaction for version {}, epoch {} compaction_groups: {:?}", version_id, - max_committed_epoch, + committed_epoch, modified_compaction_groups, ); // Try trigger multiple rounds of compactions but doesn't wait for finish @@ -459,15 +463,12 @@ async fn start_replay( compaction_ok, ); - let (new_version_id, new_committed_epoch) = - (new_version.id, new_version.max_committed_epoch()); + let new_version_id = new_version.id; assert!( new_version_id >= version_id, - "new_version_id: {}, epoch: {}", + "new_version_id: {}", new_version_id, - new_committed_epoch ); - assert_eq!(max_committed_epoch, new_committed_epoch); if new_version_id != version_id { hummock.inner().update_version_and_wait(new_version).await;