Skip to content

Commit

Permalink
refactor(meta): deprecate pin/unpin snapshot (#18493)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Sep 20, 2024
1 parent 6b29ec9 commit 409bc8d
Show file tree
Hide file tree
Showing 24 changed files with 29 additions and 1,007 deletions.
47 changes: 0 additions & 47 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -267,50 +267,19 @@ message UnpinVersionResponse {
common.Status status = 1;
}

message PinSnapshotRequest {
uint32 context_id = 1;
}

message PinSpecificSnapshotRequest {
uint32 context_id = 1;
uint64 epoch = 2;
}

message GetAssignedCompactTaskNumRequest {}

message GetAssignedCompactTaskNumResponse {
uint32 num_tasks = 1;
}

message PinSnapshotResponse {
common.Status status = 1;
HummockSnapshot snapshot = 2;
}

message GetEpochRequest {}

message GetEpochResponse {
common.Status status = 1;
HummockSnapshot snapshot = 2;
}

message UnpinSnapshotRequest {
uint32 context_id = 1;
}

message UnpinSnapshotResponse {
common.Status status = 1;
}

message UnpinSnapshotBeforeRequest {
uint32 context_id = 1;
HummockSnapshot min_snapshot = 3;
}

message UnpinSnapshotBeforeResponse {
common.Status status = 1;
}

// When right_exclusive=false, it represents [left, right], of which both boundary are open. When right_exclusive=true,
// it represents [left, right), of which right is exclusive.
message KeyRange {
Expand Down Expand Up @@ -641,23 +610,12 @@ message PinnedVersionsSummary {
map<uint32, common.WorkerNode> workers = 2;
}

message PinnedSnapshotsSummary {
repeated HummockPinnedSnapshot pinned_snapshots = 1;
map<uint32, common.WorkerNode> workers = 2;
}

message RiseCtlGetPinnedVersionsSummaryRequest {}

message RiseCtlGetPinnedVersionsSummaryResponse {
PinnedVersionsSummary summary = 1;
}

message RiseCtlGetPinnedSnapshotsSummaryRequest {}

message RiseCtlGetPinnedSnapshotsSummaryResponse {
PinnedSnapshotsSummary summary = 1;
}

message InitMetadataForReplayRequest {
repeated catalog.Table tables = 1;
repeated CompactionGroupInfo compaction_groups = 2;
Expand Down Expand Up @@ -853,18 +811,13 @@ service HummockManagerService {
rpc GetAssignedCompactTaskNum(GetAssignedCompactTaskNumRequest) returns (GetAssignedCompactTaskNumResponse);
rpc TriggerCompactionDeterministic(TriggerCompactionDeterministicRequest) returns (TriggerCompactionDeterministicResponse);
rpc DisableCommitEpoch(DisableCommitEpochRequest) returns (DisableCommitEpochResponse);
rpc PinSnapshot(PinSnapshotRequest) returns (PinSnapshotResponse);
rpc PinSpecificSnapshot(PinSpecificSnapshotRequest) returns (PinSnapshotResponse);
rpc GetEpoch(GetEpochRequest) returns (GetEpochResponse);
rpc UnpinSnapshot(UnpinSnapshotRequest) returns (UnpinSnapshotResponse);
rpc UnpinSnapshotBefore(UnpinSnapshotBeforeRequest) returns (UnpinSnapshotBeforeResponse);
rpc GetNewSstIds(GetNewSstIdsRequest) returns (GetNewSstIdsResponse);
rpc ReportVacuumTask(ReportVacuumTaskRequest) returns (ReportVacuumTaskResponse);
rpc TriggerManualCompaction(TriggerManualCompactionRequest) returns (TriggerManualCompactionResponse);
rpc ReportFullScanTask(ReportFullScanTaskRequest) returns (ReportFullScanTaskResponse);
rpc TriggerFullGC(TriggerFullGCRequest) returns (TriggerFullGCResponse);
rpc RiseCtlGetPinnedVersionsSummary(RiseCtlGetPinnedVersionsSummaryRequest) returns (RiseCtlGetPinnedVersionsSummaryResponse);
rpc RiseCtlGetPinnedSnapshotsSummary(RiseCtlGetPinnedSnapshotsSummaryRequest) returns (RiseCtlGetPinnedSnapshotsSummaryResponse);
rpc RiseCtlListCompactionGroup(RiseCtlListCompactionGroupRequest) returns (RiseCtlListCompactionGroupResponse);
rpc RiseCtlUpdateCompactionConfig(RiseCtlUpdateCompactionConfigRequest) returns (RiseCtlUpdateCompactionConfigResponse);
rpc RiseCtlPauseVersionCheckpoint(RiseCtlPauseVersionCheckpointRequest) returns (RiseCtlPauseVersionCheckpointResponse);
Expand Down
34 changes: 1 addition & 33 deletions src/ctl/src/cmd_impl/hummock/list_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::hummock::{PinnedSnapshotsSummary, PinnedVersionsSummary};
use risingwave_pb::hummock::PinnedVersionsSummary;
use risingwave_rpc_client::HummockMetaClient;

use crate::CtlContext;
Expand Down Expand Up @@ -118,38 +118,6 @@ pub async fn list_pinned_versions(context: &CtlContext) -> anyhow::Result<()> {
Ok(())
}

pub async fn list_pinned_snapshots(context: &CtlContext) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;
let PinnedSnapshotsSummary {
mut pinned_snapshots,
workers,
} = meta_client
.risectl_get_pinned_snapshots_summary()
.await?
.summary
.unwrap();
pinned_snapshots.sort_by_key(|s| s.minimal_pinned_snapshot);
for pinned_snapshot in pinned_snapshots {
match workers.get(&pinned_snapshot.context_id) {
None => {
println!(
"Worker {} may have been dropped, min_pinned_snapshot {}",
pinned_snapshot.context_id, pinned_snapshot.minimal_pinned_snapshot
);
}
Some(worker) => {
println!(
"Worker {} type {} min_pinned_snapshot {}",
pinned_snapshot.context_id,
worker.r#type().as_str_name(),
pinned_snapshot.minimal_pinned_snapshot
);
}
}
}
Ok(())
}

pub async fn rebuild_table_stats(context: &CtlContext) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;
meta_client.risectl_rebuild_table_stats().await?;
Expand Down
9 changes: 1 addition & 8 deletions src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::Compressi
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use thiserror_ext::AsReport;

use crate::cmd_impl::hummock::{
build_compaction_config_vec, list_pinned_snapshots, list_pinned_versions,
};
use crate::cmd_impl::hummock::{build_compaction_config_vec, list_pinned_versions};
use crate::cmd_impl::meta::EtcdBackend;
use crate::cmd_impl::throttle::apply_throttle;
use crate::common::CtlContext;
Expand Down Expand Up @@ -223,8 +221,6 @@ enum HummockCommands {
},
/// List pinned versions of each worker.
ListPinnedVersions {},
/// List pinned snapshots of each worker.
ListPinnedSnapshots {},
/// List all compaction groups.
ListCompactionGroup,
/// Update compaction config for compaction groups.
Expand Down Expand Up @@ -654,9 +650,6 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
Commands::Hummock(HummockCommands::ListPinnedVersions {}) => {
list_pinned_versions(context).await?
}
Commands::Hummock(HummockCommands::ListPinnedSnapshots {}) => {
list_pinned_snapshots(context).await?
}
Commands::Hummock(HummockCommands::ListCompactionGroup) => {
cmd_impl::hummock::list_compaction_group(context).await?
}
Expand Down
1 change: 0 additions & 1 deletion src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ mod rw_hummock_compact_task_assignment;
mod rw_hummock_compact_task_progress;
mod rw_hummock_compaction_group_configs;
mod rw_hummock_meta_configs;
mod rw_hummock_pinned_snapshots;
mod rw_hummock_pinned_versions;
mod rw_hummock_version;
mod rw_hummock_version_deltas;
Expand Down

This file was deleted.

37 changes: 0 additions & 37 deletions src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@ use risingwave_rpc_client::{HummockMetaClient, MetaClient};
#[async_trait::async_trait]
pub trait FrontendMetaClient: Send + Sync {
async fn try_unregister(&self);

async fn pin_snapshot(&self) -> Result<HummockSnapshot>;

async fn get_snapshot(&self) -> Result<HummockSnapshot>;

async fn flush(&self, checkpoint: bool) -> Result<HummockSnapshot>;
Expand All @@ -73,10 +70,6 @@ pub trait FrontendMetaClient: Send + Sync {

async fn list_object_dependencies(&self) -> Result<Vec<PbObjectDependencies>>;

async fn unpin_snapshot(&self) -> Result<()>;

async fn unpin_snapshot_before(&self, epoch: u64) -> Result<()>;

async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>>;

async fn get_system_params(&self) -> Result<SystemParamsReader>;
Expand All @@ -98,9 +91,6 @@ pub trait FrontendMetaClient: Send + Sync {
/// Returns vector of (worker_id, min_pinned_version_id)
async fn list_hummock_pinned_versions(&self) -> Result<Vec<(u32, u64)>>;

/// Returns vector of (worker_id, min_pinned_snapshot_id)
async fn list_hummock_pinned_snapshots(&self) -> Result<Vec<(u32, u64)>>;

async fn get_hummock_current_version(&self) -> Result<HummockVersion>;

async fn get_hummock_checkpoint_version(&self) -> Result<HummockVersion>;
Expand Down Expand Up @@ -149,10 +139,6 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
self.0.try_unregister().await;
}

async fn pin_snapshot(&self) -> Result<HummockSnapshot> {
self.0.pin_snapshot().await
}

async fn get_snapshot(&self) -> Result<HummockSnapshot> {
self.0.get_snapshot().await
}
Expand Down Expand Up @@ -196,14 +182,6 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
self.0.list_object_dependencies().await
}

async fn unpin_snapshot(&self) -> Result<()> {
self.0.unpin_snapshot().await
}

async fn unpin_snapshot_before(&self, epoch: u64) -> Result<()> {
self.0.unpin_snapshot_before(epoch).await
}

async fn list_meta_snapshots(&self) -> Result<Vec<MetaSnapshotMetadata>> {
let manifest = self.0.get_meta_snapshot_manifest().await?;
Ok(manifest.snapshot_metadata)
Expand Down Expand Up @@ -257,21 +235,6 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
Ok(ret)
}

async fn list_hummock_pinned_snapshots(&self) -> Result<Vec<(u32, u64)>> {
let pinned_snapshots = self
.0
.risectl_get_pinned_snapshots_summary()
.await?
.summary
.unwrap()
.pinned_snapshots;
let ret = pinned_snapshots
.into_iter()
.map(|s| (s.context_id, s.minimal_pinned_snapshot))
.collect();
Ok(ret)
}

async fn get_hummock_current_version(&self) -> Result<HummockVersion> {
self.0.get_current_version().await
}
Expand Down
Loading

0 comments on commit 409bc8d

Please sign in to comment.