Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(per-tenant throttling): exclude throttled time from page_service metrics + regression test #6953

Merged
merged 25 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
0b93161
make throttle warning info! level
problame Feb 28, 2024
cfdbfec
feat(tests): option to make log_contains search a suffix of the log
problame Feb 29, 2024
57d9985
Merge remote-tracking branch 'origin/main' into problame/log-contains…
problame Feb 29, 2024
07647fe
add (failing) test case for getpage throttling that asserts effect & …
problame Feb 28, 2024
0440214
RequestContext shouldn't be `Clone`, only RequestContextAdaptor uses it
problame Feb 28, 2024
1e6b0c0
WIP impl: draft for getpage, test passes, other smgr services aren't …
problame Feb 28, 2024
48578e9
WIP impl: generalize it by moving it to the throttling metric
problame Feb 29, 2024
bccf459
WIP impl: switch to rate-limited but repeated logging
problame Feb 29, 2024
9671ac3
make mypy happy
problame Feb 29, 2024
bc5c5be
Merge remote-tracking branch 'origin/main' into problame/log-contains…
problame Feb 29, 2024
7afbcd1
audit log_contains users for truthiness pitfall
problame Feb 29, 2024
956a383
implement LogCursor concept for log_contains
problame Feb 29, 2024
d36edab
Revert "audit log_contains users for truthiness pitfall"
problame Feb 29, 2024
3227e33
add assert_log_contains
problame Feb 29, 2024
505946d
wait_until usage corrections & fix potential truthiness issues in som…
problame Feb 29, 2024
0f3db7b
implement LogCursor concept for log_contains
problame Feb 29, 2024
7b4f12e
add assert_log_contains
problame Feb 29, 2024
1bf12fd
wait_until usage corrections & fix potential truthiness issues in som…
problame Feb 29, 2024
6018e23
self-review
problame Feb 29, 2024
a5e0fe5
Merge branch 'problame/log-contains-since_pattern' into problame/thro…
problame Feb 29, 2024
d86df72
test_remote_storage_upload_queue_retries: need to raise timeout to ma…
problame Feb 29, 2024
6badbd6
Merge remote-tracking branch 'origin/main' into problame/throttling-o…
problame Mar 1, 2024
d9c3fa7
adjust log messages
problame Mar 1, 2024
169065f
DO NOT MERGE: run test 100 times to ensure it's not flaky
problame Mar 5, 2024
2dcf5d8
Revert "DO NOT MERGE: run test 100 times to ensure it's not flaky"
problame Mar 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pageserver/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,16 @@

use crate::task_mgr::TaskKind;

pub(crate) mod optional_counter;

// The main structure of this module, see module-level comment.
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct RequestContext {
task_kind: TaskKind,
download_behavior: DownloadBehavior,
access_stats_behavior: AccessStatsBehavior,
page_content_kind: PageContentKind,
pub micros_spent_throttled: optional_counter::MicroSecondsCounterU32,
jcsp marked this conversation as resolved.
Show resolved Hide resolved
}

/// The kind of access to the page cache.
Expand Down Expand Up @@ -150,6 +153,7 @@ impl RequestContextBuilder {
download_behavior: DownloadBehavior::Download,
access_stats_behavior: AccessStatsBehavior::Update,
page_content_kind: PageContentKind::Unknown,
micros_spent_throttled: Default::default(),
},
}
}
Expand All @@ -163,6 +167,7 @@ impl RequestContextBuilder {
download_behavior: original.download_behavior,
access_stats_behavior: original.access_stats_behavior,
page_content_kind: original.page_content_kind,
micros_spent_throttled: Default::default(),
},
}
}
Expand Down
101 changes: 101 additions & 0 deletions pageserver/src/context/optional_counter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use std::{
sync::atomic::{AtomicU32, Ordering},
time::Duration,
};

#[derive(Debug)]
pub struct CounterU32 {
inner: AtomicU32,
}
impl Default for CounterU32 {
fn default() -> Self {
Self {
inner: AtomicU32::new(u32::MAX),
}
}
}
impl CounterU32 {
pub fn open(&self) -> Result<(), &'static str> {
match self
.inner
.compare_exchange(u32::MAX, 0, Ordering::Relaxed, Ordering::Relaxed)
{
Ok(_) => Ok(()),
Err(_) => Err("open() called on clsoed state"),
}
}
pub fn close(&self) -> Result<u32, &'static str> {
match self.inner.swap(u32::MAX, Ordering::Relaxed) {
u32::MAX => Err("close() called on closed state"),
x => Ok(x),
}
}

