Improve slow operations observability in safekeepers #8188

Merged: 1 commit, Jun 27, 2024
7 changes: 4 additions & 3 deletions libs/metrics/src/lib.rs
@@ -103,9 +103,10 @@ static MAXRSS_KB: Lazy<IntGauge> = Lazy::new(|| {
.expect("Failed to register maxrss_kb int gauge")
});

-pub const DISK_WRITE_SECONDS_BUCKETS: &[f64] = &[
-    0.000_050, 0.000_100, 0.000_500, 0.001, 0.003, 0.005, 0.01, 0.05, 0.1, 0.3, 0.5,
-];
+/// Most common fsync latency is 50 µs - 100 µs, but it can be much higher,
+/// especially during many concurrent disk operations.
+pub const DISK_FSYNC_SECONDS_BUCKETS: &[f64] =
+    &[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 30.0];

pub struct BuildInfo {
pub revision: &'static str,
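
The bucket change raises the smallest latency bucket from 50 µs to 1 ms and extends the top from 0.5 s to 30 s, trading resolution on fast writes for visibility into pathologically slow fsyncs. As a minimal standalone sketch of how explicit buckets like these behave (using the prometheus crate directly; the metric name is illustrative, not part of this PR):

```rust
use prometheus::{register_histogram, Histogram};

fn main() -> Result<(), prometheus::Error> {
    // Buckets mirror DISK_FSYNC_SECONDS_BUCKETS. Prometheus buckets are
    // cumulative: an observation counts toward every bucket whose upper
    // bound is at least the observed value, plus the implicit +Inf bucket.
    let hist: Histogram = register_histogram!(
        "example_fsync_seconds",
        "Illustrative fsync latency histogram",
        vec![0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 30.0]
    )?;
    hist.observe(0.000_075); // typical fast fsync: lands in the lowest (1 ms) bucket
    hist.observe(12.0); // pathological fsync: only the 30 s and +Inf buckets catch it
    Ok(())
}
```
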
32 changes: 24 additions & 8 deletions safekeeper/src/metrics.rs
@@ -5,15 +5,15 @@ use std::{
time::{Instant, SystemTime},
};

-use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_WRITE_SECONDS_BUCKETS};
+use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_FSYNC_SECONDS_BUCKETS};
use anyhow::Result;
use futures::Future;
use metrics::{
core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
proto::MetricFamily,
-    register_int_counter, register_int_counter_pair, register_int_counter_pair_vec,
-    register_int_counter_vec, Gauge, IntCounter, IntCounterPair, IntCounterPairVec, IntCounterVec,
-    IntGaugeVec,
+    register_histogram_vec, register_int_counter, register_int_counter_pair,
+    register_int_counter_pair_vec, register_int_counter_vec, Gauge, HistogramVec, IntCounter,
+    IntCounterPair, IntCounterPairVec, IntCounterVec, IntGaugeVec,
};
use once_cell::sync::Lazy;

@@ -48,26 +48,42 @@ pub static WRITE_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"safekeeper_write_wal_seconds",
"Seconds spent writing and syncing WAL to a disk in a single request",
-        DISK_WRITE_SECONDS_BUCKETS.to_vec()
+        DISK_FSYNC_SECONDS_BUCKETS.to_vec()
)
.expect("Failed to register safekeeper_write_wal_seconds histogram")
});
pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"safekeeper_flush_wal_seconds",
"Seconds spent syncing WAL to a disk",
-        DISK_WRITE_SECONDS_BUCKETS.to_vec()
+        DISK_FSYNC_SECONDS_BUCKETS.to_vec()
)
.expect("Failed to register safekeeper_flush_wal_seconds histogram")
});
pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"safekeeper_persist_control_file_seconds",
"Seconds to persist and sync control file",
-        DISK_WRITE_SECONDS_BUCKETS.to_vec()
+        DISK_FSYNC_SECONDS_BUCKETS.to_vec()
)
.expect("Failed to register safekeeper_persist_control_file_seconds histogram vec")
});
+pub static WAL_STORAGE_OPERATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
+    register_histogram_vec!(
+        "safekeeper_wal_storage_operation_seconds",
+        "Seconds spent on WAL storage operations",
+        &["operation"]
+    )
+    .expect("Failed to register safekeeper_wal_storage_operation_seconds histogram vec")
+});
+pub static MISC_OPERATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
+    register_histogram_vec!(
+        "safekeeper_misc_operation_seconds",
+        "Seconds spent on miscellaneous operations",
+        &["operation"]
+    )
+    .expect("Failed to register safekeeper_misc_operation_seconds histogram vec")
+});
pub static PG_IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"safekeeper_pg_io_bytes_total",
@@ -126,7 +142,7 @@ pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"safekeeper_broker_push_update_seconds",
"Seconds to push all timeline updates to the broker",
-        DISK_WRITE_SECONDS_BUCKETS.to_vec()
+        DISK_FSYNC_SECONDS_BUCKETS.to_vec()
)
.expect("Failed to register safekeeper_broker_push_update_seconds histogram vec")
});
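
The two new metrics are `HistogramVec`s keyed by a single `operation` label, so one metric family can time many code paths. Because no bucket list is passed, they fall back to the prometheus crate's default buckets (0.005 s through 10 s). A self-contained sketch of the same registration and timing pattern, with illustrative names:

```rust
use once_cell::sync::Lazy;
use prometheus::{register_histogram_vec, HistogramVec};

// Stand-in for MISC_OPERATION_SECONDS / WAL_STORAGE_OPERATION_SECONDS.
static OPERATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
    register_histogram_vec!(
        "example_operation_seconds",
        "Seconds spent on example operations",
        &["operation"]
    )
    .expect("failed to register example_operation_seconds")
});

fn do_work() {
    // Each distinct label value becomes its own time series, so the label
    // should stay low-cardinality: a small, fixed set of operation names.
    let _timer = OPERATION_SECONDS
        .with_label_values(&["do_work"])
        .start_timer(); // observes the elapsed seconds when dropped
    // ... the measured work ...
}

fn main() {
    do_work();
}
```
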
5 changes: 5 additions & 0 deletions safekeeper/src/safekeeper.rs
@@ -15,6 +15,7 @@ use storage_broker::proto::SafekeeperTimelineInfo;
use tracing::*;

use crate::control_file;
+use crate::metrics::MISC_OPERATION_SECONDS;
use crate::send_wal::HotStandbyFeedback;

use crate::state::TimelineState;
@@ -696,6 +697,10 @@
&mut self,
msg: &ProposerElected,
) -> Result<Option<AcceptorProposerMessage>> {
+        let _timer = MISC_OPERATION_SECONDS
+            .with_label_values(&["handle_elected"])
+            .start_timer();
+
info!("received ProposerElected {:?}", msg);
if self.state.acceptor_state.term < msg.term {
let mut state = self.state.start_change();
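
The binding to `_timer` (rather than `_`) is load-bearing: `start_timer()` returns a guard that records the elapsed time into the histogram when it is dropped, so the whole of `handle_elected` is measured, including early returns and `?` propagation, whereas `let _ = ...` would drop the guard and observe roughly zero immediately. A small sketch of those drop semantics (prometheus crate; names illustrative):

```rust
use prometheus::{register_histogram, Histogram};

fn handle(hist: &Histogram, fail: bool) -> Result<(), String> {
    // `_timer` lives until scope exit; `let _ = hist.start_timer();`
    // would drop the guard (observing ~0 s) on the same line.
    let _timer = hist.start_timer();
    if fail {
        return Err("early return".into()); // elapsed time is still observed here
    }
    Ok(()) // ... and here, when `_timer` goes out of scope
}

fn main() {
    let hist = register_histogram!("example_handle_seconds", "Example histogram").unwrap();
    let _ = handle(&hist, true);
}
```
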
7 changes: 6 additions & 1 deletion safekeeper/src/state.rs
@@ -189,7 +189,12 @@

/// Persist given state. c.f. start_change.
pub async fn finish_change(&mut self, s: &TimelinePersistentState) -> Result<()> {
-        self.pers.persist(s).await?;
+        if s.eq(&*self.pers) {
+            // nothing to do if state didn't change
+        } else {
+            self.pers.persist(s).await?;
+        }
+
// keep in memory values up to date
self.inmem.commit_lsn = s.commit_lsn;
self.inmem.backup_lsn = s.backup_lsn;
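
Every call to `persist` rewrites the control file and fsyncs it, so `finish_change` now skips the durable write when the proposed state equals the last persisted copy (which `self.pers` dereferences to here). The same guard in generic form, as a sketch with illustrative names:

```rust
use anyhow::Result;
use std::future::Future;

// Skip the expensive durable write (and its fsync) when nothing changed,
// then remember what was persisted for the next comparison.
async fn persist_if_changed<T, F>(last: &mut T, new: &T, persist: F) -> Result<()>
where
    T: PartialEq + Clone,
    F: Future<Output = Result<()>>,
{
    if new != last {
        persist.await?;
        *last = new.clone();
    }
    Ok(())
}
```
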
34 changes: 23 additions & 11 deletions safekeeper/src/timeline.rs
@@ -39,7 +39,7 @@ use crate::wal_backup::{self};
use crate::wal_backup_partial::PartialRemoteSegment;
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};

-use crate::metrics::{FullTimelineInfo, WalStorageMetrics};
+use crate::metrics::{FullTimelineInfo, WalStorageMetrics, MISC_OPERATION_SECONDS};
use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
use crate::{debug_dump, timeline_manager, wal_storage};
use crate::{GlobalTimelines, SafeKeeperConf};
@@ -856,28 +856,40 @@ impl Timeline {
}

debug!("requesting WalResidentTimeline guard");

-        // Wait 5 seconds for the guard to be acquired, should be enough for uneviction.
-        // If it times out, most likely there is a deadlock in the manager task.
-        let res = tokio::time::timeout(
-            Duration::from_secs(5),
+        let started_at = Instant::now();
+        let status_before = self.mgr_status.get();
+
+        // Wait 30 seconds for the guard to be acquired. It can time out if someone is
+        // holding the lock (e.g. during `SafeKeeper::process_msg()`) or manager task
+        // is stuck.
+        let res = tokio::time::timeout_at(
+            started_at + Duration::from_secs(30),
self.manager_ctl.wal_residence_guard(),
)
.await;

let guard = match res {
-            Ok(Ok(guard)) => guard,
+            Ok(Ok(guard)) => {
+                let finished_at = Instant::now();
+                let elapsed = finished_at - started_at;
+                MISC_OPERATION_SECONDS
+                    .with_label_values(&["wal_residence_guard"])
+                    .observe(elapsed.as_secs_f64());
+
+                guard
+            }
Ok(Err(e)) => {
warn!(
"error while acquiring WalResidentTimeline guard (current state {:?}): {}",
self.mgr_status.get(),
e
"error while acquiring WalResidentTimeline guard, statuses {:?} => {:?}",
status_before,
self.mgr_status.get()
);
return Err(e);
}
Err(_) => {
warn!(
"timeout while acquiring WalResidentTimeline guard (current state {:?})",
"timeout while acquiring WalResidentTimeline guard, statuses {:?} => {:?}",
status_before,
self.mgr_status.get()
);
anyhow::bail!("timeout while acquiring WalResidentTimeline guard");
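
Two changes land here: the budget grows from 5 to 30 seconds, and `timeout` becomes `timeout_at` with a deadline anchored at `started_at`, the same instant later reused for the `wal_residence_guard` observation; the warnings also record the manager status both before and after the wait. A runnable sketch of tokio's absolute-deadline form (the awaited body is a stand-in):

```rust
use std::time::Duration;
use tokio::time::{timeout_at, Instant};

#[tokio::main]
async fn main() {
    let started_at = Instant::now();
    // timeout_at takes an absolute deadline, so any time spent between
    // `started_at` and the call itself counts against the same 30 s budget.
    let res = timeout_at(started_at + Duration::from_secs(30), async {
        // stand-in for manager_ctl.wal_residence_guard().await
        "guard"
    })
    .await;
    match res {
        Ok(guard) => println!("acquired {guard:?} in {:?}", started_at.elapsed()),
        Err(_) => eprintln!("timed out after 30 s"),
    }
}
```
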
6 changes: 5 additions & 1 deletion safekeeper/src/timeline_manager.rs
@@ -22,7 +22,7 @@ use utils::lsn::Lsn;

use crate::{
control_file::{FileStorage, Storage},
-    metrics::{MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL},
+    metrics::{MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL, MISC_OPERATION_SECONDS},
recovery::recovery_main,
remove_wal::calc_horizon_lsn,
safekeeper::Term,
@@ -357,6 +357,10 @@ impl Manager {

/// Get a snapshot of the timeline state.
async fn state_snapshot(&self) -> StateSnapshot {
+        let _timer = MISC_OPERATION_SECONDS
+            .with_label_values(&["state_snapshot"])
+            .start_timer();
+
StateSnapshot::new(
self.tli.read_shared_state().await,
self.conf.heartbeat_timeout,
16 changes: 15 additions & 1 deletion safekeeper/src/wal_storage.rs
@@ -23,7 +23,9 @@ use tokio::io::{AsyncReadExt, AsyncSeekExt};
use tracing::*;
use utils::crashsafe::durable_rename;

-use crate::metrics::{time_io_closure, WalStorageMetrics, REMOVED_WAL_SEGMENTS};
+use crate::metrics::{
+    time_io_closure, WalStorageMetrics, REMOVED_WAL_SEGMENTS, WAL_STORAGE_OPERATION_SECONDS,
+};
use crate::state::TimelinePersistentState;
use crate::wal_backup::{read_object, remote_timeline_path};
use crate::SafeKeeperConf;
@@ -331,6 +333,10 @@ impl Storage for PhysicalStorage {
}

async fn initialize_first_segment(&mut self, init_lsn: Lsn) -> Result<()> {
+        let _timer = WAL_STORAGE_OPERATION_SECONDS
+            .with_label_values(&["initialize_first_segment"])
+            .start_timer();
+
let segno = init_lsn.segment_number(self.wal_seg_size);
let (mut file, _) = self.open_or_create(segno).await?;
let major_pg_version = self.pg_version / 10000;
@@ -422,6 +428,10 @@
/// Truncate written WAL by removing all WAL segments after the given LSN.
/// end_pos must point to the end of the WAL record.
async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
+        let _timer = WAL_STORAGE_OPERATION_SECONDS
+            .with_label_values(&["truncate_wal"])
+            .start_timer();
+
// Streaming must not create a hole, so truncate cannot be called on non-written lsn
if self.write_lsn != Lsn(0) && end_pos > self.write_lsn {
bail!(
@@ -497,6 +507,10 @@ async fn remove_segments_from_disk(
wal_seg_size: usize,
remove_predicate: impl Fn(XLogSegNo) -> bool,
) -> Result<()> {
+    let _timer = WAL_STORAGE_OPERATION_SECONDS
+        .with_label_values(&["remove_segments_from_disk"])
+        .start_timer();
+
let mut n_removed = 0;
let mut min_removed = u64::MAX;
let mut max_removed = u64::MIN;