Improve slow operations observability in safekeepers

petuhovskiy committed Jun 27, 2024
1 parent d557002 commit 53c0746
Showing 7 changed files with 82 additions and 25 deletions.
7 changes: 4 additions & 3 deletions libs/metrics/src/lib.rs
@@ -103,9 +103,10 @@ static MAXRSS_KB: Lazy<IntGauge> = Lazy::new(|| {
     .expect("Failed to register maxrss_kb int gauge")
 });
 
-pub const DISK_WRITE_SECONDS_BUCKETS: &[f64] = &[
-    0.000_050, 0.000_100, 0.000_500, 0.001, 0.003, 0.005, 0.01, 0.05, 0.1, 0.3, 0.5,
-];
+/// Most common fsync latency is 50 µs - 100 µs, but it can be much higher,
+/// especially during many concurrent disk operations.
+pub const DISK_FSYNC_SECONDS_BUCKETS: &[f64] =
+    &[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 30.0];
 
 pub struct BuildInfo {
     pub revision: &'static str,
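
The new bucket list plugs into the same `register_histogram!` macro used throughout this commit: each f64 becomes the inclusive upper bound of one cumulative bucket, so e.g. a 7 ms fsync lands in the 0.01 s bucket and every larger one. A minimal sketch of a consumer — the metric name `example_fsync_seconds` is made up for illustration:

use metrics::{register_histogram, Histogram, DISK_FSYNC_SECONDS_BUCKETS};
use once_cell::sync::Lazy;

// Hypothetical metric following the same pattern as the safekeeper
// histograms below; observations are in seconds.
static EXAMPLE_FSYNC_SECONDS: Lazy<Histogram> = Lazy::new(|| {
    register_histogram!(
        "example_fsync_seconds",
        "Example histogram using the fsync-oriented buckets",
        DISK_FSYNC_SECONDS_BUCKETS.to_vec()
    )
    .expect("Failed to register example_fsync_seconds histogram")
});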
32 changes: 24 additions & 8 deletions safekeeper/src/metrics.rs
@@ -5,15 +5,15 @@ use std::{
     time::{Instant, SystemTime},
 };
 
-use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_WRITE_SECONDS_BUCKETS};
+use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_FSYNC_SECONDS_BUCKETS};
 use anyhow::Result;
 use futures::Future;
 use metrics::{
     core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
     proto::MetricFamily,
-    register_int_counter, register_int_counter_pair, register_int_counter_pair_vec,
-    register_int_counter_vec, Gauge, IntCounter, IntCounterPair, IntCounterPairVec, IntCounterVec,
-    IntGaugeVec,
+    register_histogram_vec, register_int_counter, register_int_counter_pair,
+    register_int_counter_pair_vec, register_int_counter_vec, Gauge, HistogramVec, IntCounter,
+    IntCounterPair, IntCounterPairVec, IntCounterVec, IntGaugeVec,
 };
 use once_cell::sync::Lazy;
 

@@ -48,26 +48,42 @@ pub static WRITE_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
     register_histogram!(
         "safekeeper_write_wal_seconds",
         "Seconds spent writing and syncing WAL to a disk in a single request",
-        DISK_WRITE_SECONDS_BUCKETS.to_vec()
+        DISK_FSYNC_SECONDS_BUCKETS.to_vec()
     )
     .expect("Failed to register safekeeper_write_wal_seconds histogram")
 });
 pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
     register_histogram!(
         "safekeeper_flush_wal_seconds",
         "Seconds spent syncing WAL to a disk",
-        DISK_WRITE_SECONDS_BUCKETS.to_vec()
+        DISK_FSYNC_SECONDS_BUCKETS.to_vec()
     )
     .expect("Failed to register safekeeper_flush_wal_seconds histogram")
 });
 pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
     register_histogram!(
         "safekeeper_persist_control_file_seconds",
         "Seconds to persist and sync control file",
-        DISK_WRITE_SECONDS_BUCKETS.to_vec()
+        DISK_FSYNC_SECONDS_BUCKETS.to_vec()
     )
     .expect("Failed to register safekeeper_persist_control_file_seconds histogram vec")
 });
+pub static WAL_STORAGE_OPERATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
+    register_histogram_vec!(
+        "safekeeper_wal_storage_operation_seconds",
+        "Seconds spent on WAL storage operations",
+        &["operation"]
+    )
+    .expect("Failed to register safekeeper_wal_storage_operation_seconds histogram vec")
+});
+pub static MISC_OPERATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
+    register_histogram_vec!(
+        "safekeeper_misc_operation_seconds",
+        "Seconds spent on miscellaneous operations",
+        &["operation"]
+    )
+    .expect("Failed to register safekeeper_misc_operation_seconds histogram vec")
+});
 pub static PG_IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
     register_int_counter_vec!(
         "safekeeper_pg_io_bytes_total",
@@ -126,7 +142,7 @@ pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
     register_histogram!(
         "safekeeper_broker_push_update_seconds",
         "Seconds to push all timeline updates to the broker",
-        DISK_WRITE_SECONDS_BUCKETS.to_vec()
+        DISK_FSYNC_SECONDS_BUCKETS.to_vec()
     )
     .expect("Failed to register safekeeper_broker_push_update_seconds histogram vec")
 });
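
The two new `HistogramVec`s are registered without an explicit bucket list, so they fall back to the prometheus crate's default buckets (5 ms up to 10 s). Every call site in this commit uses the scope-guard pattern sketched below; `"my_op"` and `timed_operation` are stand-in names:

// start_timer() returns a guard that observes the elapsed seconds into
// the labeled histogram when it is dropped at the end of the scope.
fn timed_operation() {
    let _timer = MISC_OPERATION_SECONDS
        .with_label_values(&["my_op"])
        .start_timer();
    // ... the work being measured ...
} // guard dropped here; duration recorded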
5 changes: 5 additions & 0 deletions safekeeper/src/safekeeper.rs
@@ -15,6 +15,7 @@ use storage_broker::proto::SafekeeperTimelineInfo;
 use tracing::*;
 
 use crate::control_file;
+use crate::metrics::MISC_OPERATION_SECONDS;
 use crate::send_wal::HotStandbyFeedback;
 
 use crate::state::TimelineState;
@@ -696,6 +697,10 @@ where
         &mut self,
         msg: &ProposerElected,
     ) -> Result<Option<AcceptorProposerMessage>> {
+        let _timer = MISC_OPERATION_SECONDS
+            .with_label_values(&["handle_elected"])
+            .start_timer();
+
         info!("received ProposerElected {:?}", msg);
         if self.state.acceptor_state.term < msg.term {
             let mut state = self.state.start_change();
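
One Rust subtlety in this pattern: the guard must be bound to a named variable like `_timer`. Binding to the bare wildcard `_` would drop the guard immediately and record a near-zero duration. A sketch (`guard_binding_demo` is a hypothetical function):

fn guard_binding_demo() {
    // Correct: the guard lives until the end of the enclosing scope.
    let _timer = MISC_OPERATION_SECONDS
        .with_label_values(&["handle_elected"])
        .start_timer();

    // Wrong: `_` is not a binding, so the timer is dropped (and the
    // observation recorded) immediately, before any work happens.
    let _ = MISC_OPERATION_SECONDS
        .with_label_values(&["handle_elected"])
        .start_timer();
}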
7 changes: 6 additions & 1 deletion safekeeper/src/state.rs
@@ -189,7 +189,12 @@ where
 
     /// Persist given state. c.f. start_change.
     pub async fn finish_change(&mut self, s: &TimelinePersistentState) -> Result<()> {
-        self.pers.persist(s).await?;
+        if s.eq(&*self.pers) {
+            // nothing to do if state didn't change
+        } else {
+            self.pers.persist(s).await?;
+        }
+
         // keep in memory values up to date
         self.inmem.commit_lsn = s.commit_lsn;
         self.inmem.backup_lsn = s.backup_lsn;
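
The early-out works because `TimelinePersistentState` supports equality comparison against the last persisted copy, skipping a control-file rewrite (and its fsync) when nothing changed. A generic sketch of the same idea, with hypothetical names:

// Persist `new_state` only if it differs from the durable copy,
// saving a write + fsync on the common no-op path.
fn persist_if_changed<T: PartialEq + Clone>(
    durable: &mut T,
    new_state: &T,
    persist: impl FnOnce(&T) -> anyhow::Result<()>,
) -> anyhow::Result<()> {
    if new_state != durable {
        persist(new_state)?;
        *durable = new_state.clone();
    }
    Ok(())
}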
34 changes: 23 additions & 11 deletions safekeeper/src/timeline.rs
@@ -39,7 +39,7 @@ use crate::wal_backup::{self};
 use crate::wal_backup_partial::PartialRemoteSegment;
 use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
 
-use crate::metrics::{FullTimelineInfo, WalStorageMetrics};
+use crate::metrics::{FullTimelineInfo, WalStorageMetrics, MISC_OPERATION_SECONDS};
 use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
 use crate::{debug_dump, timeline_manager, wal_storage};
 use crate::{GlobalTimelines, SafeKeeperConf};
@@ -856,28 +856,40 @@ impl Timeline {
         }
 
         debug!("requesting WalResidentTimeline guard");
-
-        // Wait 5 seconds for the guard to be acquired, should be enough for uneviction.
-        // If it times out, most likely there is a deadlock in the manager task.
-        let res = tokio::time::timeout(
-            Duration::from_secs(5),
+        let started_at = Instant::now();
+        let status_before = self.mgr_status.get();
+
+        // Wait 30 seconds for the guard to be acquired. It can time out if someone is
+        // holding the lock (e.g. during `SafeKeeper::process_msg()`) or manager task
+        // is stuck.
+        let res = tokio::time::timeout_at(
+            started_at + Duration::from_secs(30),
             self.manager_ctl.wal_residence_guard(),
         )
         .await;
 
         let guard = match res {
-            Ok(Ok(guard)) => guard,
+            Ok(Ok(guard)) => {
+                let finished_at = Instant::now();
+                let elapsed = finished_at - started_at;
+                MISC_OPERATION_SECONDS
+                    .with_label_values(&["wal_residence_guard"])
+                    .observe(elapsed.as_secs_f64());
+
+                guard
+            }
             Ok(Err(e)) => {
                 warn!(
-                    "error while acquiring WalResidentTimeline guard (current state {:?}): {}",
-                    self.mgr_status.get(),
-                    e
+                    "error while acquiring WalResidentTimeline guard, statuses {:?} => {:?}",
+                    status_before,
+                    self.mgr_status.get()
                );
                 return Err(e);
             }
             Err(_) => {
                 warn!(
-                    "timeout while acquiring WalResidentTimeline guard (current state {:?})",
+                    "timeout while acquiring WalResidentTimeline guard, statuses {:?} => {:?}",
+                    status_before,
                     self.mgr_status.get()
                 );
                 anyhow::bail!("timeout while acquiring WalResidentTimeline guard");
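
Two things changed here besides the longer deadline: acquisition latency is now exported via MISC_OPERATION_SECONDS, and `tokio::time::timeout` became `timeout_at`, reusing the `Instant` captured for the metric as the base of the deadline. A self-contained sketch of that shape, where `acquire_guard()` is a stand-in for `wal_residence_guard()`:

use std::time::Duration;
use tokio::time::Instant;

async fn acquire_with_deadline() -> anyhow::Result<()> {
    let started_at = Instant::now();
    // Deadline derived from the same instant used for latency measurement.
    let res = tokio::time::timeout_at(
        started_at + Duration::from_secs(30),
        acquire_guard(),
    )
    .await;
    match res {
        Ok(inner) => {
            println!("acquired in {:?}", started_at.elapsed());
            inner
        }
        Err(_) => anyhow::bail!("timed out after 30s"),
    }
}

// Stand-in for the real guard acquisition future.
async fn acquire_guard() -> anyhow::Result<()> {
    Ok(())
}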
6 changes: 5 additions & 1 deletion safekeeper/src/timeline_manager.rs
@@ -22,7 +22,7 @@ use utils::lsn::Lsn;
 
 use crate::{
     control_file::{FileStorage, Storage},
-    metrics::{MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL},
+    metrics::{MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL, MISC_OPERATION_SECONDS},
     recovery::recovery_main,
     remove_wal::calc_horizon_lsn,
     safekeeper::Term,
@@ -357,6 +357,10 @@ impl Manager {
 
     /// Get a snapshot of the timeline state.
     async fn state_snapshot(&self) -> StateSnapshot {
+        let _timer = MISC_OPERATION_SECONDS
+            .with_label_values(&["state_snapshot"])
+            .start_timer();
+
         StateSnapshot::new(
             self.tli.read_shared_state().await,
             self.conf.heartbeat_timeout,
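
Since `state_snapshot` runs on every manager iteration, samples should accumulate quickly, and the fixed string-literal labels keep the metric's cardinality bounded. For reference, a sketch of reading back what the timer recorded (the test name is hypothetical):

// Each completed timer increments the sample count for its label.
#[test]
fn state_snapshot_is_timed() {
    let h = MISC_OPERATION_SECONDS.with_label_values(&["state_snapshot"]);
    let before = h.get_sample_count();
    // ... run code that calls state_snapshot() ...
    assert!(h.get_sample_count() >= before);
}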
16 changes: 15 additions & 1 deletion safekeeper/src/wal_storage.rs
@@ -23,7 +23,9 @@ use tokio::io::{AsyncReadExt, AsyncSeekExt};
 use tracing::*;
 use utils::crashsafe::durable_rename;
 
-use crate::metrics::{time_io_closure, WalStorageMetrics, REMOVED_WAL_SEGMENTS};
+use crate::metrics::{
+    time_io_closure, WalStorageMetrics, REMOVED_WAL_SEGMENTS, WAL_STORAGE_OPERATION_SECONDS,
+};
 use crate::state::TimelinePersistentState;
 use crate::wal_backup::{read_object, remote_timeline_path};
 use crate::SafeKeeperConf;
@@ -331,6 +333,10 @@ impl Storage for PhysicalStorage {
     }
 
     async fn initialize_first_segment(&mut self, init_lsn: Lsn) -> Result<()> {
+        let _timer = WAL_STORAGE_OPERATION_SECONDS
+            .with_label_values(&["initialize_first_segment"])
+            .start_timer();
+
         let segno = init_lsn.segment_number(self.wal_seg_size);
         let (mut file, _) = self.open_or_create(segno).await?;
         let major_pg_version = self.pg_version / 10000;
@@ -422,6 +428,10 @@ impl Storage for PhysicalStorage {
     /// Truncate written WAL by removing all WAL segments after the given LSN.
     /// end_pos must point to the end of the WAL record.
     async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
+        let _timer = WAL_STORAGE_OPERATION_SECONDS
+            .with_label_values(&["truncate_wal"])
+            .start_timer();
+
         // Streaming must not create a hole, so truncate cannot be called on non-written lsn
         if self.write_lsn != Lsn(0) && end_pos > self.write_lsn {
             bail!(
@@ -497,6 +507,10 @@ async fn remove_segments_from_disk(
     wal_seg_size: usize,
     remove_predicate: impl Fn(XLogSegNo) -> bool,
 ) -> Result<()> {
+    let _timer = WAL_STORAGE_OPERATION_SECONDS
+        .with_label_values(&["remove_segments_from_disk"])
+        .start_timer();
+
     let mut n_removed = 0;
     let mut min_removed = u64::MAX;
     let mut max_removed = u64::MIN;
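
Because the timer is a drop guard, it records on error paths too: when `bail!` or `?` unwinds out of `truncate_wal`, the guard still drops and the elapsed time is observed. A sketch with an illustrative `fail` flag and a hypothetical function name:

// The labeled timer observes elapsed time even when the function
// exits early with an error, because the guard is dropped then as well.
async fn truncate_wal_like(fail: bool) -> anyhow::Result<()> {
    let _timer = WAL_STORAGE_OPERATION_SECONDS
        .with_label_values(&["truncate_wal"])
        .start_timer();
    if fail {
        anyhow::bail!("simulated failure"); // _timer drops here too
    }
    // ... actual truncation work ...
    Ok(())
}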