pub fn add(&self, count: u32) -> Result<(), &'static str> {
if count == 0 {
return Ok(());
}
let mut had_err = None;
self.inner
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| match cur {
u32::MAX => {
had_err = Some("add() called on closed state");
None
}
x => {
let (new, overflowed) = x.overflowing_add(count);
if new == u32::MAX || overflowed {
had_err = Some("add() overflowed the counter");
None
} else {
Some(new)
}
}
})
.map_err(|_| had_err.expect("we set it whenever the function returns None"))
.map(|_| ())
}
}

#[derive(Default, Debug)]
pub struct MicroSecondsCounterU32 {
inner: CounterU32,
}

impl MicroSecondsCounterU32 {
pub fn open(&self) -> Result<(), &'static str> {
self.inner.open()
}
pub fn add(&self, duration: Duration) -> Result<(), &'static str> {
match duration.as_micros().try_into() {
Ok(x) => self.inner.add(x),
Err(_) => Err("add(): duration conversion error"),
}
}
pub fn close_and_checked_sub_from(&self, from: Duration) -> Result<Duration, &'static str> {
let val = self.inner.close()?;
let val = Duration::from_micros(val as u64);
let subbed = match from.checked_sub(val) {
Some(v) => v,
None => return Err("Duration::checked_sub"),
};
Ok(subbed)
}
}

#[cfg(test)]
mod tests {

use super::*;

#[test]
fn test_basic() {
let counter = MicroSecondsCounterU32::default();
counter.open().unwrap();
counter.add(Duration::from_micros(23)).unwrap();
let res = counter
.close_and_checked_sub_from(Duration::from_micros(42))
.unwrap();
assert_eq!(res, Duration::from_micros(42 - 23));
}
}
68 changes: 62 additions & 6 deletions pageserver/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use once_cell::sync::Lazy;
use pageserver_api::shard::TenantShardId;
use strum::{EnumCount, IntoEnumIterator, VariantNames};
use strum_macros::{EnumVariantNames, IntoStaticStr};
use tracing::warn;
use utils::id::TimelineId;

/// Prometheus histogram buckets (in seconds) for operations in the critical
Expand Down Expand Up @@ -1005,15 +1006,39 @@ impl GlobalAndPerTimelineHistogram {
}
}

struct GlobalAndPerTimelineHistogramTimer<'a> {
struct GlobalAndPerTimelineHistogramTimer<'a, 'c> {
h: &'a GlobalAndPerTimelineHistogram,
ctx: &'c RequestContext,
start: std::time::Instant,
op: SmgrQueryType,
}

