Skip to content

Commit

Permalink
Update remote_consistent_lsn in offloaded state
Browse files Browse the repository at this point in the history
  • Loading branch information
petuhovskiy committed Jun 24, 2024
1 parent a15f54f commit 026868e
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 34 deletions.
27 changes: 1 addition & 26 deletions safekeeper/src/safekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -916,10 +916,8 @@ where
)))
}

/// Update timeline state with peer safekeeper data.
/// Update commit_lsn from peer safekeeper data.
pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
let mut sync_control_file = false;

if (Lsn(sk_info.commit_lsn) != Lsn::INVALID) && (sk_info.last_log_term != INVALID_TERM) {
// Note: the check is too restrictive, generally we can update local
// commit_lsn if our history matches (is part of) history of advanced
Expand All @@ -928,29 +926,6 @@ where
self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
}
}

self.state.inmem.backup_lsn = max(Lsn(sk_info.backup_lsn), self.state.inmem.backup_lsn);
sync_control_file |= self.state.backup_lsn + (self.state.server.wal_seg_size as u64)
< self.state.inmem.backup_lsn;

self.state.inmem.remote_consistent_lsn = max(
Lsn(sk_info.remote_consistent_lsn),
self.state.inmem.remote_consistent_lsn,
);
sync_control_file |= self.state.remote_consistent_lsn
+ (self.state.server.wal_seg_size as u64)
< self.state.inmem.remote_consistent_lsn;

self.state.inmem.peer_horizon_lsn = max(
Lsn(sk_info.peer_horizon_lsn),
self.state.inmem.peer_horizon_lsn,
);
sync_control_file |= self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
< self.state.inmem.peer_horizon_lsn;

if sync_control_file {
self.state.flush().await?;
}
Ok(())
}
}
Expand Down
35 changes: 27 additions & 8 deletions safekeeper/src/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,17 +216,36 @@ impl StateSK {
}

pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
// update commit_lsn if safekeeper is loaded
match self {
StateSK::Loaded(sk) => sk.record_safekeeper_info(sk_info).await,
StateSK::Offloaded(_) => {
warn!(
"received broker message for offloaded timeline, ignoring: {:?}",
sk_info
);
Ok(())
}
StateSK::Loaded(sk) => sk.record_safekeeper_info(sk_info).await?,
StateSK::Offloaded(_) => {}
StateSK::Empty => unreachable!(),
}

// update everything else, including remote_consistent_lsn and backup_lsn
let mut sync_control_file = false;
let state = self.state_mut();
let wal_seg_size = state.server.wal_seg_size as u64;

state.inmem.backup_lsn = max(Lsn(sk_info.backup_lsn), state.inmem.backup_lsn);
sync_control_file |= state.backup_lsn + wal_seg_size < state.inmem.backup_lsn;

state.inmem.remote_consistent_lsn = max(
Lsn(sk_info.remote_consistent_lsn),
state.inmem.remote_consistent_lsn,
);
sync_control_file |=
state.remote_consistent_lsn + wal_seg_size < state.inmem.remote_consistent_lsn;

state.inmem.peer_horizon_lsn =
max(Lsn(sk_info.peer_horizon_lsn), state.inmem.peer_horizon_lsn);
sync_control_file |= state.peer_horizon_lsn + wal_seg_size < state.inmem.peer_horizon_lsn;

if sync_control_file {
state.flush().await?;
}
Ok(())
}

pub fn term_start_lsn(&self) -> Lsn {
Expand Down

0 comments on commit 026868e

Please sign in to comment.