Skip to content

Commit

Permalink
WIP impl: switch to rate-limited but repeated logging
Browse files Browse the repository at this point in the history
  • Loading branch information
problame committed Feb 29, 2024
1 parent 48578e9 commit bccf459
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 70 deletions.
84 changes: 25 additions & 59 deletions pageserver/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1023,10 +1023,18 @@ impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> {
let ex_throttled = match ex_throttled {
Ok(res) => res,
Err(error) => {
static LOGGED: AtomicSmgrQueryTypeSet = AtomicSmgrQueryTypeSet::new();
if LOGGED.test_and_set(self.op) {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<enum_map::EnumMap<SmgrQueryType, RateLimit>>> =
Lazy::new(|| {
Mutex::new(enum_map::EnumMap::from_array(std::array::from_fn(|_| {
RateLimit::new(Duration::from_secs(10))
})))
});
let mut guard = LOGGED.lock().unwrap();
let rate_limit = &mut guard[self.op];
rate_limit.call(|| {
warn!(op=?self.op, error, "error deducting time spent throttled, this message is only logged once per process lifetime");
}
});
elapsed
}
};
Expand All @@ -1036,14 +1044,15 @@ impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> {

#[derive(
Debug,
Clone,
Copy,
IntoStaticStr,
strum_macros::EnumCount,
strum_macros::EnumIter,
strum_macros::FromRepr,
enumset::EnumSetType,
enum_map::Enum,
)]
#[strum(serialize_all = "snake_case")]
#[enumset(repr = "u8")]
pub enum SmgrQueryType {
GetRelExists,
GetRelSize,
Expand All @@ -1052,56 +1061,6 @@ pub enum SmgrQueryType {
GetSlruSegment,
}

/// A set of [`SmgrQueryType`] values that can be updated through a shared
/// reference.
///
/// The set is kept as the `u8` representation of an
/// `enumset::EnumSet<SmgrQueryType>` inside an atomic integer.
struct AtomicSmgrQueryTypeSet {
    value: std::sync::atomic::AtomicU8,
}

impl AtomicSmgrQueryTypeSet {
    /// Creates an empty set.
    pub const fn new() -> Self {
        // would love to write
        // ```
        // enumset::EnumSet::<SmgrQueryType>::EMPTY.as_repr()
        // ```
        // instead of the literal `0`.
        // But it's not const, see https://github.com/Lymia/enumset/issues/27
        Self {
            value: AtomicU8::new(0),
        }
    }

    /// Adds `v` to the set.
    ///
    /// Returns `true` if this call inserted `v`, and `false` if `v` was
    /// already a member (in which case the stored value is left untouched).
    pub fn test_and_set(&self, v: SmgrQueryType) -> bool {
        use std::sync::atomic::Ordering;
        // Yields the new representation only when `v` is missing; `None`
        // makes `fetch_update` give up without storing and return `Err`.
        let insert_if_absent = |bits: u8| {
            let members = enumset::EnumSet::<SmgrQueryType>::from_repr(bits);
            (!members.contains(v)).then(|| (members | v).as_repr())
        };
        self.value
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, insert_if_absent)
            .is_ok()
    }
}

#[cfg(test)]
mod test_atomic_smgr_query_type_set {
    use super::*;

    /// `test_and_set` must report `true` only the first time a given query
    /// type is inserted, and distinct query types must not affect each other.
    #[test]
    fn test_basic() {
        let set = AtomicSmgrQueryTypeSet::new();
        // (query type to insert, whether this call is expected to insert it)
        let steps = [
            // first set returns true
            (SmgrQueryType::GetPageAtLsn, true),
            // try it again, should report false
            (SmgrQueryType::GetPageAtLsn, false),
            // set something that's not set before
            (SmgrQueryType::GetDbSize, true),
            // flags are independent
            (SmgrQueryType::GetPageAtLsn, false),
        ];
        for (op, newly_inserted) in steps {
            assert_eq!(set.test_and_set(op), newly_inserted);
        }
    }
}

#[derive(Debug)]
pub(crate) struct SmgrQueryTimePerTimeline {
metrics: [GlobalAndPerTimelineHistogram; SmgrQueryType::COUNT],
Expand Down Expand Up @@ -1207,10 +1166,18 @@ impl SmgrQueryTimePerTimeline {
match ctx.micros_spent_throttled.open() {
Ok(()) => (),
Err(error) => {
static LOGGED: AtomicSmgrQueryTypeSet = AtomicSmgrQueryTypeSet::new();
if LOGGED.test_and_set(op) {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<enum_map::EnumMap<SmgrQueryType, RateLimit>>> =
Lazy::new(|| {
Mutex::new(enum_map::EnumMap::from_array(std::array::from_fn(|_| {
RateLimit::new(Duration::from_secs(10))
})))
});
let mut guard = LOGGED.lock().unwrap();
let rate_limit = &mut guard[op];
rate_limit.call(|| {
warn!(?op, error, "error opening micros_spent_throttled, this message is only logged once per process lifetime");
}
});
}
}
GlobalAndPerTimelineHistogramTimer {
Expand Down Expand Up @@ -2070,7 +2037,6 @@ use futures::Future;
use pin_project_lite::pin_project;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::atomic::AtomicU8;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
Expand Down
21 changes: 10 additions & 11 deletions pageserver/src/tenant/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
str::FromStr,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
Arc, Mutex,
},
time::{Duration, Instant},
};
Expand Down Expand Up @@ -157,20 +157,19 @@ where
.fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed);
let observation = Observation { wait_time };
self.metric.observe_throttling(&observation);
// TODO: really, callers should do this
match ctx.micros_spent_throttled.add(wait_time) {
Ok(res) => res,
Err(error) => {
use std::sync::atomic::AtomicBool;
static LOGGED: AtomicBool = AtomicBool::new(false);
if LOGGED
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
warn!(error, "error adding time spent throttled, this message is only logged once per process lifetime");
}
use once_cell::sync::Lazy;
use utils::rate_limit::RateLimit;
static WARN_RATE_LIMIT: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut guard = WARN_RATE_LIMIT.lock().unwrap();
guard.call(move || {
warn!(error, "error adding time spent throttled; this message is rate-limited globally");
});
}
};
}
}
}
}

0 comments on commit bccf459

Please sign in to comment.