Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(storage): remove most usage of max_committed_epoch #18641

Open
wants to merge 17 commits into
base: yiming/per-table-try-wait-epoch
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 0 additions & 17 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -227,15 +227,6 @@ message HummockVersionArchive {
repeated HummockVersionDelta version_deltas = 2;
}

// We will have two epoch after decouple
message HummockSnapshot {
// Epoch with checkpoint, we will read durable data with it.
uint64 committed_epoch = 1;
// Epoch without checkpoint, we will read real-time data with it. But it may be rolled back.
reserved 2;
reserved "current_epoch";
}

message VersionUpdatePayload {
oneof payload {
HummockVersionDeltas version_deltas = 1;
Expand Down Expand Up @@ -273,13 +264,6 @@ message GetAssignedCompactTaskNumResponse {
uint32 num_tasks = 1;
}

message GetEpochRequest {}

message GetEpochResponse {
common.Status status = 1;
HummockSnapshot snapshot = 2;
}

// When right_exclusive=false, it represents [left, right], of which both boundary are open. When right_exclusive=true,
// it represents [left, right), of which right is exclusive.
message KeyRange {
Expand Down Expand Up @@ -811,7 +795,6 @@ service HummockManagerService {
rpc GetAssignedCompactTaskNum(GetAssignedCompactTaskNumRequest) returns (GetAssignedCompactTaskNumResponse);
rpc TriggerCompactionDeterministic(TriggerCompactionDeterministicRequest) returns (TriggerCompactionDeterministicResponse);
rpc DisableCommitEpoch(DisableCommitEpochRequest) returns (DisableCommitEpochResponse);
rpc GetEpoch(GetEpochRequest) returns (GetEpochResponse);
rpc GetNewSstIds(GetNewSstIdsRequest) returns (GetNewSstIdsResponse);
rpc ReportVacuumTask(ReportVacuumTaskRequest) returns (ReportVacuumTaskResponse);
rpc TriggerManualCompaction(TriggerManualCompactionRequest) returns (TriggerManualCompactionResponse);
Expand Down
6 changes: 1 addition & 5 deletions src/ctl/src/cmd_impl/hummock/list_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,7 @@ pub async fn list_version(

println!("{:#?}", version);
} else {
println!(
"Version {} max_committed_epoch {}",
version.id,
version.visible_table_committed_epoch()
);
println!("Version {}", version.id);

for (cg, levels) in &version.levels {
println!("CompactionGroup {}", cg);
Expand Down
3 changes: 1 addition & 2 deletions src/ctl/src/cmd_impl/hummock/pause_resume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ pub async fn disable_commit_epoch(context: &CtlContext) -> anyhow::Result<()> {
let version = meta_client.disable_commit_epoch().await?;
println!(
"Disabled.\
Current version: id {}, max_committed_epoch {}",
Current version: id {}",
version.id,
version.visible_table_committed_epoch()
);
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/src/cmd_impl/hummock/sst_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result
args.use_new_object_prefix_strategy,
)?)
.await?;
let version = hummock.inner().get_pinned_version().version().clone();
let version = hummock.inner().get_pinned_version().clone();
let sstable_store = hummock.sstable_store();
for level in version.get_combined_levels() {
for sstable_info in &level.table_infos {
Expand Down
7 changes: 2 additions & 5 deletions src/ctl/src/cmd_impl/hummock/validate_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,8 @@ pub async fn print_version_delta_in_archive(
if is_first {
is_first = false;
println!(
"delta: id {}, prev_id {}, max_committed_epoch {}, trivial_move {}",
delta.id,
delta.prev_id,
delta.max_committed_epoch,
delta.trivial_move
"delta: id {}, prev_id {}, trivial_move {}",
delta.id, delta.prev_id, delta.trivial_move
);
}
println!("compaction group id {cg_id}");
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/src/cmd_impl/meta/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
.map(|vd| hummock_version_delta::ActiveModel {
id: Set(vd.id.to_u64() as _),
prev_id: Set(vd.prev_id.to_u64() as _),
max_committed_epoch: Set(vd.visible_table_committed_epoch() as _),
max_committed_epoch: Set(vd.max_committed_epoch_for_migration() as _),
safe_epoch: Set(0 as _),
trivial_move: Set(vd.trivial_move),
full_version_delta: Set((&vd.to_protobuf()).into()),
Expand Down
9 changes: 8 additions & 1 deletion src/ctl/src/cmd_impl/table/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,14 @@ async fn do_scan(table: TableCatalog, hummock: MonitoredStateStore<HummockStorag
let read_epoch = hummock
.inner()
.get_pinned_version()
.visible_table_committed_epoch();
.table_committed_epoch(table.id);
let Some(read_epoch) = read_epoch else {
println!(
"table {} with id {} not exist in the latest version",
table.name, table.id
);
return Ok(());
};
let storage_table = make_storage_table(hummock, &table)?;
let stream = storage_table
.batch_iter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use crate::error::Result;
struct RwHummockVersion {
#[primary_key]
version_id: i64,
max_committed_epoch: i64,
compaction_group: JsonbVal,
}

Expand Down Expand Up @@ -101,7 +100,6 @@ fn version_to_compaction_group_rows(version: &HummockVersion) -> Vec<RwHummockVe
.values()
.map(|cg| RwHummockVersion {
version_id: version.id.to_u64() as _,
max_committed_epoch: version.visible_table_committed_epoch() as _,
compaction_group: json!(cg.to_protobuf()).into(),
})
.collect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ struct RwHummockVersionDelta {
#[primary_key]
id: i64,
prev_id: i64,
max_committed_epoch: i64,
trivial_move: bool,
group_deltas: JsonbVal,
}
Expand All @@ -40,7 +39,6 @@ async fn read(reader: &SysCatalogReaderImpl) -> Result<Vec<RwHummockVersionDelta
.map(|d| RwHummockVersionDelta {
id: d.id.to_u64() as _,
prev_id: d.prev_id.to_u64() as _,
max_committed_epoch: d.visible_table_committed_epoch() as _,
trivial_move: d.trivial_move,
group_deltas: json!(d
.group_deltas
Expand Down
6 changes: 0 additions & 6 deletions src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
HummockSnapshot,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_states_response::ActorState;
Expand All @@ -47,7 +46,6 @@ use risingwave_rpc_client::{HummockMetaClient, MetaClient};
#[async_trait::async_trait]
pub trait FrontendMetaClient: Send + Sync {
async fn try_unregister(&self);
async fn get_snapshot(&self) -> Result<HummockSnapshot>;

async fn flush(&self, checkpoint: bool) -> Result<HummockVersionId>;

Expand Down Expand Up @@ -139,10 +137,6 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
self.0.try_unregister().await;
}

async fn get_snapshot(&self) -> Result<HummockSnapshot> {
self.0.get_snapshot().await
}

async fn flush(&self, checkpoint: bool) -> Result<HummockVersionId> {
self.0.flush(checkpoint).await
}
Expand Down
5 changes: 0 additions & 5 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ use risingwave_pb::ddl_service::{
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
BranchedObject, CompactTaskAssignment, CompactTaskProgress, CompactionGroupInfo,
HummockSnapshot,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_states_response::ActorState;
Expand Down Expand Up @@ -938,10 +937,6 @@ pub struct MockFrontendMetaClient {}
impl FrontendMetaClient for MockFrontendMetaClient {
async fn try_unregister(&self) {}

async fn get_snapshot(&self) -> RpcResult<HummockSnapshot> {
Ok(HummockSnapshot { committed_epoch: 0 })
}

async fn flush(&self, _checkpoint: bool) -> RpcResult<HummockVersionId> {
Ok(INVALID_VERSION_ID)
}
Expand Down
17 changes: 1 addition & 16 deletions src/meta/service/src/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,7 @@ impl HummockManagerService for HummockServiceImpl {
let req = request.into_inner();
let version_deltas = self
.hummock_manager
.list_version_deltas(
HummockVersionId::new(req.start_id),
req.num_limit,
req.committed_epoch_limit,
)
.list_version_deltas(HummockVersionId::new(req.start_id), req.num_limit)
.await?;
let resp = ListVersionDeltasResponse {
version_deltas: Some(PbHummockVersionDeltas {
Expand Down Expand Up @@ -249,17 +245,6 @@ impl HummockManagerService for HummockServiceImpl {
}))
}

async fn get_epoch(
&self,
_request: Request<GetEpochRequest>,
) -> Result<Response<GetEpochResponse>, Status> {
let hummock_snapshot = self.hummock_manager.latest_snapshot();
Ok(Response::new(GetEpochResponse {
status: None,
snapshot: Some(hummock_snapshot),
}))
}

async fn report_full_scan_task(
&self,
request: Request<ReportFullScanTaskRequest>,
Expand Down
11 changes: 1 addition & 10 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -712,16 +712,13 @@ impl GlobalBarrierManager {
}

{
let latest_snapshot = self.context.hummock_manager.latest_snapshot();
let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into());

// Bootstrap recovery. Here we simply trigger a recovery process to achieve the
// consistency.
// Even if there's no actor to recover, we still go through the recovery process to
// inject the first `Initial` barrier.
self.context
.set_status(BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap));
let span = tracing::info_span!("bootstrap_recovery", prev_epoch = prev_epoch.value().0);
let span = tracing::info_span!("bootstrap_recovery");
crate::telemetry::report_event(
risingwave_pb::telemetry::TelemetryEventStage::Recovery,
"normal_recovery",
Expand Down Expand Up @@ -1102,12 +1099,9 @@ impl GlobalBarrierManager {
.set_status(BarrierManagerStatus::Recovering(RecoveryReason::Failover(
err.clone(),
)));
let latest_snapshot = self.context.hummock_manager.latest_snapshot();
let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // we can only recover from the committed epoch
let span = tracing::info_span!(
"failure_recovery",
error = %err.as_report(),
prev_epoch = prev_epoch.value().0
);

crate::telemetry::report_event(
Expand All @@ -1134,12 +1128,9 @@ impl GlobalBarrierManager {

self.context
.set_status(BarrierManagerStatus::Recovering(RecoveryReason::Adhoc));
let latest_snapshot = self.context.hummock_manager.latest_snapshot();
let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // we can only recover from the committed epoch
let span = tracing::info_span!(
"adhoc_recovery",
error = %err.as_report(),
prev_epoch = prev_epoch.value().0
);

crate::telemetry::report_event(
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ impl GlobalBarrierManager {
.context
.hummock_manager
.on_current_version(|version| {
let max_committed_epoch = version.visible_table_committed_epoch();
let max_committed_epoch = version.max_committed_epoch_for_meta();
for (table_id, info) in version.state_table_info.info() {
assert_eq!(
info.committed_epoch, max_committed_epoch,
Expand Down
7 changes: 0 additions & 7 deletions src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use risingwave_hummock_sdk::{
CompactionGroupId, HummockContextId, HummockSstableObjectId, LocalSstableInfo,
};
use risingwave_pb::hummock::compact_task::{self};
use risingwave_pb::hummock::HummockSnapshot;
use sea_orm::TransactionTrait;

use crate::hummock::error::{Error, Result};
Expand Down Expand Up @@ -288,12 +287,6 @@ impl HummockManager {
)?;
}

if is_visible_table_committed_epoch {
let snapshot = HummockSnapshot { committed_epoch };
let prev_snapshot = self.latest_snapshot.swap(snapshot.into());
assert!(prev_snapshot.committed_epoch < committed_epoch);
}

for compaction_group_id in &modified_compaction_groups {
trigger_sst_stat(
&self.metrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::INVALID_EPOCH;
use risingwave_hummock_sdk::compact_task::ReportTask;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
get_compaction_group_ids, TableGroupInfo,
Expand Down Expand Up @@ -220,10 +221,7 @@ impl HummockManager {
self.env.notification_manager(),
&self.metrics,
);
let mut new_version_delta = version.new_delta();
let epoch = new_version_delta
.latest_version()
.visible_table_committed_epoch();
let mut new_version_delta = version.new_delta(None);

for (table_id, raw_group_id) in pairs {
let mut group_id = *raw_group_id;
Expand Down Expand Up @@ -267,7 +265,7 @@ impl HummockManager {
.insert(
TableId::new(*table_id),
PbStateTableInfoDelta {
committed_epoch: epoch,
committed_epoch: INVALID_EPOCH,
compaction_group_id: *raw_group_id,
}
)
Expand Down Expand Up @@ -295,7 +293,7 @@ impl HummockManager {
self.env.notification_manager(),
&self.metrics,
);
let mut new_version_delta = version.new_delta();
let mut new_version_delta = version.new_delta(None);
let mut modified_groups: HashMap<CompactionGroupId, /* #member table */ u64> =
HashMap::new();
// Remove member tables
Expand Down Expand Up @@ -483,7 +481,7 @@ impl HummockManager {
self.env.notification_manager(),
&self.metrics,
);
let mut new_version_delta = version.new_delta();
let mut new_version_delta = version.new_delta(None);

let new_sst_start_id = next_sstable_object_id(
&self.env,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl HummockManager {
self.env.notification_manager(),
&self.metrics,
);
let mut new_version_delta = version.new_delta();
let mut new_version_delta = version.new_delta(None);

let target_compaction_group_id = {
// merge right_group_id to left_group_id and remove right_group_id
Expand Down
5 changes: 2 additions & 3 deletions src/meta/src/hummock/manager/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelecto

impl<'a> HummockVersionTransaction<'a> {
fn apply_compact_task(&mut self, compact_task: &CompactTask) {
let mut version_delta = self.new_delta();
let mut version_delta = self.new_delta(None);
let trivial_move = CompactStatus::is_trivial_move_task(compact_task);
version_delta.trivial_move = trivial_move;

Expand Down Expand Up @@ -1334,9 +1334,8 @@ impl HummockManager {
) -> Result<()> {
self.on_current_version(|old_version| {
tracing::info!(
"Trigger compaction for version {}, epoch {}, groups {:?}",
"Trigger compaction for version {}, groups {:?}",
old_version.id,
old_version.visible_table_committed_epoch(),
compaction_groups
);
})
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/hummock/manager/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,12 @@ impl HummockManager {
}

if is_visible_table_committed_epoch
&& committed_epoch <= current_version.visible_table_committed_epoch()
&& committed_epoch <= current_version.max_committed_epoch_for_meta()
{
return Err(anyhow::anyhow!(
"Epoch {} <= max_committed_epoch {}",
committed_epoch,
current_version.visible_table_committed_epoch()
current_version.max_committed_epoch_for_meta()
)
.into());
}
Expand Down
Loading
Loading