Skip to content

Commit

Permalink
Fix review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
petuhovskiy committed Jun 26, 2024
1 parent 4f6feb6 commit 8d9e72d
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 16 deletions.
4 changes: 2 additions & 2 deletions safekeeper/src/pull_timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ impl WalResidentTimeline {
}
let from_segno = from_lsn.segment_number(wal_seg_size);
let term = shared_state.sk.get_term();
let last_log_term = shared_state.sk.get_last_log_term();
let last_log_term = shared_state.sk.last_log_term();
let flush_lsn = shared_state.sk.flush_lsn();
let upto_segno = flush_lsn.segment_number(wal_seg_size);
// have some limit on max number of segments as a sanity check
Expand Down Expand Up @@ -231,7 +231,7 @@ impl WalResidentTimeline {
pub async fn finish_snapshot(&self, bctx: &SnapshotContext) -> Result<()> {
let shared_state = self.read_shared_state().await;
let term = shared_state.sk.get_term();
let last_log_term = shared_state.sk.get_last_log_term();
let last_log_term = shared_state.sk.last_log_term();
// There are some cases to relax this check (e.g. last_log_term might
// change, but as long as older history is strictly part of new that's
// fine), but there is no need to do it.
Expand Down
2 changes: 1 addition & 1 deletion safekeeper/src/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ async fn recovery_needed(
) -> RecoveryNeededInfo {
let ss = tli.read_shared_state().await;
let term = ss.sk.state().acceptor_state.term;
let last_log_term = ss.sk.get_last_log_term();
let last_log_term = ss.sk.last_log_term();
let flush_lsn = ss.sk.flush_lsn();
// note that peers contain myself, but that's ok -- we are interested only in peers which are strictly ahead of us.
let mut peers = ss.get_peers(heartbeat_timeout);
Expand Down
7 changes: 4 additions & 3 deletions safekeeper/src/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ impl<'a> DerefMut for WriteGuardSharedState<'a> {

impl<'a> Drop for WriteGuardSharedState<'a> {
fn drop(&mut self) {
let term_flush_lsn = TermLsn::from((self.guard.sk.get_term(), self.guard.sk.flush_lsn()));
let term_flush_lsn =
TermLsn::from((self.guard.sk.last_log_term(), self.guard.sk.flush_lsn()));
let commit_lsn = self.guard.sk.state().inmem.commit_lsn;

let _ = self.tli.term_flush_lsn_watch_tx.send_if_modified(|old| {
Expand Down Expand Up @@ -209,7 +210,7 @@ impl StateSK {
self.state().acceptor_state.term
}

pub fn get_last_log_term(&self) -> Term {
pub fn last_log_term(&self) -> Term {
self.state()
.acceptor_state
.get_last_log_term(self.flush_lsn())
Expand Down Expand Up @@ -406,7 +407,7 @@ impl SharedState {
timeline_id: ttid.timeline_id.as_ref().to_owned(),
}),
term: self.sk.state().acceptor_state.term,
last_log_term: self.sk.get_last_log_term(),
last_log_term: self.sk.last_log_term(),
flush_lsn: self.sk.flush_lsn().0,
// note: this value is not flushed to control file yet and can be lost
commit_lsn: self.sk.state().inmem.commit_lsn.0,
Expand Down
4 changes: 2 additions & 2 deletions safekeeper/src/timeline_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub(crate) struct StateSnapshot {

// latest state
pub(crate) flush_lsn: Lsn,
pub(crate) term: Term,
pub(crate) last_log_term: Term,

// misc
pub(crate) cfile_last_persist_at: std::time::Instant,
Expand All @@ -70,7 +70,7 @@ impl StateSnapshot {
cfile_remote_consistent_lsn: state.remote_consistent_lsn,
cfile_backup_lsn: state.backup_lsn,
flush_lsn: read_guard.sk.flush_lsn(),
term: state.acceptor_state.term,
last_log_term: read_guard.sk.last_log_term(),
cfile_last_persist_at: state.pers.last_persist_at(),
inmem_flush_pending: Self::has_unflushed_inmem_state(state),
wal_removal_on_hold: read_guard.wal_removal_on_hold,
Expand Down
23 changes: 15 additions & 8 deletions safekeeper/src/wal_backup_partial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,14 @@ use crate::{

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum UploadStatus {
/// Upload is in progress
/// Upload is in progress. This status should be used only for garbage collection,
/// don't read data from the remote storage with this status.
InProgress,
/// Upload is finished
/// Upload is finished. There is always at most one segment with this status.
/// It means that the segment is actual and can be used.
Uploaded,
/// Deletion is in progress
/// Deletion is in progress. This status should be used only for garbage collection,
/// don't read data from the remote storage with this status.
Deleting,
}

Expand All @@ -51,6 +54,10 @@ pub struct PartialRemoteSegment {
pub name: String,
pub commit_lsn: Lsn,
pub flush_lsn: Lsn,
// We should use last_log_term here, otherwise it's possible to have inconsistent data in the
// remote storage.
//
// More info here: https://github.com/neondatabase/neon/pull/8022#discussion_r1654738405
pub term: Term,
}

Expand Down Expand Up @@ -133,17 +140,17 @@ impl PartialBackup {
let sk_info = self.tli.get_safekeeper_info(&self.conf).await;
let flush_lsn = Lsn(sk_info.flush_lsn);
let commit_lsn = Lsn(sk_info.commit_lsn);
let term = sk_info.term;
let last_log_term = sk_info.last_log_term;
let segno = self.segno(flush_lsn);

let name = self.remote_segment_name(segno, term, commit_lsn, flush_lsn);
let name = self.remote_segment_name(segno, last_log_term, commit_lsn, flush_lsn);

PartialRemoteSegment {
status: UploadStatus::InProgress,
name,
commit_lsn,
flush_lsn,
term,
term: last_log_term,
}
}

Expand All @@ -166,7 +173,7 @@ impl PartialBackup {
// If the term changed, we cannot guarantee the validity of the uploaded data.
// If the term is the same, we know the data is not corrupted.
let sk_info = self.tli.get_safekeeper_info(&self.conf).await;
if sk_info.term != prepared.term {
if sk_info.last_log_term != prepared.term {
anyhow::bail!("term changed during upload");
}
assert!(prepared.commit_lsn <= Lsn(sk_info.commit_lsn));
Expand Down Expand Up @@ -285,7 +292,7 @@ pub(crate) fn needs_uploading(
uploaded.status != UploadStatus::Uploaded
|| uploaded.flush_lsn != state.flush_lsn
|| uploaded.commit_lsn != state.commit_lsn
|| uploaded.term != state.term
|| uploaded.term != state.last_log_term
}
None => true,
}
Expand Down

0 comments on commit 8d9e72d

Please sign in to comment.