Skip to content

Commit

Permalink
WIP: impl for getpage, test passes, other smgr services aren't covere…
Browse files Browse the repository at this point in the history
…d, measure overhead first
  • Loading branch information
problame committed Feb 28, 2024
1 parent abbb877 commit d417cb9
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 5 deletions.
7 changes: 6 additions & 1 deletion pageserver/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,16 @@

use crate::task_mgr::TaskKind;

pub(crate) mod optional_counter;

// The main structure of this module, see module-level comment.
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct RequestContext {
task_kind: TaskKind,
download_behavior: DownloadBehavior,
access_stats_behavior: AccessStatsBehavior,
page_content_kind: PageContentKind,
pub micros_spent_throttled: optional_counter::MicroSecondsCounterU32,
}

/// The kind of access to the page cache.
Expand Down Expand Up @@ -150,6 +153,7 @@ impl RequestContextBuilder {
download_behavior: DownloadBehavior::Download,
access_stats_behavior: AccessStatsBehavior::Update,
page_content_kind: PageContentKind::Unknown,
micros_spent_throttled: Default::default(),
},
}
}
Expand All @@ -163,6 +167,7 @@ impl RequestContextBuilder {
download_behavior: original.download_behavior,
access_stats_behavior: original.access_stats_behavior,
page_content_kind: original.page_content_kind,
micros_spent_throttled: Default::default(),
},
}
}
Expand Down
101 changes: 101 additions & 0 deletions pageserver/src/context/optional_counter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use std::{
sync::atomic::{AtomicU32, Ordering},
time::Duration,
};

#[derive(Debug)]
pub struct CounterU32 {
inner: AtomicU32,
}
impl Default for CounterU32 {
fn default() -> Self {
Self {
inner: AtomicU32::new(u32::MAX),
}
}
}
impl CounterU32 {
pub fn open(&self) -> Result<(), &'static str> {
match self
.inner
.compare_exchange(u32::MAX, 0, Ordering::Relaxed, Ordering::Relaxed)
{
Ok(_) => Ok(()),
Err(_) => Err("open() called on clsoed state"),
}
}
pub fn close(&self) -> Result<u32, &'static str> {
match self.inner.swap(u32::MAX, Ordering::Relaxed) {
u32::MAX => Err("close() called on closed state"),
x => Ok(x),
}
}

pub fn add(&self, count: u32) -> Result<(), &'static str> {
if count == 0 {
return Ok(());
}
let mut had_err = None;
self.inner
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| match cur {
u32::MAX => {
had_err = Some("add() called on closed state");
None
}
x => {
let (new, overflowed) = x.overflowing_add(count);
if new == u32::MAX || overflowed {
had_err = Some("add() overflowed the counter");
None
} else {
Some(new)
}
}
})
.map_err(|_| had_err.expect("we set it whenever the function returns None"))
.map(|_| ())
}
}

#[derive(Default, Debug)]
pub struct MicroSecondsCounterU32 {
inner: CounterU32,
}

impl MicroSecondsCounterU32 {
pub fn open(&self) -> Result<(), &'static str> {
self.inner.open()
}
pub fn add(&self, duration: Duration) -> Result<(), &'static str> {
match duration.as_micros().try_into() {
Ok(x) => self.inner.add(x),
Err(_) => Err("add(): duration conversion error"),
}
}
pub fn close_and_checked_sub_from(&self, from: Duration) -> Result<Duration, &'static str> {
let val = self.inner.close()?;
let val = Duration::from_micros(val as u64);
let subbed = match from.checked_sub(val) {
Some(v) => v,
None => return Err("Duration::checked_sub"),
};
Ok(subbed)
}
}

#[cfg(test)]
mod tests {

use super::*;

#[test]
fn test_basic() {
let counter = MicroSecondsCounterU32::default();
counter.open().unwrap();
counter.add(Duration::from_micros(23)).unwrap();
let res = counter
.close_and_checked_sub_from(Duration::from_micros(42))
.unwrap();
assert_eq!(res, Duration::from_micros(42 - 23));
}
}
4 changes: 4 additions & 0 deletions pageserver/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1137,6 +1137,10 @@ impl SmgrQueryTimePerTimeline {
start: std::time::Instant::now(),
}
}
pub(crate) fn observe(&self, op: SmgrQueryType, duration: Duration) {
let metric = &self.metrics[op as usize];
metric.observe(duration.as_secs_f64())
}
}

#[cfg(test)]
Expand Down
42 changes: 39 additions & 3 deletions pageserver/src/page_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use std::str;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use tokio::io::AsyncWriteExt;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::io::StreamReader;
Expand Down Expand Up @@ -1143,9 +1144,44 @@ impl PageServerHandler {
// load_timeline_for_page sets shard_id, but get_cached_timeline_for_page doesn't
set_tracing_field_shard_id(timeline);

let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetPageAtLsn);
let start = Instant::now();
match ctx.micros_spent_throttled.open() {
Ok(()) => (),
Err(error) => {
use std::sync::atomic::{AtomicBool, Ordering};
static LOGGED: AtomicBool = AtomicBool::new(false);
match LOGGED.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) {
Ok(_) => {
warn!(error, "error opening micros_spent_throttled, this message is only logged once per process lifetime");
}
Err(_) => {}
}
}
}
scopeguard::defer!({
let elapsed = start.elapsed();
let ex_throttled = ctx
.micros_spent_throttled
.close_and_checked_sub_from(elapsed);
let ex_throttled = match ex_throttled {
Ok(res) => res,
Err(error) => {
use std::sync::atomic::{AtomicBool, Ordering};
static LOGGED: AtomicBool = AtomicBool::new(false);
match LOGGED.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
{
Ok(_) => {
warn!(error, "error deducting time spent throttled, this message is only logged once per process lifetime");
}
Err(_) => {}
}
elapsed
}
};
timeline
.query_metrics
.observe(metrics::SmgrQueryType::GetPageAtLsn, ex_throttled);
});

let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Expand Down
17 changes: 16 additions & 1 deletion pageserver/src/tenant/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{

use arc_swap::ArcSwap;
use enumset::EnumSet;
use tracing::error;
use tracing::{error, warn};

use crate::{context::RequestContext, task_mgr::TaskKind};

Expand Down Expand Up @@ -157,6 +157,21 @@ where
.fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed);
let observation = Observation { wait_time };
self.metric.observe_throttling(&observation);
// TODO: really, callers should do this
match ctx.micros_spent_throttled.add(wait_time) {
Ok(res) => res,
Err(error) => {
use std::sync::atomic::{AtomicBool, Ordering};
static LOGGED: AtomicBool = AtomicBool::new(false);
match LOGGED.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
{
Ok(_) => {
warn!(error, "error adding time spent throttled, this message is only logged once per process lifetime");
}
Err(_) => {}
}
}
};
}
}
}

0 comments on commit d417cb9

Please sign in to comment.