impl<'a> Drop for GlobalAndPerTimelineHistogramTimer<'a> {
impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> {
fn drop(&mut self) {
let elapsed = self.start.elapsed();
self.h.observe(elapsed.as_secs_f64());
let ex_throttled = self
.ctx
.micros_spent_throttled
.close_and_checked_sub_from(elapsed);
let ex_throttled = match ex_throttled {
Ok(res) => res,
Err(error) => {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<enum_map::EnumMap<SmgrQueryType, RateLimit>>> =
Lazy::new(|| {
Mutex::new(enum_map::EnumMap::from_array(std::array::from_fn(|_| {
RateLimit::new(Duration::from_secs(10))
})))
});
let mut guard = LOGGED.lock().unwrap();
let rate_limit = &mut guard[self.op];
rate_limit.call(|| {
warn!(op=?self.op, error, "error deducting time spent throttled; this message is logged at a global rate limit");
jcsp marked this conversation as resolved.
Show resolved Hide resolved
});
elapsed
}
};
self.h.observe(ex_throttled.as_secs_f64());
}
}

Expand All @@ -1025,6 +1050,7 @@ impl<'a> Drop for GlobalAndPerTimelineHistogramTimer<'a> {
strum_macros::EnumCount,
strum_macros::EnumIter,
strum_macros::FromRepr,
enum_map::Enum,
)]
#[strum(serialize_all = "snake_case")]
pub enum SmgrQueryType {
Expand Down Expand Up @@ -1130,11 +1156,35 @@ impl SmgrQueryTimePerTimeline {
});
Self { metrics }
}
pub(crate) fn start_timer(&self, op: SmgrQueryType) -> impl Drop + '_ {
pub(crate) fn start_timer<'c: 'a, 'a>(
&'a self,
op: SmgrQueryType,
ctx: &'c RequestContext,
) -> impl Drop + '_ {
let metric = &self.metrics[op as usize];
let start = Instant::now();
match ctx.micros_spent_throttled.open() {
Ok(()) => (),
Err(error) => {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<enum_map::EnumMap<SmgrQueryType, RateLimit>>> =
Lazy::new(|| {
Mutex::new(enum_map::EnumMap::from_array(std::array::from_fn(|_| {
RateLimit::new(Duration::from_secs(10))
})))
});
let mut guard = LOGGED.lock().unwrap();
let rate_limit = &mut guard[op];
rate_limit.call(|| {
warn!(?op, error, "error opening micros_spent_throttled; this message is logged at a global rate limit");
});
}
}
GlobalAndPerTimelineHistogramTimer {
h: metric,
start: std::time::Instant::now(),
ctx,
start,
op,
}
}
}
Expand All @@ -1145,6 +1195,11 @@ mod smgr_query_time_tests {
use strum::IntoEnumIterator;
use utils::id::{TenantId, TimelineId};

use crate::{
context::{DownloadBehavior, RequestContext},
task_mgr::TaskKind,
};

// Regression test, we used hard-coded string constants before using an enum.
#[test]
fn op_label_name() {
Expand Down Expand Up @@ -1193,7 +1248,8 @@ mod smgr_query_time_tests {
let (pre_global, pre_per_tenant_timeline) = get_counts();
assert_eq!(pre_per_tenant_timeline, 0);

let timer = metrics.start_timer(*op);
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download);
let timer = metrics.start_timer(*op, &ctx);
drop(timer);

let (post_global, post_per_tenant_timeline) = get_counts();
Expand Down
10 changes: 5 additions & 5 deletions pageserver/src/page_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -911,7 +911,7 @@ impl PageServerHandler {
let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetRelExists);
.start_timer(metrics::SmgrQueryType::GetRelExists, ctx);

let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Expand Down Expand Up @@ -939,7 +939,7 @@ impl PageServerHandler {

let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetRelSize);
.start_timer(metrics::SmgrQueryType::GetRelSize, ctx);

let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Expand Down Expand Up @@ -967,7 +967,7 @@ impl PageServerHandler {

let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetDbSize);
.start_timer(metrics::SmgrQueryType::GetDbSize, ctx);

let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Expand Down Expand Up @@ -1145,7 +1145,7 @@ impl PageServerHandler {

let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetPageAtLsn);
.start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx);

let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Expand Down Expand Up @@ -1173,7 +1173,7 @@ impl PageServerHandler {

let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetSlruSegment);
.start_timer(metrics::SmgrQueryType::GetSlruSegment, ctx);

let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Expand Down
2 changes: 1 addition & 1 deletion pageserver/src/tenant/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
}
let allowed_rps = tenant.timeline_get_throttle.steady_rps();
let delta = now - prev;
warn!(
info!(
n_seconds=%format_args!("{:.3}",
delta.as_secs_f64()),
count_accounted,
Expand Down
17 changes: 15 additions & 2 deletions pageserver/src/tenant/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ use std::{
str::FromStr,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
Arc, Mutex,
},
time::{Duration, Instant},
};

use arc_swap::ArcSwap;
use enumset::EnumSet;
use tracing::error;
use tracing::{error, warn};

use crate::{context::RequestContext, task_mgr::TaskKind};

Expand Down Expand Up @@ -157,6 +157,19 @@ where
.fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed);
let observation = Observation { wait_time };
self.metric.observe_throttling(&observation);
match ctx.micros_spent_throttled.add(wait_time) {
Ok(res) => res,
Err(error) => {
use once_cell::sync::Lazy;
use utils::rate_limit::RateLimit;
static WARN_RATE_LIMIT: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut guard = WARN_RATE_LIMIT.lock().unwrap();
guard.call(move || {
warn!(error, "error adding time spent throttled; this message is logged at a global rate limit");
});
}
}
}
}
}
Loading