Skip to content

Commit

Permalink
Add debug status to manager
Browse files Browse the repository at this point in the history
  • Loading branch information
petuhovskiy committed Jun 22, 2024
1 parent 58f4fb8 commit 78ccf39
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 6 deletions.
34 changes: 32 additions & 2 deletions safekeeper/src/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::safekeeper::{
use crate::send_wal::WalSenders;
use crate::state::{EvictionState, TimelineMemState, TimelinePersistentState, TimelineState};
use crate::timeline_access::AccessGuard;
use crate::timeline_manager::ManagerCtl;
use crate::timeline_manager::{AtomicStatus, ManagerCtl};
use crate::timelines_set::TimelinesSet;
use crate::wal_backup::{self};
use crate::wal_backup_partial::PartialRemoteSegment;
Expand Down Expand Up @@ -469,6 +469,7 @@ pub struct Timeline {
pub(crate) broker_active: AtomicBool,
pub(crate) wal_backup_active: AtomicBool,
pub(crate) last_removed_segno: AtomicU64,
pub(crate) mgr_status: AtomicStatus,
}

impl Timeline {
Expand Down Expand Up @@ -503,6 +504,7 @@ impl Timeline {
broker_active: AtomicBool::new(false),
wal_backup_active: AtomicBool::new(false),
last_removed_segno: AtomicU64::new(0),
mgr_status: AtomicStatus::new(),
})
}

Expand Down Expand Up @@ -540,6 +542,7 @@ impl Timeline {
broker_active: AtomicBool::new(false),
wal_backup_active: AtomicBool::new(false),
last_removed_segno: AtomicU64::new(0),
mgr_status: AtomicStatus::new(),
})
}

Expand Down Expand Up @@ -821,7 +824,30 @@ impl Timeline {
}

info!("requesting FullAccessTimeline guard");
let guard = self.manager_ctl.full_access_guard().await?;

let res =
tokio::time::timeout(Duration::from_secs(5), self.manager_ctl.full_access_guard())
.await;

let guard = match res {
Ok(Ok(guard)) => guard,
Ok(Err(e)) => {
warn!(
"error while acquiring FullAccessTimeline guard (current state {:?}): {}",
self.mgr_status.get(),
e
);
return Err(e);
}
Err(_) => {
warn!(
"timeout while acquiring FullAccessTimeline guard (current state {:?})",
self.mgr_status.get()
);
anyhow::bail!("timeout while acquiring FullAccessTimeline guard");
}
};

Ok(FullAccessTimeline::new(self.clone(), guard))
}
}
Expand Down Expand Up @@ -1048,6 +1074,10 @@ impl ManagerTimeline {

Ok(())
}

pub(crate) fn set_status(&self, status: timeline_manager::Status) {
self.mgr_status.store(status, Ordering::Relaxed);
}
}

/// Deletes directory and it's contents. Returns false if directory does not exist.
Expand Down
72 changes: 68 additions & 4 deletions safekeeper/src/timeline_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
//! Be aware that you need to be extra careful with manager code, because it is not respawned on panic.
//! Also, if it will stuck in some branch, it will prevent any further progress in the timeline.

use std::{sync::Arc, time::Duration};
use std::{
sync::{atomic::AtomicUsize, Arc},
time::Duration,
};

use postgres_ffi::XLogSegNo;
use tokio::
task::{JoinError, JoinHandle}
;
use tokio::task::{JoinError, JoinHandle};
use tracing::{info, info_span, instrument, warn, Instrument};
use utils::lsn::Lsn;

Expand Down Expand Up @@ -196,6 +197,8 @@ pub async fn main_task(
mut manager_rx: tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>,
manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
) {
tli.set_status(Status::Started);

let defer_tli = tli.tli.clone();
scopeguard::defer! {
if defer_tli.is_cancelled() {
Expand All @@ -216,18 +219,28 @@ pub async fn main_task(
let last_state = 'outer: loop {
MANAGER_ITERATIONS_TOTAL.inc();

mgr.set_status(Status::StateSnapshot);
let state_snapshot = mgr.state_snapshot().await;

let next_cfile_save = if !mgr.is_offloaded {
let num_computes = *mgr.num_computes_rx.borrow();

mgr.set_status(Status::UpdateBackup);
let is_wal_backup_required = mgr.update_backup(num_computes, &state_snapshot).await;
mgr.update_is_active(is_wal_backup_required, num_computes, &state_snapshot);

mgr.set_status(Status::UpdateControlFile);
let next_cfile_save = mgr.update_control_file_save(&state_snapshot).await;

mgr.set_status(Status::UpdateWalRemoval);
mgr.update_wal_removal(&state_snapshot).await;

mgr.set_status(Status::UpdatePartialBackup);
mgr.update_partial_backup(&state_snapshot).await;

if mgr.conf.enable_offload && mgr.ready_for_eviction(&next_cfile_save, &state_snapshot)
{
mgr.set_status(Status::EvictTimeline);
mgr.evict_timeline().await;
}

Expand All @@ -236,6 +249,7 @@ pub async fn main_task(
None
};

mgr.set_status(Status::Wait);
// wait until something changes. tx channels are stored under Arc, so they will not be
// dropped until the manager task is finished.
tokio::select! {
Expand Down Expand Up @@ -268,6 +282,7 @@ pub async fn main_task(
}

msg = manager_rx.recv() => {
mgr.set_status(Status::HandleMessage);
mgr.handle_message(msg).await;
}
}
Expand Down Expand Up @@ -333,6 +348,10 @@ impl Manager {
}
}

fn set_status(&self, status: Status) {
self.tli.set_status(status);
}

fn full_access_timeline(&mut self) -> FullAccessTimeline {
assert!(!self.is_offloaded);
let guard = self.access_service.create_guard();
Expand Down Expand Up @@ -569,3 +588,48 @@ async fn await_task_finish<T>(option: &mut Option<JoinHandle<T>>) -> Result<T, J
futures::future::pending().await
}
}

#[repr(usize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
NotStarted,
Started,
StateSnapshot,
UpdateBackup,
UpdateControlFile,
UpdateWalRemoval,
UpdatePartialBackup,
EvictTimeline,
Wait,
HandleMessage,
}

pub struct AtomicStatus {
inner: AtomicUsize,
}

impl Default for AtomicStatus {
fn default() -> Self {
Self::new()
}
}

impl AtomicStatus {
pub fn new() -> Self {
AtomicStatus {
inner: AtomicUsize::new(Status::NotStarted as usize),
}
}

pub fn load(&self, order: std::sync::atomic::Ordering) -> Status {
unsafe { std::mem::transmute(self.inner.load(order)) }
}

pub fn get(&self) -> Status {
self.load(std::sync::atomic::Ordering::Relaxed)
}

pub fn store(&self, val: Status, order: std::sync::atomic::Ordering) {
self.inner.store(val as usize, order);
}
}
2 changes: 2 additions & 0 deletions test_runner/fixtures/neon_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -3896,6 +3896,8 @@ def stop(self, immediate: bool = False) -> "Safekeeper":

def assert_no_errors(self):
assert not self.log_contains("manager task finished prematurely")
assert not self.log_contains("error while acquiring FullAccessTimeline guard")
assert not self.log_contains("timeout while acquiring FullAccessTimeline guard")

def append_logical_message(
self, tenant_id: TenantId, timeline_id: TimelineId, request: Dict[str, Any]
Expand Down

0 comments on commit 78ccf39

Please sign in to comment.