From 409bc8d1b088f122ebe2285e5ef2dfff2fd12775 Mon Sep 17 00:00:00 2001 From: zwang28 <70626450+zwang28@users.noreply.github.com> Date: Fri, 20 Sep 2024 18:44:22 +0800 Subject: [PATCH] refactor(meta): deprecate pin/unpin snapshot (#18493) --- proto/hummock.proto | 47 ---- src/ctl/src/cmd_impl/hummock/list_version.rs | 34 +-- src/ctl/src/lib.rs | 9 +- .../catalog/system_catalog/rw_catalog/mod.rs | 1 - .../rw_catalog/rw_hummock_pinned_snapshots.rs | 41 ---- src/frontend/src/meta_client.rs | 37 ---- src/frontend/src/scheduler/snapshot.rs | 202 +----------------- src/frontend/src/session.rs | 1 - src/frontend/src/test_utils.rs | 16 -- src/meta/service/src/hummock_service.rs | 64 ------ src/meta/service/src/notification_service.rs | 7 +- src/meta/src/hummock/manager/context.rs | 135 +----------- src/meta/src/hummock/manager/mod.rs | 23 +- src/meta/src/hummock/manager/tests.rs | 95 +------- src/meta/src/hummock/manager/utils.rs | 2 - src/meta/src/hummock/manager/versioning.rs | 12 +- src/meta/src/hummock/metrics_utils.rs | 19 +- .../src/hummock/mock_hummock_meta_client.rs | 26 --- src/rpc_client/src/hummock_meta_client.rs | 3 - src/rpc_client/src/meta_client.rs | 51 ----- .../hummock_test/src/compactor_tests.rs | 187 +--------------- .../src/hummock/hummock_meta_client.rs | 12 -- .../src/compaction_test_runner.rs | 11 +- src/tests/compaction_test/src/lib.rs | 1 - 24 files changed, 29 insertions(+), 1007 deletions(-) delete mode 100644 src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_pinned_snapshots.rs diff --git a/proto/hummock.proto b/proto/hummock.proto index df1a1b5a47a3..92c1494707fb 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -267,26 +267,12 @@ message UnpinVersionResponse { common.Status status = 1; } -message PinSnapshotRequest { - uint32 context_id = 1; -} - -message PinSpecificSnapshotRequest { - uint32 context_id = 1; - uint64 epoch = 2; -} - message GetAssignedCompactTaskNumRequest {} message GetAssignedCompactTaskNumResponse { uint32 num_tasks = 1; } -message PinSnapshotResponse { - common.Status status = 1; - HummockSnapshot snapshot = 2; -} - message GetEpochRequest {} message GetEpochResponse { @@ -294,23 +280,6 @@ message GetEpochResponse { HummockSnapshot snapshot = 2; } -message UnpinSnapshotRequest { - uint32 context_id = 1; -} - -message UnpinSnapshotResponse { - common.Status status = 1; -} - -message UnpinSnapshotBeforeRequest { - uint32 context_id = 1; - HummockSnapshot min_snapshot = 3; -} - -message UnpinSnapshotBeforeResponse { - common.Status status = 1; -} - // When right_exclusive=false, it represents [left, right], of which both boundary are open. When right_exclusive=true, // it represents [left, right), of which right is exclusive. message KeyRange { @@ -641,23 +610,12 @@ message PinnedVersionsSummary { map workers = 2; } -message PinnedSnapshotsSummary { - repeated HummockPinnedSnapshot pinned_snapshots = 1; - map workers = 2; -} - message RiseCtlGetPinnedVersionsSummaryRequest {} message RiseCtlGetPinnedVersionsSummaryResponse { PinnedVersionsSummary summary = 1; } -message RiseCtlGetPinnedSnapshotsSummaryRequest {} - -message RiseCtlGetPinnedSnapshotsSummaryResponse { - PinnedSnapshotsSummary summary = 1; -} - message InitMetadataForReplayRequest { repeated catalog.Table tables = 1; repeated CompactionGroupInfo compaction_groups = 2; @@ -853,18 +811,13 @@ service HummockManagerService { rpc GetAssignedCompactTaskNum(GetAssignedCompactTaskNumRequest) returns (GetAssignedCompactTaskNumResponse); rpc TriggerCompactionDeterministic(TriggerCompactionDeterministicRequest) returns (TriggerCompactionDeterministicResponse); rpc DisableCommitEpoch(DisableCommitEpochRequest) returns (DisableCommitEpochResponse); - rpc PinSnapshot(PinSnapshotRequest) returns (PinSnapshotResponse); - rpc PinSpecificSnapshot(PinSpecificSnapshotRequest) returns (PinSnapshotResponse); rpc GetEpoch(GetEpochRequest) returns (GetEpochResponse); - rpc UnpinSnapshot(UnpinSnapshotRequest) returns (UnpinSnapshotResponse); - rpc UnpinSnapshotBefore(UnpinSnapshotBeforeRequest) returns (UnpinSnapshotBeforeResponse); rpc GetNewSstIds(GetNewSstIdsRequest) returns (GetNewSstIdsResponse); rpc ReportVacuumTask(ReportVacuumTaskRequest) returns (ReportVacuumTaskResponse); rpc TriggerManualCompaction(TriggerManualCompactionRequest) returns (TriggerManualCompactionResponse); rpc ReportFullScanTask(ReportFullScanTaskRequest) returns (ReportFullScanTaskResponse); rpc TriggerFullGC(TriggerFullGCRequest) returns (TriggerFullGCResponse); rpc RiseCtlGetPinnedVersionsSummary(RiseCtlGetPinnedVersionsSummaryRequest) returns (RiseCtlGetPinnedVersionsSummaryResponse); - rpc RiseCtlGetPinnedSnapshotsSummary(RiseCtlGetPinnedSnapshotsSummaryRequest) returns (RiseCtlGetPinnedSnapshotsSummaryResponse); rpc RiseCtlListCompactionGroup(RiseCtlListCompactionGroupRequest) returns (RiseCtlListCompactionGroupResponse); rpc RiseCtlUpdateCompactionConfig(RiseCtlUpdateCompactionConfigRequest) returns (RiseCtlUpdateCompactionConfigResponse); rpc RiseCtlPauseVersionCheckpoint(RiseCtlPauseVersionCheckpointRequest) returns (RiseCtlPauseVersionCheckpointResponse); diff --git a/src/ctl/src/cmd_impl/hummock/list_version.rs b/src/ctl/src/cmd_impl/hummock/list_version.rs index 460dd88621eb..ac8ba6982316 100644 --- a/src/ctl/src/cmd_impl/hummock/list_version.rs +++ b/src/ctl/src/cmd_impl/hummock/list_version.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_pb::hummock::{PinnedSnapshotsSummary, PinnedVersionsSummary}; +use risingwave_pb::hummock::PinnedVersionsSummary; use risingwave_rpc_client::HummockMetaClient; use crate::CtlContext; @@ -118,38 +118,6 @@ pub async fn list_pinned_versions(context: &CtlContext) -> anyhow::Result<()> { Ok(()) } -pub async fn list_pinned_snapshots(context: &CtlContext) -> anyhow::Result<()> { - let meta_client = context.meta_client().await?; - let PinnedSnapshotsSummary { - mut pinned_snapshots, - workers, - } = meta_client - .risectl_get_pinned_snapshots_summary() - .await? - .summary - .unwrap(); - pinned_snapshots.sort_by_key(|s| s.minimal_pinned_snapshot); - for pinned_snapshot in pinned_snapshots { - match workers.get(&pinned_snapshot.context_id) { - None => { - println!( - "Worker {} may have been dropped, min_pinned_snapshot {}", - pinned_snapshot.context_id, pinned_snapshot.minimal_pinned_snapshot - ); - } - Some(worker) => { - println!( - "Worker {} type {} min_pinned_snapshot {}", - pinned_snapshot.context_id, - worker.r#type().as_str_name(), - pinned_snapshot.minimal_pinned_snapshot - ); - } - } - } - Ok(()) -} - pub async fn rebuild_table_stats(context: &CtlContext) -> anyhow::Result<()> { let meta_client = context.meta_client().await?; meta_client.risectl_rebuild_table_stats().await?; diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index b35b8d1e42cb..dead46ba2b2b 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -27,9 +27,7 @@ use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::Compressi use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability; use thiserror_ext::AsReport; -use crate::cmd_impl::hummock::{ - build_compaction_config_vec, list_pinned_snapshots, list_pinned_versions, -}; +use crate::cmd_impl::hummock::{build_compaction_config_vec, list_pinned_versions}; use crate::cmd_impl::meta::EtcdBackend; use crate::cmd_impl::throttle::apply_throttle; use crate::common::CtlContext; @@ -223,8 +221,6 @@ enum HummockCommands { }, /// List pinned versions of each worker. ListPinnedVersions {}, - /// List pinned snapshots of each worker. - ListPinnedSnapshots {}, /// List all compaction groups. ListCompactionGroup, /// Update compaction config for compaction groups. @@ -654,9 +650,6 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { Commands::Hummock(HummockCommands::ListPinnedVersions {}) => { list_pinned_versions(context).await? } - Commands::Hummock(HummockCommands::ListPinnedSnapshots {}) => { - list_pinned_snapshots(context).await? - } Commands::Hummock(HummockCommands::ListCompactionGroup) => { cmd_impl::hummock::list_compaction_group(context).await? } diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs index 5e3261c06d18..75825a74320c 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs @@ -29,7 +29,6 @@ mod rw_hummock_compact_task_assignment; mod rw_hummock_compact_task_progress; mod rw_hummock_compaction_group_configs; mod rw_hummock_meta_configs; -mod rw_hummock_pinned_snapshots; mod rw_hummock_pinned_versions; mod rw_hummock_version; mod rw_hummock_version_deltas; diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_pinned_snapshots.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_pinned_snapshots.rs deleted file mode 100644 index e4f18c8fecaf..000000000000 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_pinned_snapshots.rs +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use risingwave_common::types::Fields; -use risingwave_frontend_macro::system_catalog; - -use crate::catalog::system_catalog::SysCatalogReaderImpl; -use crate::error::Result; - -#[derive(Fields)] -struct RwHummockPinnedSnapshot { - #[primary_key] - worker_node_id: i32, - min_pinned_snapshot_id: i64, -} - -#[system_catalog(table, "rw_catalog.rw_hummock_pinned_snapshots")] -async fn read(reader: &SysCatalogReaderImpl) -> Result> { - let pinned_snapshots = reader - .meta_client - .list_hummock_pinned_snapshots() - .await? - .into_iter() - .map(|s| RwHummockPinnedSnapshot { - worker_node_id: s.0 as _, - min_pinned_snapshot_id: s.1 as _, - }) - .collect(); - Ok(pinned_snapshots) -} diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index c58dcc365f43..97395dcd786b 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -47,9 +47,6 @@ use risingwave_rpc_client::{HummockMetaClient, MetaClient}; #[async_trait::async_trait] pub trait FrontendMetaClient: Send + Sync { async fn try_unregister(&self); - - async fn pin_snapshot(&self) -> Result; - async fn get_snapshot(&self) -> Result; async fn flush(&self, checkpoint: bool) -> Result; @@ -73,10 +70,6 @@ pub trait FrontendMetaClient: Send + Sync { async fn list_object_dependencies(&self) -> Result>; - async fn unpin_snapshot(&self) -> Result<()>; - - async fn unpin_snapshot_before(&self, epoch: u64) -> Result<()>; - async fn list_meta_snapshots(&self) -> Result>; async fn get_system_params(&self) -> Result; @@ -98,9 +91,6 @@ pub trait FrontendMetaClient: Send + Sync { /// Returns vector of (worker_id, min_pinned_version_id) async fn list_hummock_pinned_versions(&self) -> Result>; - /// Returns vector of (worker_id, min_pinned_snapshot_id) - async fn list_hummock_pinned_snapshots(&self) -> Result>; - async fn get_hummock_current_version(&self) -> Result; async fn get_hummock_checkpoint_version(&self) -> Result; @@ -149,10 +139,6 @@ impl FrontendMetaClient for FrontendMetaClientImpl { self.0.try_unregister().await; } - async fn pin_snapshot(&self) -> Result { - self.0.pin_snapshot().await - } - async fn get_snapshot(&self) -> Result { self.0.get_snapshot().await } @@ -196,14 +182,6 @@ impl FrontendMetaClient for FrontendMetaClientImpl { self.0.list_object_dependencies().await } - async fn unpin_snapshot(&self) -> Result<()> { - self.0.unpin_snapshot().await - } - - async fn unpin_snapshot_before(&self, epoch: u64) -> Result<()> { - self.0.unpin_snapshot_before(epoch).await - } - async fn list_meta_snapshots(&self) -> Result> { let manifest = self.0.get_meta_snapshot_manifest().await?; Ok(manifest.snapshot_metadata) @@ -257,21 +235,6 @@ impl FrontendMetaClient for FrontendMetaClientImpl { Ok(ret) } - async fn list_hummock_pinned_snapshots(&self) -> Result> { - let pinned_snapshots = self - .0 - .risectl_get_pinned_snapshots_summary() - .await? - .summary - .unwrap() - .pinned_snapshots; - let ret = pinned_snapshots - .into_iter() - .map(|s| (s.context_id, s.minimal_pinned_snapshot)) - .collect(); - Ok(ret) - } - async fn get_hummock_current_version(&self) -> Result { self.0.get_current_version().await } diff --git a/src/frontend/src/scheduler/snapshot.rs b/src/frontend/src/scheduler/snapshot.rs index 5d1ad6d69d0b..b0d8918cfee4 100644 --- a/src/frontend/src/scheduler/snapshot.rs +++ b/src/frontend/src/scheduler/snapshot.rs @@ -12,30 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::assert_matches::assert_matches; -use std::collections::btree_map::Entry; -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use std::sync::Arc; -use std::time::Duration; -use risingwave_common::must_match; use risingwave_common::util::epoch::Epoch; use risingwave_hummock_sdk::version::HummockVersionStateTableInfo; use risingwave_hummock_sdk::{ - FrontendHummockVersion, FrontendHummockVersionDelta, HummockVersionId, INVALID_VERSION_ID, + FrontendHummockVersion, FrontendHummockVersionDelta, INVALID_VERSION_ID, }; use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch}; use risingwave_pb::hummock::{HummockVersionDeltas, PbHummockSnapshot}; -use thiserror_ext::AsReport; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::watch; use crate::expr::InlineNowProcTime; use crate::meta_client::FrontendMetaClient; -/// The interval between two unpin batches. -const UNPIN_INTERVAL_SECS: u64 = 10; - /// The storage snapshot to read from in a query, which can be freely cloned. #[derive(Clone)] pub enum ReadSnapshot { @@ -86,12 +77,10 @@ impl ReadSnapshot { } } -/// A frontend-pinned snapshot that notifies the [`UnpinWorker`] when it's dropped. // DO NOT implement `Clone` for `PinnedSnapshot` because it's a "resource" that should always be a // singleton for each snapshot. Use `PinnedSnapshotRef` instead. pub struct PinnedSnapshot { value: FrontendHummockVersion, - unpin_sender: UnboundedSender, } impl std::fmt::Debug for PinnedSnapshot { @@ -118,12 +107,6 @@ impl PinnedSnapshot { } } -impl Drop for PinnedSnapshot { - fn drop(&mut self) { - let _ = self.unpin_sender.send(Operation::Unpin(self.value.id)); - } -} - /// Returns an invalid snapshot, used for initial values. fn invalid_snapshot() -> FrontendHummockVersion { FrontendHummockVersion { @@ -136,10 +119,6 @@ fn invalid_snapshot() -> FrontendHummockVersion { /// Cache of hummock snapshot in meta. pub struct HummockSnapshotManager { - /// Send epoch-related operations to [`UnpinWorker`] for managing the pinned snapshots and - /// unpin them in a batch through RPC. - worker_sender: UnboundedSender, - /// The latest snapshot synced from the meta service. /// /// The `max_committed_epoch` and `max_current_epoch` are pushed from meta node to reduce rpc @@ -155,22 +134,14 @@ pub struct HummockSnapshotManager { pub type HummockSnapshotManagerRef = Arc; impl HummockSnapshotManager { - pub fn new(meta_client: Arc) -> Self { - let (worker_sender, worker_receiver) = tokio::sync::mpsc::unbounded_channel(); - - tokio::spawn(UnpinWorker::new(meta_client, worker_receiver).run()); - + pub fn new(_meta_client: Arc) -> Self { let latest_snapshot = Arc::new(PinnedSnapshot { value: invalid_snapshot(), - unpin_sender: worker_sender.clone(), }); let (latest_snapshot, _) = watch::channel(latest_snapshot); - Self { - worker_sender, - latest_snapshot, - } + Self { latest_snapshot } } /// Acquire the latest snapshot by increasing its reference count. @@ -214,15 +185,7 @@ impl HummockSnapshotManager { ); return false; } - // First tell the worker that a new snapshot is going to be pinned. - self.worker_sender - .send(Operation::Pin(snapshot.id, snapshot.max_committed_epoch)) - .unwrap(); - // Then set the latest snapshot. - *old_snapshot = Arc::new(PinnedSnapshot { - value: snapshot, - unpin_sender: self.worker_sender.clone(), - }); + *old_snapshot = Arc::new(PinnedSnapshot { value: snapshot }); true }); @@ -236,158 +199,3 @@ impl HummockSnapshotManager { } } } - -/// The pin state of a snapshot. -#[derive(Debug)] -enum PinState { - /// The snapshot is currently pinned by some sessions in this frontend. - Pinned(u64), - - /// The snapshot is no longer pinned by any session in this frontend, but it's still considered - /// to be pinned by the meta service. It will be unpinned by the [`UnpinWorker`] in the next - /// unpin batch through RPC, and the entry will be removed then. - Unpinned, -} - -/// The operation handled by the [`UnpinWorker`]. -#[derive(Debug)] -enum Operation { - /// Mark the snapshot as pinned, sent when a new snapshot is pinned with `update`. - Pin(HummockVersionId, u64), - - /// Mark the snapshot as unpinned, sent when all references to a [`PinnedSnapshot`] is dropped. - Unpin(HummockVersionId), -} - -impl Operation { - /// Returns whether the operation is for an invalid snapshot, which should be ignored. - fn is_invalid(&self) -> bool { - *match self { - Operation::Pin(id, _) | Operation::Unpin(id) => id, - } == INVALID_VERSION_ID - } -} - -/// The key for the states map in [`UnpinWorker`]. -/// -/// The snapshot will be first sorted by `committed_epoch`, then by `current_epoch`. -#[derive(Debug, PartialEq, Clone)] -struct SnapshotKey(HummockVersionId); - -impl Eq for SnapshotKey {} - -impl Ord for SnapshotKey { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.0.to_u64().cmp(&other.0.to_u64()) - } -} - -impl PartialOrd for SnapshotKey { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -/// The worker that manages the pin states of snapshots and unpins them periodically in a batch -/// through RPC to the meta service. -struct UnpinWorker { - meta_client: Arc, - - /// The receiver of operations from snapshot updating in [`HummockSnapshotManager`] and - /// dropping of [`PinnedSnapshot`]. - receiver: UnboundedReceiver, - - /// The pin states of existing snapshots in this frontend. - /// - /// All snapshots in this map are considered to be pinned by the meta service, those with - /// [`PinState::Unpinned`] will be unpinned in the next unpin batch through RPC. - states: BTreeMap, -} - -impl UnpinWorker { - fn new( - meta_client: Arc, - receiver: UnboundedReceiver, - ) -> Self { - Self { - meta_client, - receiver, - states: Default::default(), - } - } - - /// Run the loop of handling operations and unpinning snapshots. - async fn run(mut self) { - let mut ticker = tokio::time::interval(Duration::from_secs(UNPIN_INTERVAL_SECS)); - ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); - - loop { - tokio::select! { - operation = self.receiver.recv() => { - let Some(operation) = operation else { return }; // manager dropped - self.handle_operation(operation); - } - - _ = ticker.tick() => { - self.unpin_batch().await; - } - } - } - } - - /// Handle an operation and manipulate the states. - fn handle_operation(&mut self, operation: Operation) { - if operation.is_invalid() { - return; - } - - match operation { - Operation::Pin(version_id, committed_epoch) => { - self.states - .try_insert(SnapshotKey(version_id), PinState::Pinned(committed_epoch)) - .unwrap(); - } - Operation::Unpin(snapshot) => match self.states.entry(SnapshotKey(snapshot)) { - Entry::Vacant(_v) => unreachable!("unpin a snapshot that is not pinned"), - Entry::Occupied(o) => { - assert_matches!(o.get(), PinState::Pinned(_)); - *o.into_mut() = PinState::Unpinned; - } - }, - } - } - - /// Try to unpin all continuous snapshots with [`PinState::Unpinned`] in a batch through RPC, - /// and clean up their entries. - async fn unpin_batch(&mut self) { - // Find the minimum snapshot that is pinned. Unpin all snapshots before it. - if let Some((min_snapshot, min_committed_epoch)) = self - .states - .iter() - .find(|(_, s)| matches!(s, PinState::Pinned(_))) - .map(|(k, s)| { - ( - k.clone(), - must_match!(s, PinState::Pinned(committed_epoch) => *committed_epoch), - ) - }) - { - if &min_snapshot == self.states.first_key_value().unwrap().0 { - // Nothing to unpin. - return; - } - - let min_epoch = min_committed_epoch; - - match self.meta_client.unpin_snapshot_before(min_epoch).await { - Ok(()) => { - // Remove all snapshots before this one. - self.states = self.states.split_off(&min_snapshot); - } - Err(e) => { - tracing::error!(error = %e.as_report(), min_epoch, "unpin snapshot failed") - } - } - } - } -} diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 21a70d385bda..21c3e9a950b3 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -1604,7 +1604,6 @@ impl Session for SessionImpl { self.elapse_since_last_idle_instant() { if elapse_since_last_idle_instant > idle_in_transaction_session_timeout { - self.unpin_snapshot(); return Err(PsqlError::IdleInTxnTimeout); } } diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 7826890c84d1..64623cefe926 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -929,10 +929,6 @@ pub struct MockFrontendMetaClient {} impl FrontendMetaClient for MockFrontendMetaClient { async fn try_unregister(&self) {} - async fn pin_snapshot(&self) -> RpcResult { - Ok(HummockSnapshot { committed_epoch: 0 }) - } - async fn get_snapshot(&self) -> RpcResult { Ok(HummockSnapshot { committed_epoch: 0 }) } @@ -972,14 +968,6 @@ impl FrontendMetaClient for MockFrontendMetaClient { Ok(vec![]) } - async fn unpin_snapshot(&self) -> RpcResult<()> { - Ok(()) - } - - async fn unpin_snapshot_before(&self, _epoch: u64) -> RpcResult<()> { - Ok(()) - } - async fn list_meta_snapshots(&self) -> RpcResult> { Ok(vec![]) } @@ -1016,10 +1004,6 @@ impl FrontendMetaClient for MockFrontendMetaClient { unimplemented!() } - async fn list_hummock_pinned_snapshots(&self) -> RpcResult> { - unimplemented!() - } - async fn get_hummock_current_version(&self) -> RpcResult { Ok(HummockVersion::default()) } diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs index c74abb657140..5830951d3e5f 100644 --- a/src/meta/service/src/hummock_service.rs +++ b/src/meta/service/src/hummock_service.rs @@ -163,53 +163,6 @@ impl HummockManagerService for HummockServiceImpl { Ok(Response::new(resp)) } - async fn pin_specific_snapshot( - &self, - request: Request, - ) -> Result, Status> { - let req = request.into_inner(); - let hummock_snapshot = self - .hummock_manager - .pin_specific_snapshot(req.context_id, req.epoch) - .await?; - Ok(Response::new(PinSnapshotResponse { - status: None, - snapshot: Some(hummock_snapshot), - })) - } - - async fn pin_snapshot( - &self, - request: Request, - ) -> Result, Status> { - let req = request.into_inner(); - let hummock_snapshot = self.hummock_manager.pin_snapshot(req.context_id).await?; - Ok(Response::new(PinSnapshotResponse { - status: None, - snapshot: Some(hummock_snapshot), - })) - } - - async fn unpin_snapshot( - &self, - request: Request, - ) -> Result, Status> { - let req = request.into_inner(); - self.hummock_manager.unpin_snapshot(req.context_id).await?; - Ok(Response::new(UnpinSnapshotResponse { status: None })) - } - - async fn unpin_snapshot_before( - &self, - request: Request, - ) -> Result, Status> { - let req = request.into_inner(); - self.hummock_manager - .unpin_snapshot_before(req.context_id, req.min_snapshot.unwrap()) - .await?; - Ok(Response::new(UnpinSnapshotBeforeResponse { status: None })) - } - async fn get_new_sst_ids( &self, request: Request, @@ -363,23 +316,6 @@ impl HummockManagerService for HummockServiceImpl { })) } - async fn rise_ctl_get_pinned_snapshots_summary( - &self, - _request: Request, - ) -> Result, Status> { - let pinned_snapshots = self.hummock_manager.list_pinned_snapshot().await; - let workers = self - .hummock_manager - .list_workers(&pinned_snapshots.iter().map(|p| p.context_id).collect_vec()) - .await?; - Ok(Response::new(RiseCtlGetPinnedSnapshotsSummaryResponse { - summary: Some(PinnedSnapshotsSummary { - pinned_snapshots, - workers, - }), - })) - } - async fn get_assigned_compact_task_num( &self, _request: Request, diff --git a/src/meta/service/src/notification_service.rs b/src/meta/service/src/notification_service.rs index 6e993f2067f0..53501e76221a 100644 --- a/src/meta/service/src/notification_service.rs +++ b/src/meta/service/src/notification_service.rs @@ -412,12 +412,7 @@ impl NotificationService for NotificationServiceImpl { let meta_snapshot = match subscribe_type { SubscribeType::Compactor => self.compactor_subscribe().await?, - SubscribeType::Frontend => { - self.hummock_manager - .pin_snapshot(req.get_worker_id()) - .await?; - self.frontend_subscribe().await? - } + SubscribeType::Frontend => self.frontend_subscribe().await?, SubscribeType::Hummock => { self.hummock_manager .pin_version(req.get_worker_id()) diff --git a/src/meta/src/hummock/manager/context.rs b/src/meta/src/hummock/manager/context.rs index 1ff31013c2ee..8f1a510dc07e 100644 --- a/src/meta/src/hummock/manager/context.rs +++ b/src/meta/src/hummock/manager/context.rs @@ -17,22 +17,17 @@ use std::collections::{BTreeMap, HashMap, HashSet}; use fail::fail_point; use itertools::Itertools; use risingwave_common::catalog::TableId; -use risingwave_common::util::epoch::INVALID_EPOCH; use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{ HummockContextId, HummockEpoch, HummockSstableObjectId, HummockVersionId, LocalSstableInfo, INVALID_VERSION_ID, }; -use risingwave_pb::hummock::{ - HummockPinnedSnapshot, HummockPinnedVersion, HummockSnapshot, ValidationTask, -}; +use risingwave_pb::hummock::{HummockPinnedVersion, ValidationTask}; use crate::hummock::error::{Error, Result}; use crate::hummock::manager::worker::{HummockManagerEvent, HummockManagerEventSender}; use crate::hummock::manager::{commit_multi_var, start_measure_real_process_timer}; -use crate::hummock::metrics_utils::{ - trigger_pin_unpin_snapshot_state, trigger_pin_unpin_version_state, -}; +use crate::hummock::metrics_utils::trigger_pin_unpin_version_state; use crate::hummock::HummockManager; use crate::manager::{MetaStoreImpl, MetadataManager, META_NODE_ID}; use crate::model::BTreeMapTransaction; @@ -60,7 +55,6 @@ impl Drop for HummockVersionSafePoint { #[derive(Default)] pub(super) struct ContextInfo { pub pinned_versions: BTreeMap, - pub pinned_snapshots: BTreeMap, /// `version_safe_points` is similar to `pinned_versions` expect for being a transient state. pub version_safe_points: Vec, } @@ -82,12 +76,10 @@ impl ContextInfo { ))); let mut pinned_versions = BTreeMapTransaction::new(&mut self.pinned_versions); - let mut pinned_snapshots = BTreeMapTransaction::new(&mut self.pinned_snapshots); for context_id in context_ids.as_ref() { pinned_versions.remove(*context_id); - pinned_snapshots.remove(*context_id); } - commit_multi_var!(meta_store_ref, pinned_versions, pinned_snapshots)?; + commit_multi_var!(meta_store_ref, pinned_versions)?; Ok(()) } @@ -174,7 +166,6 @@ impl HummockManager { .map(|c| c.context_id), ); active_context_ids.extend(context_info.pinned_versions.keys()); - active_context_ids.extend(context_info.pinned_snapshots.keys()); (active_context_ids, context_info) }; @@ -364,126 +355,6 @@ impl HummockManager { Ok(()) } - - pub async fn pin_specific_snapshot( - &self, - context_id: HummockContextId, - epoch: HummockEpoch, - ) -> Result { - let snapshot = self.latest_snapshot.load(); - let mut guard = self.context_info.write().await; - self.check_context_with_meta_node(context_id, &guard) - .await?; - let mut pinned_snapshots = BTreeMapTransaction::new(&mut guard.pinned_snapshots); - let mut context_pinned_snapshot = pinned_snapshots.new_entry_txn_or_default( - context_id, - HummockPinnedSnapshot { - context_id, - minimal_pinned_snapshot: INVALID_EPOCH, - }, - ); - let epoch_to_pin = std::cmp::min(epoch, snapshot.committed_epoch); - if context_pinned_snapshot.minimal_pinned_snapshot == INVALID_EPOCH { - context_pinned_snapshot.minimal_pinned_snapshot = epoch_to_pin; - commit_multi_var!(self.meta_store_ref(), context_pinned_snapshot)?; - } - Ok(HummockSnapshot::clone(&snapshot)) - } - - /// Make sure `max_committed_epoch` is pinned and return it. - pub async fn pin_snapshot(&self, context_id: HummockContextId) -> Result { - let snapshot = self.latest_snapshot.load(); - let mut guard = self.context_info.write().await; - self.check_context_with_meta_node(context_id, &guard) - .await?; - let _timer = start_measure_real_process_timer!(self, "pin_snapshot"); - let mut pinned_snapshots = BTreeMapTransaction::new(&mut guard.pinned_snapshots); - let mut context_pinned_snapshot = pinned_snapshots.new_entry_txn_or_default( - context_id, - HummockPinnedSnapshot { - context_id, - minimal_pinned_snapshot: INVALID_EPOCH, - }, - ); - if context_pinned_snapshot.minimal_pinned_snapshot == INVALID_EPOCH { - context_pinned_snapshot.minimal_pinned_snapshot = snapshot.committed_epoch; - commit_multi_var!(self.meta_store_ref(), context_pinned_snapshot)?; - trigger_pin_unpin_snapshot_state(&self.metrics, &guard.pinned_snapshots); - } - Ok(HummockSnapshot::clone(&snapshot)) - } - - pub async fn unpin_snapshot(&self, context_id: HummockContextId) -> Result<()> { - let mut context_info = self.context_info.write().await; - self.check_context_with_meta_node(context_id, &context_info) - .await?; - let _timer = start_measure_real_process_timer!(self, "unpin_snapshot"); - let mut pinned_snapshots = BTreeMapTransaction::new(&mut context_info.pinned_snapshots); - let release_snapshot = pinned_snapshots.remove(context_id); - if release_snapshot.is_some() { - commit_multi_var!(self.meta_store_ref(), pinned_snapshots)?; - trigger_pin_unpin_snapshot_state(&self.metrics, &context_info.pinned_snapshots); - } - - #[cfg(test)] - { - drop(context_info); - self.check_state_consistency().await; - } - - Ok(()) - } - - /// Unpin all snapshots smaller than specified epoch for current context. - pub async fn unpin_snapshot_before( - &self, - context_id: HummockContextId, - hummock_snapshot: HummockSnapshot, - ) -> Result<()> { - let versioning = self.versioning.read().await; - let mut context_info = self.context_info.write().await; - self.check_context_with_meta_node(context_id, &context_info) - .await?; - let _timer = start_measure_real_process_timer!(self, "unpin_snapshot_before"); - // Use the max_committed_epoch in storage as the snapshot ts so only committed changes are - // visible in the snapshot. - let max_committed_epoch = versioning.current_version.visible_table_committed_epoch(); - // Ensure the unpin will not clean the latest one. - let snapshot_committed_epoch = hummock_snapshot.committed_epoch; - #[cfg(not(test))] - { - assert!(snapshot_committed_epoch <= max_committed_epoch); - } - let last_read_epoch = std::cmp::min(snapshot_committed_epoch, max_committed_epoch); - - let mut pinned_snapshots = BTreeMapTransaction::new(&mut context_info.pinned_snapshots); - let mut context_pinned_snapshot = pinned_snapshots.new_entry_txn_or_default( - context_id, - HummockPinnedSnapshot { - context_id, - minimal_pinned_snapshot: INVALID_EPOCH, - }, - ); - - // Unpin the snapshots pinned by meta but frontend doesn't know. Also equal to unpin all - // epochs below specific watermark. - if context_pinned_snapshot.minimal_pinned_snapshot < last_read_epoch - || context_pinned_snapshot.minimal_pinned_snapshot == INVALID_EPOCH - { - context_pinned_snapshot.minimal_pinned_snapshot = last_read_epoch; - commit_multi_var!(self.meta_store_ref(), context_pinned_snapshot)?; - trigger_pin_unpin_snapshot_state(&self.metrics, &context_info.pinned_snapshots); - } - - #[cfg(test)] - { - drop(context_info); - drop(versioning); - self.check_state_consistency().await; - } - - Ok(()) - } } // safe point diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index d43b1cc6f542..4cf29ca060e1 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -29,12 +29,12 @@ use risingwave_hummock_sdk::{ HummockContextId, HummockVersionId, }; use risingwave_meta_model_v2::{ - compaction_status, compaction_task, hummock_pinned_snapshot, hummock_pinned_version, - hummock_version_delta, hummock_version_stats, + compaction_status, compaction_task, hummock_pinned_version, hummock_version_delta, + hummock_version_stats, }; use risingwave_pb::hummock::{ - HummockPinnedSnapshot, HummockPinnedVersion, HummockSnapshot, HummockVersionStats, - PbCompactTaskAssignment, PbCompactionGroupInfo, SubscribeCompactionEventRequest, + HummockPinnedVersion, HummockSnapshot, HummockVersionStats, PbCompactTaskAssignment, + PbCompactionGroupInfo, SubscribeCompactionEventRequest, }; use risingwave_pb::meta::subscribe_response::Operation; use tokio::sync::mpsc::UnboundedSender; @@ -451,21 +451,6 @@ impl HummockManager { .collect(), }; - context_info.pinned_snapshots = match &meta_store { - MetaStoreImpl::Kv(meta_store) => HummockPinnedSnapshot::list(meta_store) - .await? - .into_iter() - .map(|p| (p.context_id, p)) - .collect(), - MetaStoreImpl::Sql(sql_meta_store) => hummock_pinned_snapshot::Entity::find() - .all(&sql_meta_store.conn) - .await - .map_err(MetadataModelError::from)? - .into_iter() - .map(|m| (m.context_id as HummockContextId, m.into())) - .collect(), - }; - self.delete_object_tracker.clear(); // Not delete stale objects when archive or time travel is enabled if !self.env.opts.enable_hummock_data_archive && !self.time_travel_enabled().await { diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 09d43bf5fc72..9fa9b11a026c 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -13,6 +13,7 @@ // limitations under the License. #![cfg(test)] + use std::cmp::Ordering; use std::collections::HashMap; use std::sync::Arc; @@ -37,7 +38,7 @@ use risingwave_hummock_sdk::{ }; use risingwave_pb::common::{HostAddress, WorkerType}; use risingwave_pb::hummock::compact_task::TaskStatus; -use risingwave_pb::hummock::{HummockPinnedSnapshot, HummockPinnedVersion, HummockSnapshot}; +use risingwave_pb::hummock::HummockPinnedVersion; use risingwave_pb::meta::add_worker_node_request::Property; use risingwave_rpc_client::HummockMetaClient; use thiserror_ext::AsReport; @@ -55,13 +56,6 @@ fn pin_versions_sum(pin_versions: &[HummockPinnedVersion]) -> usize { pin_versions.iter().len() } -fn pin_snapshots_epoch(pin_snapshots: &[HummockPinnedSnapshot]) -> Vec { - pin_snapshots - .iter() - .map(|p| p.minimal_pinned_snapshot) - .collect_vec() -} - fn gen_sstable_info(sst_id: u64, table_ids: Vec, epoch: u64) -> SstableInfo { let table_key_l = gen_key_from_str(VirtualNode::ZERO, "1"); let table_key_r = gen_key_from_str(VirtualNode::MAX_FOR_TEST, "1"); @@ -106,23 +100,6 @@ fn get_compaction_group_object_ids( .collect_vec() } -async fn list_pinned_snapshot_from_meta_store(env: &MetaSrvEnv) -> Vec { - match env.meta_store_ref() { - MetaStoreImpl::Kv(meta_store) => HummockPinnedSnapshot::list(meta_store).await.unwrap(), - MetaStoreImpl::Sql(sql_meta_store) => { - use risingwave_meta_model_v2::hummock_pinned_snapshot; - use sea_orm::EntityTrait; - hummock_pinned_snapshot::Entity::find() - .all(&sql_meta_store.conn) - .await - .unwrap() - .into_iter() - .map(Into::into) - .collect() - } - } -} - async fn list_pinned_version_from_meta_store(env: &MetaSrvEnv) -> Vec { match env.meta_store_ref() { MetaStoreImpl::Kv(meta_store) => HummockPinnedVersion::list(meta_store).await.unwrap(), @@ -140,59 +117,6 @@ async fn list_pinned_version_from_meta_store(env: &MetaSrvEnv) -> Vec [ e0:pinned ] - let mut epoch_recorded_in_frontend = hummock_manager.pin_snapshot(context_id).await.unwrap(); + let mut epoch_recorded_in_frontend = hummock_manager.latest_snapshot(); let prev_epoch = epoch.prev_epoch(); assert_eq!(epoch_recorded_in_frontend.committed_epoch, prev_epoch); @@ -729,13 +648,13 @@ async fn test_pin_snapshot_response_lost() { // Assume the response of the previous rpc is lost. // [ e0:pinned, e1 ] -> [ e0, e1:pinned ] - epoch_recorded_in_frontend = hummock_manager.pin_snapshot(context_id).await.unwrap(); + epoch_recorded_in_frontend = hummock_manager.latest_snapshot(); let prev_epoch = epoch.prev_epoch(); assert_eq!(epoch_recorded_in_frontend.committed_epoch, prev_epoch); // Assume the response of the previous rpc is lost. // [ e0, e1:pinned ] -> [ e0, e1:pinned ] - epoch_recorded_in_frontend = hummock_manager.pin_snapshot(context_id).await.unwrap(); + epoch_recorded_in_frontend = hummock_manager.latest_snapshot(); assert_eq!( epoch_recorded_in_frontend.committed_epoch, epoch.prev_epoch() @@ -764,7 +683,7 @@ async fn test_pin_snapshot_response_lost() { // Use correct snapshot id. // [ e0, e1:pinned, e2 ] -> [ e0, e1:pinned, e2:pinned ] - epoch_recorded_in_frontend = hummock_manager.pin_snapshot(context_id).await.unwrap(); + epoch_recorded_in_frontend = hummock_manager.latest_snapshot(); assert_eq!( epoch_recorded_in_frontend.committed_epoch, epoch.prev_epoch() @@ -793,7 +712,7 @@ async fn test_pin_snapshot_response_lost() { // Use u64::MAX as epoch to pin greatest snapshot // [ e0, e1:pinned, e2:pinned, e3 ] -> [ e0, e1:pinned, e2:pinned, e3::pinned ] - epoch_recorded_in_frontend = hummock_manager.pin_snapshot(context_id).await.unwrap(); + epoch_recorded_in_frontend = hummock_manager.latest_snapshot(); assert_eq!( epoch_recorded_in_frontend.committed_epoch, epoch.prev_epoch() diff --git a/src/meta/src/hummock/manager/utils.rs b/src/meta/src/hummock/manager/utils.rs index fd1372b3a500..2020eee1172e 100644 --- a/src/meta/src/hummock/manager/utils.rs +++ b/src/meta/src/hummock/manager/utils.rs @@ -97,14 +97,12 @@ impl HummockManager { let compact_statuses_copy = compaction_guard.compaction_statuses.clone(); let compact_task_assignment_copy = compaction_guard.compact_task_assignment.clone(); let pinned_versions_copy = context_info_guard.pinned_versions.clone(); - let pinned_snapshots_copy = context_info_guard.pinned_snapshots.clone(); let hummock_version_deltas_copy = versioning_guard.hummock_version_deltas.clone(); let version_stats_copy = versioning_guard.version_stats.clone(); (( compact_statuses_copy, compact_task_assignment_copy, pinned_versions_copy, - pinned_snapshots_copy, hummock_version_deltas_copy, version_stats_copy, ),) diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index 3d621a1d5991..fc9fe3b28f65 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -31,7 +31,7 @@ use risingwave_hummock_sdk::{ use risingwave_pb::common::WorkerNode; use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ - HummockPinnedSnapshot, HummockPinnedVersion, HummockSnapshot, HummockVersionStats, TableStats, + HummockPinnedVersion, HummockSnapshot, HummockVersionStats, TableStats, }; use risingwave_pb::meta::subscribe_response::{Info, Operation}; @@ -118,16 +118,6 @@ impl HummockManager { .collect_vec() } - pub async fn list_pinned_snapshot(&self) -> Vec { - self.context_info - .read() - .await - .pinned_snapshots - .values() - .cloned() - .collect_vec() - } - pub async fn list_workers( &self, context_ids: &[HummockContextId], diff --git a/src/meta/src/hummock/metrics_utils.rs b/src/meta/src/hummock/metrics_utils.rs index b80fd0537659..ad39e2228fba 100644 --- a/src/meta/src/hummock/metrics_utils.rs +++ b/src/meta/src/hummock/metrics_utils.rs @@ -24,10 +24,10 @@ use risingwave_hummock_sdk::compaction_group::hummock_version_ext::object_size_m use risingwave_hummock_sdk::level::Levels; use risingwave_hummock_sdk::table_stats::PbTableStatsMap; use risingwave_hummock_sdk::version::HummockVersion; -use risingwave_hummock_sdk::{CompactionGroupId, HummockContextId, HummockEpoch, HummockVersionId}; +use risingwave_hummock_sdk::{CompactionGroupId, HummockContextId, HummockVersionId}; use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ - CompactionConfig, HummockPinnedSnapshot, HummockPinnedVersion, HummockVersionStats, LevelType, + CompactionConfig, HummockPinnedVersion, HummockVersionStats, LevelType, }; use super::compaction::selector::DynamicLevelSelectorCore; @@ -401,21 +401,6 @@ pub fn trigger_pin_unpin_version_state( } } -pub fn trigger_pin_unpin_snapshot_state( - metrics: &MetaMetrics, - pinned_snapshots: &BTreeMap, -) { - if let Some(m) = pinned_snapshots - .values() - .map(|v| v.minimal_pinned_snapshot) - .min() - { - metrics.min_pinned_epoch.set(m as i64); - } else { - metrics.min_pinned_epoch.set(HummockEpoch::MAX as _); - } -} - pub fn trigger_gc_stat( metrics: &MetaMetrics, checkpoint: &HummockVersionCheckpoint, diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index c926e2145e88..c59fb512090f 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -119,36 +119,10 @@ impl HummockMetaClient for MockHummockMetaClient { Ok(self.hummock_manager.get_current_version().await) } - async fn pin_snapshot(&self) -> Result { - self.hummock_manager - .pin_snapshot(self.context_id) - .await - .map_err(mock_err) - } - async fn get_snapshot(&self) -> Result { Ok(self.hummock_manager.latest_snapshot()) } - async fn unpin_snapshot(&self) -> Result<()> { - self.hummock_manager - .unpin_snapshot(self.context_id) - .await - .map_err(mock_err) - } - - async fn unpin_snapshot_before(&self, pinned_epochs: HummockEpoch) -> Result<()> { - self.hummock_manager - .unpin_snapshot_before( - self.context_id, - HummockSnapshot { - committed_epoch: pinned_epochs, - }, - ) - .await - .map_err(mock_err) - } - async fn get_new_sst_ids(&self, number: u32) -> Result { fail_point!("get_new_sst_ids_err", |_| Err(anyhow!( "failpoint get_new_sst_ids_err" diff --git a/src/rpc_client/src/hummock_meta_client.rs b/src/rpc_client/src/hummock_meta_client.rs index db99036a3475..5239d000353c 100644 --- a/src/rpc_client/src/hummock_meta_client.rs +++ b/src/rpc_client/src/hummock_meta_client.rs @@ -32,9 +32,6 @@ use crate::error::Result; pub trait HummockMetaClient: Send + Sync + 'static { async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()>; async fn get_current_version(&self) -> Result; - async fn pin_snapshot(&self) -> Result; - async fn unpin_snapshot(&self) -> Result<()>; - async fn unpin_snapshot_before(&self, pinned_epochs: HummockEpoch) -> Result<()>; async fn get_snapshot(&self) -> Result; async fn get_new_sst_ids(&self, number: u32) -> Result; // We keep `commit_epoch` only for test/benchmark. diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index a226e4d67249..f736e23cb9e0 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -1011,15 +1011,6 @@ impl MetaClient { .await } - pub async fn risectl_get_pinned_snapshots_summary( - &self, - ) -> Result { - let request = RiseCtlGetPinnedSnapshotsSummaryRequest {}; - self.inner - .rise_ctl_get_pinned_snapshots_summary(request) - .await - } - pub async fn risectl_get_checkpoint_hummock_version( &self, ) -> Result { @@ -1116,15 +1107,6 @@ impl MetaClient { )) } - pub async fn pin_specific_snapshot(&self, epoch: HummockEpoch) -> Result { - let req = PinSpecificSnapshotRequest { - context_id: self.worker_id(), - epoch, - }; - let resp = self.inner.pin_specific_snapshot(req).await?; - Ok(resp.snapshot.unwrap()) - } - pub async fn get_assigned_compact_task_num(&self) -> Result { let req = GetAssignedCompactTaskNumRequest {}; let resp = self.inner.get_assigned_compact_task_num(req).await?; @@ -1470,40 +1452,12 @@ impl HummockMetaClient for MetaClient { )) } - async fn pin_snapshot(&self) -> Result { - let req = PinSnapshotRequest { - context_id: self.worker_id(), - }; - let resp = self.inner.pin_snapshot(req).await?; - Ok(resp.snapshot.unwrap()) - } - async fn get_snapshot(&self) -> Result { let req = GetEpochRequest {}; let resp = self.inner.get_epoch(req).await?; Ok(resp.snapshot.unwrap()) } - async fn unpin_snapshot(&self) -> Result<()> { - let req = UnpinSnapshotRequest { - context_id: self.worker_id(), - }; - self.inner.unpin_snapshot(req).await?; - Ok(()) - } - - async fn unpin_snapshot_before(&self, pinned_epochs: HummockEpoch) -> Result<()> { - let req = UnpinSnapshotBeforeRequest { - context_id: self.worker_id(), - // For unpin_snapshot_before, we do not care about snapshots list but only min epoch. - min_snapshot: Some(HummockSnapshot { - committed_epoch: pinned_epochs, - }), - }; - self.inner.unpin_snapshot_before(req).await?; - Ok(()) - } - async fn get_new_sst_ids(&self, number: u32) -> Result { let resp = self .inner @@ -2095,18 +2049,13 @@ macro_rules! for_all_meta_rpc { ,{ hummock_client, get_assigned_compact_task_num, GetAssignedCompactTaskNumRequest, GetAssignedCompactTaskNumResponse } ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse } ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse } - ,{ hummock_client, pin_snapshot, PinSnapshotRequest, PinSnapshotResponse } - ,{ hummock_client, pin_specific_snapshot, PinSpecificSnapshotRequest, PinSnapshotResponse } ,{ hummock_client, get_epoch, GetEpochRequest, GetEpochResponse } - ,{ hummock_client, unpin_snapshot, UnpinSnapshotRequest, UnpinSnapshotResponse } - ,{ hummock_client, unpin_snapshot_before, UnpinSnapshotBeforeRequest, UnpinSnapshotBeforeResponse } ,{ hummock_client, get_new_sst_ids, GetNewSstIdsRequest, GetNewSstIdsResponse } ,{ hummock_client, report_vacuum_task, ReportVacuumTaskRequest, ReportVacuumTaskResponse } ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse } ,{ hummock_client, report_full_scan_task, ReportFullScanTaskRequest, ReportFullScanTaskResponse } ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse } ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse } - ,{ hummock_client, rise_ctl_get_pinned_snapshots_summary, RiseCtlGetPinnedSnapshotsSummaryRequest, RiseCtlGetPinnedSnapshotsSummaryResponse } ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse } ,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse } ,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse } diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index d0afa17b2a36..86f578d55bb3 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -14,7 +14,6 @@ #[cfg(test)] pub(crate) mod tests { - use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque}; use std::ops::Bound; use std::sync::Arc; @@ -44,19 +43,15 @@ pub(crate) mod tests { }; use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{can_concat, CompactionGroupId}; - use risingwave_meta::hummock::compaction::compaction_config::CompactionConfigBuilder; use risingwave_meta::hummock::compaction::selector::{ default_compaction_selector, ManualCompactionOption, }; use risingwave_meta::hummock::test_utils::{ get_compaction_group_id_by_table_id, register_table_ids_to_compaction_group, - setup_compute_env, setup_compute_env_with_config, - unregister_table_ids_from_compaction_group, + setup_compute_env, unregister_table_ids_from_compaction_group, }; use risingwave_meta::hummock::{HummockManagerRef, MockHummockMetaClient}; - use risingwave_pb::common::{HostAddress, WorkerType}; use risingwave_pb::hummock::TableOption; - use risingwave_pb::meta::add_worker_node_request::Property; use risingwave_rpc_client::HummockMetaClient; use risingwave_storage::filter_key_extractor::{ FilterKeyExtractorImpl, FilterKeyExtractorManager, FixedLengthFilterKeyExtractor, @@ -222,186 +217,6 @@ pub(crate) mod tests { } } - #[ignore] - #[tokio::test] - async fn test_compaction_watermark() { - let config = CompactionConfigBuilder::new() - .level0_tier_compact_file_number(1) - .level0_max_compact_file_number(130) - .level0_overlapping_sub_level_compact_level_count(1) - .build(); - let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) = - setup_compute_env_with_config(8080, config).await; - let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( - hummock_manager_ref.clone(), - worker_node.id, - )); - - // 1. add sstables - let table_id = 0; - let mut key = BytesMut::default(); - key.put_u16(0); - key.put_slice(b"same_key"); - let storage = get_hummock_storage( - hummock_meta_client.clone(), - get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), - &hummock_manager_ref, - &[table_id], - ) - .await; - let rpc_filter_key_extractor_manager = match storage.filter_key_extractor_manager().clone() - { - FilterKeyExtractorManager::RpcFilterKeyExtractorManager( - rpc_filter_key_extractor_manager, - ) => rpc_filter_key_extractor_manager, - FilterKeyExtractorManager::StaticFilterKeyExtractorManager(_) => unreachable!(), - }; - let filter_key_extractor_manager = FilterKeyExtractorManager::RpcFilterKeyExtractorManager( - rpc_filter_key_extractor_manager, - ); - let compact_ctx = get_compactor_context(&storage); - let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new( - hummock_meta_client.clone(), - storage - .storage_opts() - .clone() - .sstable_id_remote_fetch_number, - )); - let worker_node_id2 = hummock_manager_ref - .metadata_manager() - .add_worker_node( - WorkerType::ComputeNode, - HostAddress::default(), - Property::default(), - Default::default(), - ) - .await - .unwrap(); - let _snapshot = hummock_manager_ref - .pin_snapshot(worker_node_id2) - .await - .unwrap(); - let key = key.freeze(); - const SST_COUNT: u64 = 32; - const TEST_WATERMARK: u64 = 8; - prepare_test_put_data( - &storage, - &hummock_meta_client, - &key, - 1 << 10, - (1..SST_COUNT + 1) - .map(|v| test_epoch(v * 1000)) - .collect_vec(), - ) - .await; - - let compaction_group_id = - get_compaction_group_id_by_table_id(hummock_manager_ref.clone(), table_id).await; - // 2. get compact task - while let Some(mut compact_task) = hummock_manager_ref - .get_compact_task(compaction_group_id, &mut default_compaction_selector()) - .await - .unwrap() - { - let compaction_filter_flag = CompactionFilterFlag::TTL; - // compact_task.watermark = (TEST_WATERMARK * 1000) << 16; - compact_task.compaction_filter_mask = compaction_filter_flag.bits(); - compact_task.table_options = BTreeMap::from([( - 0, - TableOption { - retention_seconds: Some(64), - }, - )]); - compact_task.current_epoch_time = 0; - - let (_tx, rx) = tokio::sync::oneshot::channel(); - let ((result_task, task_stats), _) = compact( - compact_ctx.clone(), - compact_task.clone(), - rx, - Box::new(sstable_object_id_manager.clone()), - filter_key_extractor_manager.clone(), - ) - .await; - - hummock_manager_ref - .report_compact_task_for_test( - result_task.task_id, - Some(compact_task), - result_task.task_status, - result_task.sorted_output_ssts, - Some(to_prost_table_stats_map(task_stats)), - ) - .await - .unwrap(); - } - - let mut val = b"0"[..].repeat(1 << 10); - val.extend_from_slice(&(TEST_WATERMARK * 1000).to_be_bytes()); - - let compactor_manager = hummock_manager_ref.compactor_manager_ref_for_test(); - let _recv = compactor_manager.add_compactor(worker_node.id); - - // 4. get the latest version and check - let version = hummock_manager_ref.get_current_version().await; - let group = version.get_compaction_group_levels(compaction_group_id); - - // base level - let output_tables = group - .levels - .iter() - .flat_map(|level| level.table_infos.clone()) - .chain( - group - .l0 - .sub_levels - .iter() - .flat_map(|level| level.table_infos.clone()), - ) - .collect_vec(); - - storage.wait_version(version).await; - let mut table_key_count = 0; - for output_sst in output_tables { - let table = storage - .sstable_store() - .sstable(&output_sst, &mut StoreLocalStatistic::default()) - .await - .unwrap(); - table_key_count += table.meta.key_count; - } - - // we have removed these 31 keys before watermark 32. - assert_eq!(table_key_count, (SST_COUNT - TEST_WATERMARK + 1) as u32); - let read_epoch = (TEST_WATERMARK * 1000) << 16; - - let get_ret = storage - .get( - TableKey(key.clone()), - read_epoch, - ReadOptions { - cache_policy: CachePolicy::Fill(CacheContext::Default), - ..Default::default() - }, - ) - .await; - let get_val = get_ret.unwrap().unwrap().to_vec(); - - assert_eq!(get_val, val); - let ret = storage - .get( - TableKey(key.clone()), - ((TEST_WATERMARK - 1) * 1000) << 16, - ReadOptions { - prefix_hint: Some(key.clone()), - cache_policy: CachePolicy::Fill(CacheContext::Default), - ..Default::default() - }, - ) - .await; - assert!(ret.is_err()); - } - #[tokio::test] async fn test_compaction_same_key_not_split() { let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) = diff --git a/src/storage/src/hummock/hummock_meta_client.rs b/src/storage/src/hummock/hummock_meta_client.rs index 038856a3ba2f..53f15ebc1fb3 100644 --- a/src/storage/src/hummock/hummock_meta_client.rs +++ b/src/storage/src/hummock/hummock_meta_client.rs @@ -56,22 +56,10 @@ impl HummockMetaClient for MonitoredHummockMetaClient { self.meta_client.get_current_version().await } - async fn pin_snapshot(&self) -> Result { - self.meta_client.pin_snapshot().await - } - async fn get_snapshot(&self) -> Result { self.meta_client.get_snapshot().await } - async fn unpin_snapshot(&self) -> Result<()> { - self.meta_client.unpin_snapshot().await - } - - async fn unpin_snapshot_before(&self, _min_epoch: HummockEpoch) -> Result<()> { - unreachable!("Currently CNs should not call this function") - } - async fn get_new_sst_ids(&self, number: u32) -> Result { self.stats.get_new_sst_ids_counts.inc(); let timer = self.stats.get_new_sst_ids_latency.start_timer(); diff --git a/src/tests/compaction_test/src/compaction_test_runner.rs b/src/tests/compaction_test/src/compaction_test_runner.rs index 328c23f8fbe8..b04c39769e90 100644 --- a/src/tests/compaction_test/src/compaction_test_runner.rs +++ b/src/tests/compaction_test/src/compaction_test_runner.rs @@ -409,11 +409,7 @@ async fn start_replay( // pop the latest epoch replayed_epochs.pop(); let mut epochs = vec![max_committed_epoch]; - epochs.extend( - pin_old_snapshots(&meta_client, &replayed_epochs, 1) - .await - .into_iter(), - ); + epochs.extend(pin_old_snapshots(&meta_client, &replayed_epochs, 1).into_iter()); tracing::info!("===== Prepare to check snapshots: {:?}", epochs); let old_version_iters = open_hummock_iters(&hummock, &epochs, table_to_check).await?; @@ -519,15 +515,14 @@ async fn start_replay( Ok(()) } -async fn pin_old_snapshots( - meta_client: &MetaClient, +fn pin_old_snapshots( + _meta_client: &MetaClient, replayed_epochs: &[HummockEpoch], num: usize, ) -> Vec { let mut old_epochs = vec![]; for &epoch in replayed_epochs.iter().rev().take(num) { old_epochs.push(epoch); - let _ = meta_client.pin_specific_snapshot(epoch).await; } old_epochs } diff --git a/src/tests/compaction_test/src/lib.rs b/src/tests/compaction_test/src/lib.rs index 70f6e20b62ad..664f73d201d8 100644 --- a/src/tests/compaction_test/src/lib.rs +++ b/src/tests/compaction_test/src/lib.rs @@ -18,7 +18,6 @@ #![warn(clippy::explicit_into_iter_loop)] #![warn(clippy::explicit_iter_loop)] #![warn(clippy::inconsistent_struct_constructor)] -#![warn(clippy::unused_async)] #![warn(clippy::map_flatten)] #![warn(clippy::await_holding_lock)] #![deny(rustdoc::broken_intra_doc_links)]