From 0b931614cc8a05f9a6211aa5da6c9f4fc1879e52 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 28 Feb 2024 16:20:46 +0000 Subject: [PATCH 01/21] make throttle warning info! level --- pageserver/src/tenant/tasks.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 45ce6c938155..57c3edcddd29 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -217,7 +217,7 @@ async fn compaction_loop(tenant: Arc, cancel: CancellationToken) { } let allowed_rps = tenant.timeline_get_throttle.steady_rps(); let delta = now - prev; - warn!( + info!( n_seconds=%format_args!("{:.3}", delta.as_secs_f64()), count_accounted, From cfdbfec1875f826e10f3e5463605dc9913d08752 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 29 Feb 2024 10:24:07 +0000 Subject: [PATCH 02/21] feat(tests): option to make log_contains search a suffix of the log --- test_runner/fixtures/neon_fixtures.py | 31 +++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 71e77334a1f6..836293880575 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2343,8 +2343,13 @@ def assert_no_metric_errors(self): value = self.http_client().get_metric_value(metric) assert value == 0, f"Nonzero {metric} == {value}" - def log_contains(self, pattern: str) -> Optional[str]: - """Check that the pageserver log contains a line that matches the given regex""" + def log_contains( + self, pattern: str, skip_until: "None | str | Callable[[str], bool]" + ) -> Optional[str]: + """ + Check that the pageserver log contains a line that matches the given regex. + Use `skip_until` to limit the search to the suffix of the log that follows the first line that matches `skip_until`. + """ logfile = self.workdir / "pageserver.log" if not logfile.exists(): log.warning(f"Skipping log check: {logfile} does not exist") @@ -2352,13 +2357,35 @@ def log_contains(self, pattern: str) -> Optional[str]: contains_re = re.compile(pattern) + skip_until_finder: Callable[[str], bool] + if skip_until is None: + + def always_true_finder(_line: str) -> bool: + return True + + skip_until_finder = always_true_finder + elif isinstance(skip_until, str): + skip_until_pattern_re = re.compile(skip_until) + + def re_finder(_line: str) -> bool: + return skip_until_pattern_re.search(line) is not None + + skip_until_finder = re_finder + else: + skip_until_finder = skip_until + # XXX: Our rust logging machinery buffers the messages, so if you # call this function immediately after it's been logged, there is # no guarantee it is already present in the log file. This hasn't # been a problem in practice, our python tests are not fast enough # to hit that race condition. + skip_until_pattern_found = False with logfile.open("r") as f: for line in f: + if not skip_until_pattern_found: + skip_until_pattern_found = skip_until_finder(line) + if not skip_until_pattern_found: + continue if contains_re.search(line): # found it! 
return line From 07647fe08884cf33ae8b2ec8234bbfd2715c335d Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 28 Feb 2024 16:20:13 +0000 Subject: [PATCH 03/21] add (failing) test case for getpage throttling that asserts effect & observability --- test_runner/fixtures/neon_fixtures.py | 18 ++- .../test_pageserver_getpage_throttle.py | 113 ++++++++++++++++++ 2 files changed, 130 insertions(+), 1 deletion(-) create mode 100644 test_runner/regress/test_pageserver_getpage_throttle.py diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 71e77334a1f6..512aeb4c84ae 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2343,7 +2343,9 @@ def assert_no_metric_errors(self): value = self.http_client().get_metric_value(metric) assert value == 0, f"Nonzero {metric} == {value}" - def log_contains(self, pattern: str) -> Optional[str]: + def log_contains( + self, pattern: str, since_pattern: "None | str | Callable[[str], bool]" + ) -> Optional[str]: """Check that the pageserver log contains a line that matches the given regex""" logfile = self.workdir / "pageserver.log" if not logfile.exists(): @@ -2352,13 +2354,27 @@ def log_contains(self, pattern: str) -> Optional[str]: contains_re = re.compile(pattern) + since_pattern_finder: Callable[[str], bool] + if since_pattern is None: + since_pattern_finder = lambda _line: True + elif isinstance(since_pattern, str): + since_pattern_re = re.compile(since_pattern) + since_pattern_finder = lambda line: since_pattern_re.search(line) is not None + else: + since_pattern_finder = since_pattern + # XXX: Our rust logging machinery buffers the messages, so if you # call this function immediately after it's been logged, there is # no guarantee it is already present in the log file. This hasn't # been a problem in practice, our python tests are not fast enough # to hit that race condition. + since_pattern_found = False with logfile.open("r") as f: for line in f: + if not since_pattern_found: + since_pattern_found = since_pattern_finder(line) + if not since_pattern_found: + continue if contains_re.search(line): # found it! 
return line diff --git a/test_runner/regress/test_pageserver_getpage_throttle.py b/test_runner/regress/test_pageserver_getpage_throttle.py new file mode 100644 index 000000000000..d9831b06a9d1 --- /dev/null +++ b/test_runner/regress/test_pageserver_getpage_throttle.py @@ -0,0 +1,113 @@ +import json +import uuid + +from anyio import Path +from fixtures.log_helper import log +from fixtures.neon_fixtures import NeonEnvBuilder, PgBin +from fixtures.pg_version import PgVersion +from fixtures.types import TenantId, TimelineId +from fixtures.utils import wait_until + + +def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): + env = neon_env_builder.init_start() + + env.pageserver.tenant_detach(env.initial_tenant) + + env.pageserver.allowed_errors.append( + # https://github.com/neondatabase/neon/issues/6925 + r".*query handler for.*pagestream.*failed: unexpected message: CopyFail during COPY.*" + ) + + tenant_id = TenantId.generate() + timeline_id = TimelineId.generate() + + rate_limit_rps = 100 + compaction_period = 5 + env.pageserver.tenant_create( + tenant_id, + conf={ + "compaction_period": f"{compaction_period}s", + "timeline_get_throttle": { + "task_kinds": ["PageRequestHandler"], + "initial": 0, + "refill_interval": "100ms", + "refill_amount": int(rate_limit_rps / 10), + "max": int(rate_limit_rps / 10), + "fair": True, + }, + }, + ) + + ps_http = env.pageserver.http_client() + + ps_http.timeline_create(PgVersion.V16, tenant_id, timeline_id) + + def run_pagebench_at_max_speed_and_get_total_requests_completed(duration_secs: int): + cmd = [ + str(env.neon_binpath / "pagebench"), + "get-page-latest-lsn", + "--mgmt-api-endpoint", + ps_http.base_url, + "--page-service-connstring", + env.pageserver.connstr(password=None), + "--runtime", + f"{duration_secs}s", + f"{tenant_id}/{timeline_id}", + ] + + basepath = pg_bin.run_capture(cmd, with_command_header=False) + results_path = Path(basepath + ".stdout") + log.info(f"Benchmark results at: {results_path}") + + with open(results_path, "r") as f: + results = json.load(f) + log.info(f"Results:\n{json.dumps(results, sort_keys=True, indent=2)}") + return int(results["total"]["request_count"]) + + log.info("warmup / make sure metrics are present") + run_pagebench_at_max_speed_and_get_total_requests_completed(2) + metrics_query = { + "tenant_id": str(tenant_id), + "timeline_id": str(timeline_id), + "smgr_query_type": "get_page_at_lsn", + } + metric_name = "pageserver_smgr_query_seconds_sum" + smgr_query_seconds_pre = ps_http.get_metric_value(metric_name, metrics_query) + assert smgr_query_seconds_pre is not None + + marker = uuid.uuid4().hex + ps_http.post_tracing_event("info", marker) + + log.info("run pagebench") + duration_secs = 10 + actual_ncompleted = run_pagebench_at_max_speed_and_get_total_requests_completed(duration_secs) + + log.info("validate the client is capped at the configured rps limit") + expect_ncompleted = duration_secs * rate_limit_rps + delta_abs = abs(expect_ncompleted - actual_ncompleted) + threshold = 0.05 * expect_ncompleted + assert ( + threshold / rate_limit_rps < 0.1 * duration_secs + ), "test self-test: unrealistic expectations regarding precision in this test" + assert ( + delta_abs < 0.05 * expect_ncompleted + ), "the throttling deviates more than 5 percent from the expectation" + + log.info("validate that we logged the throttling") + + def throttling_log_message_present(): + assert env.pageserver.log_contains( + f".*{tenant_id}.*shard was throttled in the last n_seconds.*", since_pattern=marker
) + + wait_until(10, compaction_period / 10, throttling_log_message_present) + + log.info("validate that the metric doesn't include throttle wait time") + smgr_query_seconds_post = ps_http.get_metric_value(metric_name, metrics_query) + assert smgr_query_seconds_post is not None + actual_smgr_query_seconds = smgr_query_seconds_post - smgr_query_seconds_pre + + assert ( + duration_secs >= 10 * actual_smgr_query_seconds + ), "smgr metrics should not include throttle wait time" From 0440214b8eb6695b62a3f16a8873748139c87e51 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 28 Feb 2024 18:48:32 +0000 Subject: [PATCH 04/21] RequestContext shouldn't be `Clone`, only RequestContextAdaptor uses it --- pageserver/src/tenant/timeline/compaction.rs | 31 +++++++------------- 1 file changed, 10 insertions(+), 21 deletions(-) diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 950459cbf982..143ec0583c1b 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -75,14 +75,13 @@ impl Timeline { let keyspace = self.collect_keyspace(end_lsn, ctx).await?; let mut adaptor = TimelineAdaptor::new(self, (end_lsn, keyspace)); - let ctx_adaptor = RequestContextAdaptor(ctx.clone()); pageserver_compaction::compact_tiered::compact_tiered( &mut adaptor, end_lsn, target_file_size, fanout, - &ctx_adaptor, + &ctx, ) .await?; @@ -143,13 +142,13 @@ impl CompactionJobExecutor for TimelineAdaptor { type DeltaLayer = ResidentDeltaLayer; type ImageLayer = ResidentImageLayer; - type RequestContext = RequestContextAdaptor; + type RequestContext = crate::context::RequestContext; async fn get_layers( &mut self, key_range: &Range, lsn_range: &Range, - _ctx: &RequestContextAdaptor, + _ctx: &RequestContext, ) -> anyhow::Result>> { self.flush_updates().await?; @@ -170,7 +169,7 @@ impl CompactionJobExecutor for TimelineAdaptor { &mut self, key_range: &Range, lsn: Lsn, - _ctx: &RequestContextAdaptor, + _ctx: &RequestContext, ) -> anyhow::Result>> { if lsn == self.keyspace.0 { Ok(pageserver_compaction::helpers::intersect_keyspace( @@ -206,7 +205,7 @@ impl CompactionJobExecutor for TimelineAdaptor { &mut self, lsn: Lsn, key_range: &Range, - ctx: &RequestContextAdaptor, + ctx: &RequestContext, ) -> anyhow::Result<()> { Ok(self.create_image_impl(lsn, key_range, ctx).await?) 
} @@ -216,7 +215,7 @@ impl CompactionJobExecutor for TimelineAdaptor { lsn_range: &Range, key_range: &Range, input_layers: &[ResidentDeltaLayer], - ctx: &RequestContextAdaptor, + ctx: &RequestContext, ) -> anyhow::Result<()> { debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end); @@ -287,7 +286,7 @@ impl CompactionJobExecutor for TimelineAdaptor { async fn delete_layer( &mut self, layer: &OwnArc, - _ctx: &RequestContextAdaptor, + _ctx: &RequestContext, ) -> anyhow::Result<()> { self.layers_to_delete.push(layer.clone().0); Ok(()) @@ -299,7 +298,7 @@ impl TimelineAdaptor { &mut self, lsn: Lsn, key_range: &Range, - ctx: &RequestContextAdaptor, + ctx: &RequestContext, ) -> Result<(), PageReconstructError> { let timer = self.timeline.metrics.create_images_time_histo.start_timer(); @@ -361,17 +360,7 @@ impl TimelineAdaptor { } } -pub struct RequestContextAdaptor(pub RequestContext); - -impl std::ops::Deref for RequestContextAdaptor { - type Target = RequestContext; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl CompactionRequestContext for RequestContextAdaptor {} +impl CompactionRequestContext for crate::context::RequestContext {} #[derive(Debug, Clone)] pub struct OwnArc(pub Arc); @@ -451,7 +440,7 @@ impl CompactionDeltaLayer for ResidentDeltaLayer { async fn load_keys<'a>( &self, - ctx: &RequestContextAdaptor, + ctx: &RequestContext, ) -> anyhow::Result>> { self.0.load_keys(ctx).await } From 1e6b0c0b5853bdf9378623d98dd0200c62f9fda7 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 28 Feb 2024 19:09:10 +0000 Subject: [PATCH 05/21] WIP impl: draft for getpage, test passes, other smgr services aren't covered, measure overhead first --- pageserver/src/context.rs | 7 +- pageserver/src/context/optional_counter.rs | 101 +++++++++++++++++++++ pageserver/src/metrics.rs | 4 + pageserver/src/page_service.rs | 42 ++++++++- pageserver/src/tenant/throttle.rs | 17 +++- 5 files changed, 166 insertions(+), 5 deletions(-) create mode 100644 pageserver/src/context/optional_counter.rs diff --git a/pageserver/src/context.rs b/pageserver/src/context.rs index ee331ea154dd..86d0390c30b1 100644 --- a/pageserver/src/context.rs +++ b/pageserver/src/context.rs @@ -88,13 +88,16 @@ use crate::task_mgr::TaskKind; +pub(crate) mod optional_counter; + // The main structure of this module, see module-level comment. -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct RequestContext { task_kind: TaskKind, download_behavior: DownloadBehavior, access_stats_behavior: AccessStatsBehavior, page_content_kind: PageContentKind, + pub micros_spent_throttled: optional_counter::MicroSecondsCounterU32, } /// The kind of access to the page cache. 
@@ -150,6 +153,7 @@ impl RequestContextBuilder { download_behavior: DownloadBehavior::Download, access_stats_behavior: AccessStatsBehavior::Update, page_content_kind: PageContentKind::Unknown, + micros_spent_throttled: Default::default(), }, } } @@ -163,6 +167,7 @@ impl RequestContextBuilder { download_behavior: original.download_behavior, access_stats_behavior: original.access_stats_behavior, page_content_kind: original.page_content_kind, + micros_spent_throttled: Default::default(), }, } } diff --git a/pageserver/src/context/optional_counter.rs b/pageserver/src/context/optional_counter.rs new file mode 100644 index 000000000000..100c649f18cb --- /dev/null +++ b/pageserver/src/context/optional_counter.rs @@ -0,0 +1,101 @@ +use std::{ + sync::atomic::{AtomicU32, Ordering}, + time::Duration, +}; + +#[derive(Debug)] +pub struct CounterU32 { + inner: AtomicU32, +} +impl Default for CounterU32 { + fn default() -> Self { + Self { + inner: AtomicU32::new(u32::MAX), + } + } +} +impl CounterU32 { + pub fn open(&self) -> Result<(), &'static str> { + match self + .inner + .compare_exchange(u32::MAX, 0, Ordering::Relaxed, Ordering::Relaxed) + { + Ok(_) => Ok(()), + Err(_) => Err("open() called on closed state"), + } + } + pub fn close(&self) -> Result<u32, &'static str> { + match self.inner.swap(u32::MAX, Ordering::Relaxed) { + u32::MAX => Err("close() called on closed state"), + x => Ok(x), + } + } + + pub fn add(&self, count: u32) -> Result<(), &'static str> { + if count == 0 { + return Ok(()); + } + let mut had_err = None; + self.inner + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| match cur { + u32::MAX => { + had_err = Some("add() called on closed state"); + None + } + x => { + let (new, overflowed) = x.overflowing_add(count); + if new == u32::MAX || overflowed { + had_err = Some("add() overflowed the counter"); + None + } else { + Some(new) + } + } + }) + .map_err(|_| had_err.expect("we set it whenever the function returns None")) + .map(|_| ()) + } +} + +#[derive(Default, Debug)] +pub struct MicroSecondsCounterU32 { + inner: CounterU32, +} + +impl MicroSecondsCounterU32 { + pub fn open(&self) -> Result<(), &'static str> { + self.inner.open() + } + pub fn add(&self, duration: Duration) -> Result<(), &'static str> { + match duration.as_micros().try_into() { + Ok(x) => self.inner.add(x), + Err(_) => Err("add(): duration conversion error"), + } + } + pub fn close_and_checked_sub_from(&self, from: Duration) -> Result<Duration, &'static str> { + let val = self.inner.close()?; + let val = Duration::from_micros(val as u64); + let subbed = match from.checked_sub(val) { + Some(v) => v, + None => return Err("Duration::checked_sub"), + }; + Ok(subbed) + } +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + fn test_basic() { + let counter = MicroSecondsCounterU32::default(); + counter.open().unwrap(); + counter.add(Duration::from_micros(23)).unwrap(); + let res = counter + .close_and_checked_sub_from(Duration::from_micros(42)) + .unwrap(); + assert_eq!(res, Duration::from_micros(42 - 23)); + } +} diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 1749e02c7f92..ce9d4e05d960 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1137,6 +1137,10 @@ impl SmgrQueryTimePerTimeline { start: std::time::Instant::now(), } } + pub(crate) fn observe(&self, op: SmgrQueryType, duration: Duration) { + let metric = &self.metrics[op as usize]; + metric.observe(duration.as_secs_f64()) + } } #[cfg(test)] diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index
11eb512750e1..18a004e0614e 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -40,6 +40,7 @@ use std::str; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use std::time::Instant; use tokio::io::AsyncWriteExt; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::io::StreamReader; @@ -1143,9 +1144,44 @@ impl PageServerHandler { // load_timeline_for_page sets shard_id, but get_cached_timeline_for_page doesn't set_tracing_field_shard_id(timeline); - let _timer = timeline - .query_metrics - .start_timer(metrics::SmgrQueryType::GetPageAtLsn); + let start = Instant::now(); + match ctx.micros_spent_throttled.open() { + Ok(()) => (), + Err(error) => { + use std::sync::atomic::{AtomicBool, Ordering}; + static LOGGED: AtomicBool = AtomicBool::new(false); + match LOGGED.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) { + Ok(_) => { + warn!(error, "error opening micros_spent_throttled, this message is only logged once per process lifetime"); + } + Err(_) => {} + } + } + } + scopeguard::defer!({ + let elapsed = start.elapsed(); + let ex_throttled = ctx + .micros_spent_throttled + .close_and_checked_sub_from(elapsed); + let ex_throttled = match ex_throttled { + Ok(res) => res, + Err(error) => { + use std::sync::atomic::{AtomicBool, Ordering}; + static LOGGED: AtomicBool = AtomicBool::new(false); + match LOGGED.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) + { + Ok(_) => { + warn!(error, "error deducting time spent throttled, this message is only logged once per process lifetime"); + } + Err(_) => {} + } + elapsed + } + }; + timeline + .query_metrics + .observe(metrics::SmgrQueryType::GetPageAtLsn, ex_throttled); + }); let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = diff --git a/pageserver/src/tenant/throttle.rs b/pageserver/src/tenant/throttle.rs index 6894a88b930d..9265b16eb600 100644 --- a/pageserver/src/tenant/throttle.rs +++ b/pageserver/src/tenant/throttle.rs @@ -9,7 +9,7 @@ use std::{ use arc_swap::ArcSwap; use enumset::EnumSet; -use tracing::error; +use tracing::{error, warn}; use crate::{context::RequestContext, task_mgr::TaskKind}; @@ -157,6 +157,21 @@ where .fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed); let observation = Observation { wait_time }; self.metric.observe_throttling(&observation); + // TODO: really, callers should do this + match ctx.micros_spent_throttled.add(wait_time) { + Ok(res) => res, + Err(error) => { + use std::sync::atomic::{AtomicBool, Ordering}; + static LOGGED: AtomicBool = AtomicBool::new(false); + match LOGGED.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) + { + Ok(_) => { + warn!(error, "error adding time spent throttled, this message is only logged once per process lifetime"); + } + Err(_) => {} + } + } + }; } } } From 48578e901d6c90a5139e1052ddfcac45d109ae58 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 29 Feb 2024 11:38:46 +0000 Subject: [PATCH 06/21] WIP impl: generalize it by moving it to the throttling metric --- pageserver/src/metrics.rs | 110 +++++++++++++++++-- pageserver/src/page_service.rs | 50 ++------- pageserver/src/tenant/throttle.rs | 11 +- pageserver/src/tenant/timeline/compaction.rs | 7 +- 4 files changed, 112 insertions(+), 66 deletions(-) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index ce9d4e05d960..8fa870329167 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -11,6 +11,7 @@ use once_cell::sync::Lazy; use 
pageserver_api::shard::TenantShardId; use strum::{EnumCount, IntoEnumIterator, VariantNames}; use strum_macros::{EnumVariantNames, IntoStaticStr}; +use tracing::warn; use utils::id::TimelineId; /// Prometheus histogram buckets (in seconds) for operations in the critical @@ -1005,28 +1006,44 @@ impl GlobalAndPerTimelineHistogram { } } -struct GlobalAndPerTimelineHistogramTimer<'a> { +struct GlobalAndPerTimelineHistogramTimer<'a, 'c> { h: &'a GlobalAndPerTimelineHistogram, + ctx: &'c RequestContext, start: std::time::Instant, + op: SmgrQueryType, } -impl<'a> Drop for GlobalAndPerTimelineHistogramTimer<'a> { +impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> { fn drop(&mut self) { let elapsed = self.start.elapsed(); - self.h.observe(elapsed.as_secs_f64()); + let ex_throttled = self + .ctx + .micros_spent_throttled + .close_and_checked_sub_from(elapsed); + let ex_throttled = match ex_throttled { + Ok(res) => res, + Err(error) => { + static LOGGED: AtomicSmgrQueryTypeSet = AtomicSmgrQueryTypeSet::new(); + if LOGGED.test_and_set(self.op) { + warn!(op=?self.op, error, "error deducting time spent throttled, this message is only logged once per process lifetime"); + } + elapsed + } + }; + self.h.observe(ex_throttled.as_secs_f64()); } } #[derive( Debug, - Clone, - Copy, IntoStaticStr, strum_macros::EnumCount, strum_macros::EnumIter, strum_macros::FromRepr, + enumset::EnumSetType, )] #[strum(serialize_all = "snake_case")] +#[enumset(repr = "u8")] pub enum SmgrQueryType { GetRelExists, GetRelSize, @@ -1035,6 +1052,56 @@ pub enum SmgrQueryType { GetSlruSegment, } +struct AtomicSmgrQueryTypeSet { + value: std::sync::atomic::AtomicU8, +} + +impl AtomicSmgrQueryTypeSet { + pub const fn new() -> Self { + AtomicSmgrQueryTypeSet { + value: AtomicU8::new( + // would love to + // ``` + // enumset::EnumSet::::EMPTY.as_repr() + // ``` + // But it's not const, see https://github.com/Lymia/enumset/issues/27 + 0, + ), + } + } + pub fn test_and_set(&self, v: SmgrQueryType) -> bool { + use std::sync::atomic::Ordering; + let res = self + .value + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| { + let cur = enumset::EnumSet::::from_repr(cur); + if cur.contains(v) { + None + } else { + Some((cur | v).as_repr()) + } + }); + res.ok().is_some() + } +} + +#[cfg(test)] +mod test_atomic_smgr_query_type_set { + use super::*; + #[test] + fn test_basic() { + let set = AtomicSmgrQueryTypeSet::new(); + // first set returns true + assert!(set.test_and_set(SmgrQueryType::GetPageAtLsn)); + // try it again, should report false + assert!(!set.test_and_set(SmgrQueryType::GetPageAtLsn)); + // set something that's not set before + assert!(set.test_and_set(SmgrQueryType::GetDbSize)); + // flags are independent + assert!(!set.test_and_set(SmgrQueryType::GetPageAtLsn)); + } +} + #[derive(Debug)] pub(crate) struct SmgrQueryTimePerTimeline { metrics: [GlobalAndPerTimelineHistogram; SmgrQueryType::COUNT], @@ -1130,17 +1197,29 @@ impl SmgrQueryTimePerTimeline { }); Self { metrics } } - pub(crate) fn start_timer(&self, op: SmgrQueryType) -> impl Drop + '_ { + pub(crate) fn start_timer<'c: 'a, 'a>( + &'a self, + op: SmgrQueryType, + ctx: &'c RequestContext, + ) -> impl Drop + '_ { let metric = &self.metrics[op as usize]; + let start = Instant::now(); + match ctx.micros_spent_throttled.open() { + Ok(()) => (), + Err(error) => { + static LOGGED: AtomicSmgrQueryTypeSet = AtomicSmgrQueryTypeSet::new(); + if LOGGED.test_and_set(op) { + warn!(?op, error, "error opening micros_spent_throttled, this message is only logged once 
per process lifetime"); + } + } + } GlobalAndPerTimelineHistogramTimer { h: metric, - start: std::time::Instant::now(), + ctx, + start, + op, } } - pub(crate) fn observe(&self, op: SmgrQueryType, duration: Duration) { - let metric = &self.metrics[op as usize]; - metric.observe(duration.as_secs_f64()) - } } #[cfg(test)] @@ -1149,6 +1228,11 @@ mod smgr_query_time_tests { use strum::IntoEnumIterator; use utils::id::{TenantId, TimelineId}; + use crate::{ + context::{DownloadBehavior, RequestContext}, + task_mgr::TaskKind, + }; + // Regression test, we used hard-coded string constants before using an enum. #[test] fn op_label_name() { @@ -1197,7 +1281,8 @@ mod smgr_query_time_tests { let (pre_global, pre_per_tenant_timeline) = get_counts(); assert_eq!(pre_per_tenant_timeline, 0); - let timer = metrics.start_timer(*op); + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download); + let timer = metrics.start_timer(*op, &ctx); drop(timer); let (post_global, post_per_tenant_timeline) = get_counts(); @@ -1985,6 +2070,7 @@ use futures::Future; use pin_project_lite::pin_project; use std::collections::HashMap; use std::pin::Pin; +use std::sync::atomic::AtomicU8; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 18a004e0614e..95efca58038d 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -40,7 +40,6 @@ use std::str; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; -use std::time::Instant; use tokio::io::AsyncWriteExt; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::io::StreamReader; @@ -912,7 +911,7 @@ impl PageServerHandler { let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?; let _timer = timeline .query_metrics - .start_timer(metrics::SmgrQueryType::GetRelExists); + .start_timer(metrics::SmgrQueryType::GetRelExists, ctx); let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = @@ -940,7 +939,7 @@ impl PageServerHandler { let _timer = timeline .query_metrics - .start_timer(metrics::SmgrQueryType::GetRelSize); + .start_timer(metrics::SmgrQueryType::GetRelSize, ctx); let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = @@ -968,7 +967,7 @@ impl PageServerHandler { let _timer = timeline .query_metrics - .start_timer(metrics::SmgrQueryType::GetDbSize); + .start_timer(metrics::SmgrQueryType::GetDbSize, ctx); let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = @@ -1144,44 +1143,9 @@ impl PageServerHandler { // load_timeline_for_page sets shard_id, but get_cached_timeline_for_page doesn't set_tracing_field_shard_id(timeline); - let start = Instant::now(); - match ctx.micros_spent_throttled.open() { - Ok(()) => (), - Err(error) => { - use std::sync::atomic::{AtomicBool, Ordering}; - static LOGGED: AtomicBool = AtomicBool::new(false); - match LOGGED.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) { - Ok(_) => { - warn!(error, "error opening micros_spent_throttled, this message is only logged once per process lifetime"); - } - Err(_) => {} - } - } - } - scopeguard::defer!({ - let elapsed = start.elapsed(); - let ex_throttled = ctx - .micros_spent_throttled - .close_and_checked_sub_from(elapsed); - let ex_throttled = match ex_throttled { - Ok(res) => res, - Err(error) => { - use std::sync::atomic::{AtomicBool, Ordering}; - static LOGGED: AtomicBool = AtomicBool::new(false); - match 
LOGGED.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) - { - Ok(_) => { - warn!(error, "error deducting time spent throttled, this message is only logged once per process lifetime"); - } - Err(_) => {} - } - elapsed - } - }; - timeline - .query_metrics - .observe(metrics::SmgrQueryType::GetPageAtLsn, ex_throttled); - }); + let _timer = timeline + .query_metrics + .start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx); let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = @@ -1209,7 +1173,7 @@ impl PageServerHandler { let _timer = timeline .query_metrics - .start_timer(metrics::SmgrQueryType::GetSlruSegment); + .start_timer(metrics::SmgrQueryType::GetSlruSegment, ctx); let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = diff --git a/pageserver/src/tenant/throttle.rs b/pageserver/src/tenant/throttle.rs index 9265b16eb600..953b3aa763c8 100644 --- a/pageserver/src/tenant/throttle.rs +++ b/pageserver/src/tenant/throttle.rs @@ -161,14 +161,13 @@ where match ctx.micros_spent_throttled.add(wait_time) { Ok(res) => res, Err(error) => { - use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::atomic::AtomicBool; static LOGGED: AtomicBool = AtomicBool::new(false); - match LOGGED.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) + if LOGGED + .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() { - Ok(_) => { - warn!(error, "error adding time spent throttled, this message is only logged once per process lifetime"); - } - Err(_) => {} + warn!(error, "error adding time spent throttled, this message is only logged once per process lifetime"); } } }; diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 143ec0583c1b..914e3948efa2 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -81,7 +81,7 @@ impl Timeline { end_lsn, target_file_size, fanout, - &ctx, + ctx, ) .await?; @@ -438,10 +438,7 @@ impl CompactionLayer for ResidentDeltaLayer { impl CompactionDeltaLayer for ResidentDeltaLayer { type DeltaEntry<'a> = DeltaEntry<'a>; - async fn load_keys<'a>( - &self, - ctx: &RequestContext, - ) -> anyhow::Result>> { + async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result>> { self.0.load_keys(ctx).await } } From bccf4594a907e199a236cd9e2d7b77530f4efba6 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 29 Feb 2024 12:05:34 +0000 Subject: [PATCH 07/21] WIP impl: switch to rate-limited but repeated logging --- pageserver/src/metrics.rs | 84 +++++++++---------------------- pageserver/src/tenant/throttle.rs | 21 ++++---- 2 files changed, 35 insertions(+), 70 deletions(-) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 8fa870329167..03bbdfce5036 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1023,10 +1023,18 @@ impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> { let ex_throttled = match ex_throttled { Ok(res) => res, Err(error) => { - static LOGGED: AtomicSmgrQueryTypeSet = AtomicSmgrQueryTypeSet::new(); - if LOGGED.test_and_set(self.op) { + use utils::rate_limit::RateLimit; + static LOGGED: Lazy>> = + Lazy::new(|| { + Mutex::new(enum_map::EnumMap::from_array(std::array::from_fn(|_| { + RateLimit::new(Duration::from_secs(10)) + }))) + }); + let mut guard = LOGGED.lock().unwrap(); + let rate_limit = &mut guard[self.op]; + rate_limit.call(|| { warn!(op=?self.op, error, "error deducting time spent 
throttled, this message is only logged once per process lifetime"); - } + }); elapsed } }; @@ -1036,14 +1044,15 @@ impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> { #[derive( Debug, + Clone, + Copy, IntoStaticStr, strum_macros::EnumCount, strum_macros::EnumIter, strum_macros::FromRepr, - enumset::EnumSetType, + enum_map::Enum, )] #[strum(serialize_all = "snake_case")] -#[enumset(repr = "u8")] pub enum SmgrQueryType { GetRelExists, GetRelSize, @@ -1052,56 +1061,6 @@ pub enum SmgrQueryType { GetSlruSegment, } -struct AtomicSmgrQueryTypeSet { - value: std::sync::atomic::AtomicU8, -} - -impl AtomicSmgrQueryTypeSet { - pub const fn new() -> Self { - AtomicSmgrQueryTypeSet { - value: AtomicU8::new( - // would love to - // ``` - // enumset::EnumSet::::EMPTY.as_repr() - // ``` - // But it's not const, see https://github.com/Lymia/enumset/issues/27 - 0, - ), - } - } - pub fn test_and_set(&self, v: SmgrQueryType) -> bool { - use std::sync::atomic::Ordering; - let res = self - .value - .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| { - let cur = enumset::EnumSet::::from_repr(cur); - if cur.contains(v) { - None - } else { - Some((cur | v).as_repr()) - } - }); - res.ok().is_some() - } -} - -#[cfg(test)] -mod test_atomic_smgr_query_type_set { - use super::*; - #[test] - fn test_basic() { - let set = AtomicSmgrQueryTypeSet::new(); - // first set returns true - assert!(set.test_and_set(SmgrQueryType::GetPageAtLsn)); - // try it again, should report false - assert!(!set.test_and_set(SmgrQueryType::GetPageAtLsn)); - // set something that's not set before - assert!(set.test_and_set(SmgrQueryType::GetDbSize)); - // flags are independent - assert!(!set.test_and_set(SmgrQueryType::GetPageAtLsn)); - } -} - #[derive(Debug)] pub(crate) struct SmgrQueryTimePerTimeline { metrics: [GlobalAndPerTimelineHistogram; SmgrQueryType::COUNT], @@ -1207,10 +1166,18 @@ impl SmgrQueryTimePerTimeline { match ctx.micros_spent_throttled.open() { Ok(()) => (), Err(error) => { - static LOGGED: AtomicSmgrQueryTypeSet = AtomicSmgrQueryTypeSet::new(); - if LOGGED.test_and_set(op) { + use utils::rate_limit::RateLimit; + static LOGGED: Lazy>> = + Lazy::new(|| { + Mutex::new(enum_map::EnumMap::from_array(std::array::from_fn(|_| { + RateLimit::new(Duration::from_secs(10)) + }))) + }); + let mut guard = LOGGED.lock().unwrap(); + let rate_limit = &mut guard[op]; + rate_limit.call(|| { warn!(?op, error, "error opening micros_spent_throttled, this message is only logged once per process lifetime"); - } + }); } } GlobalAndPerTimelineHistogramTimer { @@ -2070,7 +2037,6 @@ use futures::Future; use pin_project_lite::pin_project; use std::collections::HashMap; use std::pin::Pin; -use std::sync::atomic::AtomicU8; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; diff --git a/pageserver/src/tenant/throttle.rs b/pageserver/src/tenant/throttle.rs index 953b3aa763c8..0b39bcd20a01 100644 --- a/pageserver/src/tenant/throttle.rs +++ b/pageserver/src/tenant/throttle.rs @@ -2,7 +2,7 @@ use std::{ str::FromStr, sync::{ atomic::{AtomicU64, Ordering}, - Arc, + Arc, Mutex, }, time::{Duration, Instant}, }; @@ -157,20 +157,19 @@ where .fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed); let observation = Observation { wait_time }; self.metric.observe_throttling(&observation); - // TODO: really, callers should do this match ctx.micros_spent_throttled.add(wait_time) { Ok(res) => res, Err(error) => { - use std::sync::atomic::AtomicBool; - static LOGGED: AtomicBool = 
AtomicBool::new(false); - if LOGGED - .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) - .is_ok() - { - warn!(error, "error adding time spent throttled, this message is only logged once per process lifetime"); - } + use once_cell::sync::Lazy; + use utils::rate_limit::RateLimit; + static WARN_RATE_LIMIT: Lazy> = + Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10)))); + let mut guard = WARN_RATE_LIMIT.lock().unwrap(); + guard.call(move || { + warn!(error, "error adding time spent throttled; this message is rate-limited globally"); + }); } - }; + } } } } From 9671ac37498138685b3092b0c28647e0f321538b Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 29 Feb 2024 13:18:15 +0000 Subject: [PATCH 08/21] make mypy happy --- test_runner/fixtures/neon_fixtures.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 836293880575..91045645b7ea 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2344,7 +2344,7 @@ def assert_no_metric_errors(self): assert value == 0, f"Nonzero {metric} == {value}" def log_contains( - self, pattern: str, skip_until: "None | str | Callable[[str], bool]" + self, pattern: str, skip_until: "None | str | Callable[[str], bool]" = None ) -> Optional[str]: """ Check that the pageserver log contains a line that matches the given regex. From 7afbcd1003e4c49e794045f5f3a45480256a7e3a Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 29 Feb 2024 16:40:40 +0000 Subject: [PATCH 09/21] audit log_contains users for truthiness pitfall --- .../regress/test_attach_tenant_config.py | 4 ++- .../regress/test_disk_usage_eviction.py | 17 ++++++----- test_runner/regress/test_duplicate_layers.py | 2 +- test_runner/regress/test_layer_eviction.py | 2 +- .../regress/test_layers_from_future.py | 5 +++- test_runner/regress/test_logging.py | 2 +- .../test_pageserver_getpage_throttle.py | 16 +++++++++-- test_runner/regress/test_remote_storage.py | 19 +++++++++---- test_runner/regress/test_tenant_delete.py | 28 ++++++++++++------- test_runner/regress/test_tenant_detach.py | 12 +++++--- test_runner/regress/test_tenant_relocation.py | 10 +++++-- .../test_tenants_with_remote_storage.py | 12 +++++--- .../regress/test_threshold_based_eviction.py | 4 +-- test_runner/regress/test_timeline_delete.py | 20 ++++++++----- 14 files changed, 104 insertions(+), 49 deletions(-) diff --git a/test_runner/regress/test_attach_tenant_config.py b/test_runner/regress/test_attach_tenant_config.py index 6cae66384249..f93e2b177a4f 100644 --- a/test_runner/regress/test_attach_tenant_config.py +++ b/test_runner/regress/test_attach_tenant_config.py @@ -64,7 +64,9 @@ def negative_env(neon_env_builder: NeonEnvBuilder) -> Generator[NegativeTests, N ) def log_contains_bad_request(): - env.pageserver.log_contains(".*Error processing HTTP request: Bad request") + assert ( + env.pageserver.log_contains(".*Error processing HTTP request: Bad request") is not None + ) wait_until(50, 0.1, log_contains_bad_request) diff --git a/test_runner/regress/test_disk_usage_eviction.py b/test_runner/regress/test_disk_usage_eviction.py index eb4e370ea796..3c409c3d9dea 100644 --- a/test_runner/regress/test_disk_usage_eviction.py +++ b/test_runner/regress/test_disk_usage_eviction.py @@ -200,7 +200,7 @@ def pageserver_start_with_disk_usage_eviction( tenant_ps.http_client().timeline_wait_logical_size(tenant_id, timeline_id) def statvfs_called(): - assert 
pageserver.log_contains(".*running mocked statvfs.*") + assert pageserver.log_contains(".*running mocked statvfs.*") is not None # we most likely have already completed multiple runs wait_until(10, 1, statvfs_called) @@ -482,8 +482,11 @@ def test_pageserver_respects_overridden_resident_size( log.info(f"{response}") time.sleep(1) # give log time to flush - assert not env.neon_env.pageserver.log_contains( - GLOBAL_LRU_LOG_LINE, + assert ( + env.neon_env.pageserver.log_contains( + GLOBAL_LRU_LOG_LINE, + ) + is not None ), "this test is pointless if it fell back to global LRU" (later_total_on_disk, _, _) = env.timelines_du(env.pageserver) @@ -533,7 +536,7 @@ def test_pageserver_falls_back_to_global_lru(eviction_env: EvictionEnv, order: E assert actual_change >= target, "eviction must always evict more than target" time.sleep(1) # give log time to flush - assert env.neon_env.pageserver.log_contains(GLOBAL_LRU_LOG_LINE) + assert env.neon_env.pageserver.log_contains(GLOBAL_LRU_LOG_LINE) is not None env.neon_env.pageserver.allowed_errors.append(".*" + GLOBAL_LRU_LOG_LINE) @@ -767,7 +770,7 @@ def test_statvfs_error_handling(eviction_env: EvictionEnv): eviction_order=EvictionOrder.ABSOLUTE_ORDER, ) - assert env.neon_env.pageserver.log_contains(".*statvfs failed.*EIO") + assert env.neon_env.pageserver.log_contains(".*statvfs failed.*EIO") is not None env.neon_env.pageserver.allowed_errors.append(".*statvfs failed.*EIO") @@ -802,7 +805,7 @@ def test_statvfs_pressure_usage(eviction_env: EvictionEnv): ) def relieved_log_message(): - assert env.neon_env.pageserver.log_contains(".*disk usage pressure relieved") + assert env.neon_env.pageserver.log_contains(".*disk usage pressure relieved") is not None wait_until(10, 1, relieved_log_message) @@ -846,7 +849,7 @@ def test_statvfs_pressure_min_avail_bytes(eviction_env: EvictionEnv): ) def relieved_log_message(): - assert env.neon_env.pageserver.log_contains(".*disk usage pressure relieved") + assert env.neon_env.pageserver.log_contains(".*disk usage pressure relieved") is not None wait_until(10, 1, relieved_log_message) diff --git a/test_runner/regress/test_duplicate_layers.py b/test_runner/regress/test_duplicate_layers.py index 224e6f50c725..3bd7bbbe5226 100644 --- a/test_runner/regress/test_duplicate_layers.py +++ b/test_runner/regress/test_duplicate_layers.py @@ -36,7 +36,7 @@ def test_duplicate_layers(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): pg_bin.run_capture(["pgbench", "-i", "-s1", connstr]) time.sleep(10) # let compaction to be performed - assert env.pageserver.log_contains("compact-level0-phase1-return-same") + assert env.pageserver.log_contains("compact-level0-phase1-return-same") is not None def test_actually_duplicated_l1(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): diff --git a/test_runner/regress/test_layer_eviction.py b/test_runner/regress/test_layer_eviction.py index efba2033fb92..c274cb8ca86d 100644 --- a/test_runner/regress/test_layer_eviction.py +++ b/test_runner/regress/test_layer_eviction.py @@ -284,7 +284,7 @@ def ensure_resident_and_remote_size_metrics(): time.sleep(2) # let pitr_interval + 1 second pass ps_http.timeline_gc(tenant_id, timeline_id, 0) time.sleep(1) - assert not env.pageserver.log_contains("Nothing to GC") + assert env.pageserver.log_contains("Nothing to GC") is not None log.info("ensure GC deleted some layers, otherwise this test is pointless") post_gc_info = ps_http.layer_map_info(tenant_id, timeline_id) diff --git a/test_runner/regress/test_layers_from_future.py 
b/test_runner/regress/test_layers_from_future.py index 999e077e454b..82eb886ba72c 100644 --- a/test_runner/regress/test_layers_from_future.py +++ b/test_runner/regress/test_layers_from_future.py @@ -185,7 +185,10 @@ def future_layer_is_gone_from_index_part(): # NB: the layer file is unlinked index part now, but, because we made the delete # operation stuck, the layer file itself is still in the remote_storage def delete_at_pause_point(): - assert env.pageserver.log_contains(f".*{tenant_id}.*at failpoint.*{failpoint_name}") + assert ( + env.pageserver.log_contains(f".*{tenant_id}.*at failpoint.*{failpoint_name}") + is not None + ) wait_until(10, 0.5, delete_at_pause_point) future_layer_path = env.pageserver_remote_storage.remote_layer_path( diff --git a/test_runner/regress/test_logging.py b/test_runner/regress/test_logging.py index d62b5e531c2e..ebc7c2293377 100644 --- a/test_runner/regress/test_logging.py +++ b/test_runner/regress/test_logging.py @@ -34,7 +34,7 @@ def test_logging_event_count(neon_env_builder: NeonEnvBuilder, level: str): def assert_logged(): if not log_expected: return - assert env.pageserver.log_contains(f".*{msg_id}.*") + assert env.pageserver.log_contains(f".*{msg_id}.*") is not None wait_until(10, 0.5, assert_logged) diff --git a/test_runner/regress/test_pageserver_getpage_throttle.py b/test_runner/regress/test_pageserver_getpage_throttle.py index d9831b06a9d1..e5b0a4f32280 100644 --- a/test_runner/regress/test_pageserver_getpage_throttle.py +++ b/test_runner/regress/test_pageserver_getpage_throttle.py @@ -79,6 +79,14 @@ def run_pagebench_at_max_speed_and_get_total_requests_completed(duration_secs: i marker = uuid.uuid4().hex ps_http.post_tracing_event("info", marker) + def marker_arrived_in_log(): + res = env.pageserver.log_contains(marker, offset=None) + assert res is not None + _, offset = res + return offset + + marker_offset = wait_until(10, 0.5, marker_arrived_in_log) + log.info("run pagebench") duration_secs = 10 actual_ncompleted = run_pagebench_at_max_speed_and_get_total_requests_completed(duration_secs) @@ -97,8 +105,12 @@ def run_pagebench_at_max_speed_and_get_total_requests_completed(duration_secs: i log.info("validate that we logged the throttling") def throttling_log_message_present(): - assert env.pageserver.log_contains( - f".*{tenant_id}.*shard was throttled in the last n_seconds.*", since_pattern=marker + assert ( + env.pageserver.log_contains( + f".*{tenant_id}.*shard was throttled in the last n_seconds.*", + offset=marker_offset, + ) + is not None ) wait_until(10, compaction_period / 10, throttling_log_message_present) diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index 73ebe0a76fa3..9dd08b6fa1ff 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -120,11 +120,15 @@ def test_remote_storage_backup_and_restore( log.info(f"upload of checkpoint {checkpoint_number} is done") # Check that we had to retry the uploads - assert env.pageserver.log_contains( - ".*failed to perform remote task UploadLayer.*, will retry.*" + assert ( + env.pageserver.log_contains(".*failed to perform remote task UploadLayer.*, will retry.*") + is not None ) - assert env.pageserver.log_contains( - ".*failed to perform remote task UploadMetadata.*, will retry.*" + assert ( + env.pageserver.log_contains( + ".*failed to perform remote task UploadMetadata.*, will retry.*" + ) + is not None ) ##### Stop the first pageserver instance, erase all its data @@ -873,8 +877,11 @@ 
def until_layer_deletes_completed(): ), "l0 should now be removed because of L0 => L1 compaction and completed uploads" # We should not have hit the error handling path in uploads where a uploaded file is gone - assert not env.pageserver.log_contains( - "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more." + assert ( + not env.pageserver.log_contains( + "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more." + ) + is not None ) diff --git a/test_runner/regress/test_tenant_delete.py b/test_runner/regress/test_tenant_delete.py index 8c7d332e1daf..c763bd3147d7 100644 --- a/test_runner/regress/test_tenant_delete.py +++ b/test_runner/regress/test_tenant_delete.py @@ -505,10 +505,10 @@ def delete_tenant(): return ps_http.tenant_delete(tenant_id) def hit_remove_failpoint(): - assert env.pageserver.log_contains(f"at failpoint {BEFORE_REMOVE_FAILPOINT}") + assert env.pageserver.log_contains(f"at failpoint {BEFORE_REMOVE_FAILPOINT}") is not None def hit_run_failpoint(): - assert env.pageserver.log_contains(f"at failpoint {BEFORE_RUN_FAILPOINT}") + assert env.pageserver.log_contains(f"at failpoint {BEFORE_RUN_FAILPOINT}") is not None with concurrent.futures.ThreadPoolExecutor() as executor: background_200_req = executor.submit(delete_tenant) @@ -612,13 +612,17 @@ def timeline_create(): Thread(target=timeline_create).start() def hit_initdb_upload_failpoint(): - assert env.pageserver.log_contains(f"at failpoint {BEFORE_INITDB_UPLOAD_FAILPOINT}") + assert ( + env.pageserver.log_contains(f"at failpoint {BEFORE_INITDB_UPLOAD_FAILPOINT}") + is not None + ) wait_until(100, 0.1, hit_initdb_upload_failpoint) def creation_connection_timed_out(): - assert env.pageserver.log_contains( - "POST.*/timeline.* request was dropped before completing" + assert ( + env.pageserver.log_contains("POST.*/timeline.* request was dropped before completing") + is not None ) # Wait so that we hit the timeout and the connection is dropped @@ -636,8 +640,9 @@ def tenant_delete_inner(): Thread(target=tenant_delete).start() def deletion_arrived(): - assert env.pageserver.log_contains( - f"cfg failpoint: {DELETE_BEFORE_CLEANUP_FAILPOINT} pause" + assert ( + env.pageserver.log_contains(f"cfg failpoint: {DELETE_BEFORE_CLEANUP_FAILPOINT} pause") + is not None ) wait_until(100, 0.1, deletion_arrived) @@ -663,9 +668,12 @@ def deletion_arrived(): ) # Ensure that creation cancelled and deletion didn't end up in broken state or encountered the leftover temp file - assert env.pageserver.log_contains(CANCELLED_ERROR) - assert not env.pageserver.log_contains( - ".*ERROR.*delete_tenant.*Timelines directory is not empty after all timelines deletion" + assert env.pageserver.log_contains(CANCELLED_ERROR) is not None + assert ( + not env.pageserver.log_contains( + ".*ERROR.*delete_tenant.*Timelines directory is not empty after all timelines deletion" + ) + is not None ) # Zero tenants remain (we deleted the default tenant) diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py index 4752699abb49..e9e8d38a6ed3 100644 --- a/test_runner/regress/test_tenant_detach.py +++ b/test_runner/regress/test_tenant_detach.py @@ -92,11 +92,15 @@ def test_tenant_reattach(neon_env_builder: NeonEnvBuilder, mode: str): wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn) # Check that we had to retry the uploads - assert env.pageserver.log_contains( - ".*failed to perform remote task UploadLayer.*, will retry.*" 
+ assert ( + env.pageserver.log_contains(".*failed to perform remote task UploadLayer.*, will retry.*") + is not None ) - assert env.pageserver.log_contains( - ".*failed to perform remote task UploadMetadata.*, will retry.*" + assert ( + env.pageserver.log_contains( + ".*failed to perform remote task UploadMetadata.*, will retry.*" + ) + is not None ) ps_metrics = pageserver_http.get_metrics() diff --git a/test_runner/regress/test_tenant_relocation.py b/test_runner/regress/test_tenant_relocation.py index b70131472a1c..64bfa0bd9be7 100644 --- a/test_runner/regress/test_tenant_relocation.py +++ b/test_runner/regress/test_tenant_relocation.py @@ -495,7 +495,10 @@ def test_emergency_relocate_with_branches_slow_replay( assert cur.fetchall() == [("before pause",), ("after pause",)] # Sanity check that the failpoint was reached - assert env.pageserver.log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done') + assert ( + env.pageserver.log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done') + is not None + ) assert time.time() - before_attach_time > 5 # Clean up @@ -632,7 +635,10 @@ def test_emergency_relocate_with_branches_createdb( assert query_scalar(cur, "SELECT count(*) FROM test_migrate_one") == 200 # Sanity check that the failpoint was reached - assert env.pageserver.log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done') + assert ( + env.pageserver.log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done') + is not None + ) assert time.time() - before_attach_time > 5 # Clean up diff --git a/test_runner/regress/test_tenants_with_remote_storage.py b/test_runner/regress/test_tenants_with_remote_storage.py index 1c693a0df5ba..b4959bc89fac 100644 --- a/test_runner/regress/test_tenants_with_remote_storage.py +++ b/test_runner/regress/test_tenants_with_remote_storage.py @@ -147,11 +147,15 @@ def test_tenants_attached_after_download(neon_env_builder: NeonEnvBuilder): log.info(f"upload of checkpoint {checkpoint_number} is done") # Check that we had to retry the uploads - assert env.pageserver.log_contains( - ".*failed to perform remote task UploadLayer.*, will retry.*" + assert ( + env.pageserver.log_contains(".*failed to perform remote task UploadLayer.*, will retry.*") + is not None ) - assert env.pageserver.log_contains( - ".*failed to perform remote task UploadMetadata.*, will retry.*" + assert ( + env.pageserver.log_contains( + ".*failed to perform remote task UploadMetadata.*, will retry.*" + ) + is not None ) ##### Stop the pageserver, erase its layer file to force it being downloaded from S3 diff --git a/test_runner/regress/test_threshold_based_eviction.py b/test_runner/regress/test_threshold_based_eviction.py index 5f72cfd74713..7bf49a0874df 100644 --- a/test_runner/regress/test_threshold_based_eviction.py +++ b/test_runner/regress/test_threshold_based_eviction.py @@ -179,6 +179,6 @@ def __repr__(self) -> str: assert len(post.remote_layers) > 0, "some layers should be evicted once it's stabilized" assert len(post.local_layers) > 0, "the imitate accesses should keep some layers resident" - assert env.pageserver.log_contains( - metrics_refused_log_line + assert ( + env.pageserver.log_contains(metrics_refused_log_line) is not None ), "ensure the metrics collection worker ran" diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py index a6a6fb47ccd6..0e3753007583 100644 --- a/test_runner/regress/test_timeline_delete.py +++ b/test_runner/regress/test_timeline_delete.py @@ -531,8 
+531,11 @@ def first_call(result_queue): try: def first_call_hit_failpoint(): - assert env.pageserver.log_contains( - f".*{child_timeline_id}.*at failpoint {stuck_failpoint}" + assert ( + env.pageserver.log_contains( + f".*{child_timeline_id}.*at failpoint {stuck_failpoint}" + ) + is not None ) wait_until(50, 0.1, first_call_hit_failpoint) @@ -602,7 +605,7 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder): at_failpoint_log_message = f".*{child_timeline_id}.*at failpoint {failpoint_name}.*" def hit_failpoint(): - assert env.pageserver.log_contains(at_failpoint_log_message) + assert env.pageserver.log_contains(at_failpoint_log_message) is not None wait_until(50, 0.1, hit_failpoint) @@ -612,7 +615,7 @@ def hit_failpoint(): env.pageserver.allowed_errors.append(hangup_log_message) def got_hangup_log_message(): - assert env.pageserver.log_contains(hangup_log_message) + assert env.pageserver.log_contains(hangup_log_message) is not None wait_until(50, 0.1, got_hangup_log_message) @@ -624,7 +627,7 @@ def got_hangup_log_message(): def first_request_finished(): message = f".*DELETE.*{child_timeline_id}.*Cancelled request finished" - assert env.pageserver.log_contains(message) + assert env.pageserver.log_contains(message) is not None wait_until(50, 0.1, first_request_finished) @@ -759,8 +762,11 @@ def test_delete_orphaned_objects( for orphan in orphans: assert not orphan.exists() - assert env.pageserver.log_contains( - f"deleting a file not referenced from index_part.json name={orphan.stem}" + assert ( + env.pageserver.log_contains( + f"deleting a file not referenced from index_part.json name={orphan.stem}" + ) + is not None ) assert env.pageserver_remote_storage.index_path(env.initial_tenant, timeline_id).exists() From 956a383e80822908d04df4a57e803e6668613131 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 29 Feb 2024 16:41:01 +0000 Subject: [PATCH 10/21] implement LogCursor concept for log_contains --- test_runner/fixtures/neon_fixtures.py | 30 ++++++++++++--------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 512aeb4c84ae..f624ceb64813 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2180,6 +2180,11 @@ def __exit__( self.stop(immediate=True) +@dataclass +class LogCursor: + _line_no: int + + class NeonPageserver(PgProtocol): """ An object representing a running pageserver. 
@@ -2344,8 +2349,8 @@ def assert_no_metric_errors(self): assert value == 0, f"Nonzero {metric} == {value}" def log_contains( - self, pattern: str, since_pattern: "None | str | Callable[[str], bool]" - ) -> Optional[str]: + self, pattern: str, offset: None | LogCursor = None + ) -> Optional[Tuple[str, LogCursor]]: """Check that the pageserver log contains a line that matches the given regex""" logfile = self.workdir / "pageserver.log" if not logfile.exists(): @@ -2354,31 +2359,22 @@ def log_contains( contains_re = re.compile(pattern) - since_pattern_finder: Callable[[str], bool] - if since_pattern is None: - since_pattern_finder = lambda _line: True - elif isinstance(since_pattern, str): - since_pattern_re = re.compile(since_pattern) - since_pattern_finder = lambda line: since_pattern_re.search(line) is not None - else: - since_pattern_finder = since_pattern - # XXX: Our rust logging machinery buffers the messages, so if you # call this function immediately after it's been logged, there is # no guarantee it is already present in the log file. This hasn't # been a problem in practice, our python tests are not fast enough # to hit that race condition. - since_pattern_found = False + skip_until_line_no = 0 if offset is None else offset._line_no + cur_line_no = 0 with logfile.open("r") as f: for line in f: - if not since_pattern_found: - since_pattern_found = since_pattern_finder(line) - if not since_pattern_found: + if cur_line_no < skip_until_line_no: + cur_line_no += 1 continue if contains_re.search(line): # found it! - return line - + cur_line_no += 1 + return (line, LogCursor(cur_line_no)) return None def tenant_attach( From d36edab351898bc4e61a02048d63b4f301f15c2a Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 29 Feb 2024 16:41:15 +0000 Subject: [PATCH 11/21] Revert "audit log_contains users for truthiness pitfall" This reverts commit 7afbcd1003e4c49e794045f5f3a45480256a7e3a. 
--- .../regress/test_attach_tenant_config.py | 4 +-- .../regress/test_disk_usage_eviction.py | 17 +++++------ test_runner/regress/test_duplicate_layers.py | 2 +- test_runner/regress/test_layer_eviction.py | 2 +- .../regress/test_layers_from_future.py | 5 +--- test_runner/regress/test_logging.py | 2 +- test_runner/regress/test_remote_storage.py | 19 ++++--------- test_runner/regress/test_tenant_delete.py | 28 +++++++------------ test_runner/regress/test_tenant_detach.py | 12 +++----- test_runner/regress/test_tenant_relocation.py | 10 ++----- .../test_tenants_with_remote_storage.py | 12 +++----- .../regress/test_threshold_based_eviction.py | 4 +-- test_runner/regress/test_timeline_delete.py | 20 +++++-------- 13 files changed, 47 insertions(+), 90 deletions(-) diff --git a/test_runner/regress/test_attach_tenant_config.py b/test_runner/regress/test_attach_tenant_config.py index f93e2b177a4f..6cae66384249 100644 --- a/test_runner/regress/test_attach_tenant_config.py +++ b/test_runner/regress/test_attach_tenant_config.py @@ -64,9 +64,7 @@ def negative_env(neon_env_builder: NeonEnvBuilder) -> Generator[NegativeTests, N ) def log_contains_bad_request(): - assert ( - env.pageserver.log_contains(".*Error processing HTTP request: Bad request") is not None - ) + env.pageserver.log_contains(".*Error processing HTTP request: Bad request") wait_until(50, 0.1, log_contains_bad_request) diff --git a/test_runner/regress/test_disk_usage_eviction.py b/test_runner/regress/test_disk_usage_eviction.py index 3c409c3d9dea..eb4e370ea796 100644 --- a/test_runner/regress/test_disk_usage_eviction.py +++ b/test_runner/regress/test_disk_usage_eviction.py @@ -200,7 +200,7 @@ def pageserver_start_with_disk_usage_eviction( tenant_ps.http_client().timeline_wait_logical_size(tenant_id, timeline_id) def statvfs_called(): - assert pageserver.log_contains(".*running mocked statvfs.*") is not None + assert pageserver.log_contains(".*running mocked statvfs.*") # we most likely have already completed multiple runs wait_until(10, 1, statvfs_called) @@ -482,11 +482,8 @@ def test_pageserver_respects_overridden_resident_size( log.info(f"{response}") time.sleep(1) # give log time to flush - assert ( - env.neon_env.pageserver.log_contains( - GLOBAL_LRU_LOG_LINE, - ) - is not None + assert not env.neon_env.pageserver.log_contains( + GLOBAL_LRU_LOG_LINE, ), "this test is pointless if it fell back to global LRU" (later_total_on_disk, _, _) = env.timelines_du(env.pageserver) @@ -536,7 +533,7 @@ def test_pageserver_falls_back_to_global_lru(eviction_env: EvictionEnv, order: E assert actual_change >= target, "eviction must always evict more than target" time.sleep(1) # give log time to flush - assert env.neon_env.pageserver.log_contains(GLOBAL_LRU_LOG_LINE) is not None + assert env.neon_env.pageserver.log_contains(GLOBAL_LRU_LOG_LINE) env.neon_env.pageserver.allowed_errors.append(".*" + GLOBAL_LRU_LOG_LINE) @@ -770,7 +767,7 @@ def test_statvfs_error_handling(eviction_env: EvictionEnv): eviction_order=EvictionOrder.ABSOLUTE_ORDER, ) - assert env.neon_env.pageserver.log_contains(".*statvfs failed.*EIO") is not None + assert env.neon_env.pageserver.log_contains(".*statvfs failed.*EIO") env.neon_env.pageserver.allowed_errors.append(".*statvfs failed.*EIO") @@ -805,7 +802,7 @@ def test_statvfs_pressure_usage(eviction_env: EvictionEnv): ) def relieved_log_message(): - assert env.neon_env.pageserver.log_contains(".*disk usage pressure relieved") is not None + assert env.neon_env.pageserver.log_contains(".*disk usage pressure relieved") 
wait_until(10, 1, relieved_log_message) @@ -849,7 +846,7 @@ def test_statvfs_pressure_min_avail_bytes(eviction_env: EvictionEnv): ) def relieved_log_message(): - assert env.neon_env.pageserver.log_contains(".*disk usage pressure relieved") is not None + assert env.neon_env.pageserver.log_contains(".*disk usage pressure relieved") wait_until(10, 1, relieved_log_message) diff --git a/test_runner/regress/test_duplicate_layers.py b/test_runner/regress/test_duplicate_layers.py index 3bd7bbbe5226..224e6f50c725 100644 --- a/test_runner/regress/test_duplicate_layers.py +++ b/test_runner/regress/test_duplicate_layers.py @@ -36,7 +36,7 @@ def test_duplicate_layers(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): pg_bin.run_capture(["pgbench", "-i", "-s1", connstr]) time.sleep(10) # let compaction to be performed - assert env.pageserver.log_contains("compact-level0-phase1-return-same") is not None + assert env.pageserver.log_contains("compact-level0-phase1-return-same") def test_actually_duplicated_l1(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): diff --git a/test_runner/regress/test_layer_eviction.py b/test_runner/regress/test_layer_eviction.py index c274cb8ca86d..efba2033fb92 100644 --- a/test_runner/regress/test_layer_eviction.py +++ b/test_runner/regress/test_layer_eviction.py @@ -284,7 +284,7 @@ def ensure_resident_and_remote_size_metrics(): time.sleep(2) # let pitr_interval + 1 second pass ps_http.timeline_gc(tenant_id, timeline_id, 0) time.sleep(1) - assert env.pageserver.log_contains("Nothing to GC") is not None + assert not env.pageserver.log_contains("Nothing to GC") log.info("ensure GC deleted some layers, otherwise this test is pointless") post_gc_info = ps_http.layer_map_info(tenant_id, timeline_id) diff --git a/test_runner/regress/test_layers_from_future.py b/test_runner/regress/test_layers_from_future.py index 82eb886ba72c..999e077e454b 100644 --- a/test_runner/regress/test_layers_from_future.py +++ b/test_runner/regress/test_layers_from_future.py @@ -185,10 +185,7 @@ def future_layer_is_gone_from_index_part(): # NB: the layer file is unlinked index part now, but, because we made the delete # operation stuck, the layer file itself is still in the remote_storage def delete_at_pause_point(): - assert ( - env.pageserver.log_contains(f".*{tenant_id}.*at failpoint.*{failpoint_name}") - is not None - ) + assert env.pageserver.log_contains(f".*{tenant_id}.*at failpoint.*{failpoint_name}") wait_until(10, 0.5, delete_at_pause_point) future_layer_path = env.pageserver_remote_storage.remote_layer_path( diff --git a/test_runner/regress/test_logging.py b/test_runner/regress/test_logging.py index ebc7c2293377..d62b5e531c2e 100644 --- a/test_runner/regress/test_logging.py +++ b/test_runner/regress/test_logging.py @@ -34,7 +34,7 @@ def test_logging_event_count(neon_env_builder: NeonEnvBuilder, level: str): def assert_logged(): if not log_expected: return - assert env.pageserver.log_contains(f".*{msg_id}.*") is not None + assert env.pageserver.log_contains(f".*{msg_id}.*") wait_until(10, 0.5, assert_logged) diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index 9dd08b6fa1ff..73ebe0a76fa3 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -120,15 +120,11 @@ def test_remote_storage_backup_and_restore( log.info(f"upload of checkpoint {checkpoint_number} is done") # Check that we had to retry the uploads - assert ( - env.pageserver.log_contains(".*failed to perform remote task UploadLayer.*, 
will retry.*") - is not None + assert env.pageserver.log_contains( + ".*failed to perform remote task UploadLayer.*, will retry.*" ) - assert ( - env.pageserver.log_contains( - ".*failed to perform remote task UploadMetadata.*, will retry.*" - ) - is not None + assert env.pageserver.log_contains( + ".*failed to perform remote task UploadMetadata.*, will retry.*" ) ##### Stop the first pageserver instance, erase all its data @@ -877,11 +873,8 @@ def until_layer_deletes_completed(): ), "l0 should now be removed because of L0 => L1 compaction and completed uploads" # We should not have hit the error handling path in uploads where a uploaded file is gone - assert ( - not env.pageserver.log_contains( - "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more." - ) - is not None + assert not env.pageserver.log_contains( + "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more." ) diff --git a/test_runner/regress/test_tenant_delete.py b/test_runner/regress/test_tenant_delete.py index c763bd3147d7..8c7d332e1daf 100644 --- a/test_runner/regress/test_tenant_delete.py +++ b/test_runner/regress/test_tenant_delete.py @@ -505,10 +505,10 @@ def delete_tenant(): return ps_http.tenant_delete(tenant_id) def hit_remove_failpoint(): - assert env.pageserver.log_contains(f"at failpoint {BEFORE_REMOVE_FAILPOINT}") is not None + assert env.pageserver.log_contains(f"at failpoint {BEFORE_REMOVE_FAILPOINT}") def hit_run_failpoint(): - assert env.pageserver.log_contains(f"at failpoint {BEFORE_RUN_FAILPOINT}") is not None + assert env.pageserver.log_contains(f"at failpoint {BEFORE_RUN_FAILPOINT}") with concurrent.futures.ThreadPoolExecutor() as executor: background_200_req = executor.submit(delete_tenant) @@ -612,17 +612,13 @@ def timeline_create(): Thread(target=timeline_create).start() def hit_initdb_upload_failpoint(): - assert ( - env.pageserver.log_contains(f"at failpoint {BEFORE_INITDB_UPLOAD_FAILPOINT}") - is not None - ) + assert env.pageserver.log_contains(f"at failpoint {BEFORE_INITDB_UPLOAD_FAILPOINT}") wait_until(100, 0.1, hit_initdb_upload_failpoint) def creation_connection_timed_out(): - assert ( - env.pageserver.log_contains("POST.*/timeline.* request was dropped before completing") - is not None + assert env.pageserver.log_contains( + "POST.*/timeline.* request was dropped before completing" ) # Wait so that we hit the timeout and the connection is dropped @@ -640,9 +636,8 @@ def tenant_delete_inner(): Thread(target=tenant_delete).start() def deletion_arrived(): - assert ( - env.pageserver.log_contains(f"cfg failpoint: {DELETE_BEFORE_CLEANUP_FAILPOINT} pause") - is not None + assert env.pageserver.log_contains( + f"cfg failpoint: {DELETE_BEFORE_CLEANUP_FAILPOINT} pause" ) wait_until(100, 0.1, deletion_arrived) @@ -668,12 +663,9 @@ def deletion_arrived(): ) # Ensure that creation cancelled and deletion didn't end up in broken state or encountered the leftover temp file - assert env.pageserver.log_contains(CANCELLED_ERROR) is not None - assert ( - not env.pageserver.log_contains( - ".*ERROR.*delete_tenant.*Timelines directory is not empty after all timelines deletion" - ) - is not None + assert env.pageserver.log_contains(CANCELLED_ERROR) + assert not env.pageserver.log_contains( + ".*ERROR.*delete_tenant.*Timelines directory is not empty after all timelines deletion" ) # Zero tenants remain (we deleted the default tenant) diff --git a/test_runner/regress/test_tenant_detach.py 
b/test_runner/regress/test_tenant_detach.py index e9e8d38a6ed3..4752699abb49 100644 --- a/test_runner/regress/test_tenant_detach.py +++ b/test_runner/regress/test_tenant_detach.py @@ -92,15 +92,11 @@ def test_tenant_reattach(neon_env_builder: NeonEnvBuilder, mode: str): wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn) # Check that we had to retry the uploads - assert ( - env.pageserver.log_contains(".*failed to perform remote task UploadLayer.*, will retry.*") - is not None + assert env.pageserver.log_contains( + ".*failed to perform remote task UploadLayer.*, will retry.*" ) - assert ( - env.pageserver.log_contains( - ".*failed to perform remote task UploadMetadata.*, will retry.*" - ) - is not None + assert env.pageserver.log_contains( + ".*failed to perform remote task UploadMetadata.*, will retry.*" ) ps_metrics = pageserver_http.get_metrics() diff --git a/test_runner/regress/test_tenant_relocation.py b/test_runner/regress/test_tenant_relocation.py index 64bfa0bd9be7..b70131472a1c 100644 --- a/test_runner/regress/test_tenant_relocation.py +++ b/test_runner/regress/test_tenant_relocation.py @@ -495,10 +495,7 @@ def test_emergency_relocate_with_branches_slow_replay( assert cur.fetchall() == [("before pause",), ("after pause",)] # Sanity check that the failpoint was reached - assert ( - env.pageserver.log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done') - is not None - ) + assert env.pageserver.log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done') assert time.time() - before_attach_time > 5 # Clean up @@ -635,10 +632,7 @@ def test_emergency_relocate_with_branches_createdb( assert query_scalar(cur, "SELECT count(*) FROM test_migrate_one") == 200 # Sanity check that the failpoint was reached - assert ( - env.pageserver.log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done') - is not None - ) + assert env.pageserver.log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done') assert time.time() - before_attach_time > 5 # Clean up diff --git a/test_runner/regress/test_tenants_with_remote_storage.py b/test_runner/regress/test_tenants_with_remote_storage.py index b4959bc89fac..1c693a0df5ba 100644 --- a/test_runner/regress/test_tenants_with_remote_storage.py +++ b/test_runner/regress/test_tenants_with_remote_storage.py @@ -147,15 +147,11 @@ def test_tenants_attached_after_download(neon_env_builder: NeonEnvBuilder): log.info(f"upload of checkpoint {checkpoint_number} is done") # Check that we had to retry the uploads - assert ( - env.pageserver.log_contains(".*failed to perform remote task UploadLayer.*, will retry.*") - is not None + assert env.pageserver.log_contains( + ".*failed to perform remote task UploadLayer.*, will retry.*" ) - assert ( - env.pageserver.log_contains( - ".*failed to perform remote task UploadMetadata.*, will retry.*" - ) - is not None + assert env.pageserver.log_contains( + ".*failed to perform remote task UploadMetadata.*, will retry.*" ) ##### Stop the pageserver, erase its layer file to force it being downloaded from S3 diff --git a/test_runner/regress/test_threshold_based_eviction.py b/test_runner/regress/test_threshold_based_eviction.py index 7bf49a0874df..5f72cfd74713 100644 --- a/test_runner/regress/test_threshold_based_eviction.py +++ b/test_runner/regress/test_threshold_based_eviction.py @@ -179,6 +179,6 @@ def __repr__(self) -> str: assert len(post.remote_layers) > 0, "some layers should be evicted once it's stabilized" assert len(post.local_layers) > 0, "the imitate 
accesses should keep some layers resident" - assert ( - env.pageserver.log_contains(metrics_refused_log_line) is not None + assert env.pageserver.log_contains( + metrics_refused_log_line ), "ensure the metrics collection worker ran" diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py index 0e3753007583..a6a6fb47ccd6 100644 --- a/test_runner/regress/test_timeline_delete.py +++ b/test_runner/regress/test_timeline_delete.py @@ -531,11 +531,8 @@ def first_call(result_queue): try: def first_call_hit_failpoint(): - assert ( - env.pageserver.log_contains( - f".*{child_timeline_id}.*at failpoint {stuck_failpoint}" - ) - is not None + assert env.pageserver.log_contains( + f".*{child_timeline_id}.*at failpoint {stuck_failpoint}" ) wait_until(50, 0.1, first_call_hit_failpoint) @@ -605,7 +602,7 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder): at_failpoint_log_message = f".*{child_timeline_id}.*at failpoint {failpoint_name}.*" def hit_failpoint(): - assert env.pageserver.log_contains(at_failpoint_log_message) is not None + assert env.pageserver.log_contains(at_failpoint_log_message) wait_until(50, 0.1, hit_failpoint) @@ -615,7 +612,7 @@ def hit_failpoint(): env.pageserver.allowed_errors.append(hangup_log_message) def got_hangup_log_message(): - assert env.pageserver.log_contains(hangup_log_message) is not None + assert env.pageserver.log_contains(hangup_log_message) wait_until(50, 0.1, got_hangup_log_message) @@ -627,7 +624,7 @@ def got_hangup_log_message(): def first_request_finished(): message = f".*DELETE.*{child_timeline_id}.*Cancelled request finished" - assert env.pageserver.log_contains(message) is not None + assert env.pageserver.log_contains(message) wait_until(50, 0.1, first_request_finished) @@ -762,11 +759,8 @@ def test_delete_orphaned_objects( for orphan in orphans: assert not orphan.exists() - assert ( - env.pageserver.log_contains( - f"deleting a file not referenced from index_part.json name={orphan.stem}" - ) - is not None + assert env.pageserver.log_contains( + f"deleting a file not referenced from index_part.json name={orphan.stem}" ) assert env.pageserver_remote_storage.index_path(env.initial_tenant, timeline_id).exists() From 3227e33a1703b7024ad5ab424c1f7b5fa8e4269b Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 29 Feb 2024 16:43:30 +0000 Subject: [PATCH 12/21] add assert_log_contains --- test_runner/fixtures/neon_fixtures.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index f624ceb64813..ebcf244f8e8c 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2348,6 +2348,11 @@ def assert_no_metric_errors(self): value = self.http_client().get_metric_value(metric) assert value == 0, f"Nonzero {metric} == {value}" + def assert_log_contains(self, pattern: str, offset: None | LogCursor = None) -> Tuple[str, LogCursor]: + res = self.log_contains(pattern, offset=offset) + assert res is not None + return res + def log_contains( self, pattern: str, offset: None | LogCursor = None ) -> Optional[Tuple[str, LogCursor]]: From 505946d8cd1340446e226852c30d359b4964b331 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 29 Feb 2024 17:17:56 +0000 Subject: [PATCH 13/21] wait_until usage corrections & fix potential truthiness issues in some callers of log_contains callers --- test_runner/fixtures/neon_fixtures.py | 4 +- test_runner/fixtures/pageserver/utils.py | 6 +-- 
test_runner/fixtures/utils.py | 19 ++++++- .../regress/test_attach_tenant_config.py | 9 ++-- .../regress/test_disk_usage_eviction.py | 20 ++++--- test_runner/regress/test_duplicate_layers.py | 2 +- .../regress/test_layers_from_future.py | 11 ++-- test_runner/regress/test_logging.py | 2 +- .../regress/test_pageserver_generations.py | 2 +- .../test_pageserver_getpage_throttle.py | 29 ++++------- test_runner/regress/test_remote_storage.py | 52 ++++++++++--------- test_runner/regress/test_sharding_service.py | 4 +- test_runner/regress/test_tenant_delete.py | 12 ++--- test_runner/regress/test_tenant_detach.py | 4 +- test_runner/regress/test_tenant_relocation.py | 4 +- .../test_tenants_with_remote_storage.py | 4 +- .../regress/test_threshold_based_eviction.py | 7 +-- test_runner/regress/test_timeline_delete.py | 11 ++-- test_runner/regress/test_timeline_size.py | 5 +- 19 files changed, 112 insertions(+), 95 deletions(-) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index ebcf244f8e8c..8837e0368f52 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2348,7 +2348,9 @@ def assert_no_metric_errors(self): value = self.http_client().get_metric_value(metric) assert value == 0, f"Nonzero {metric} == {value}" - def assert_log_contains(self, pattern: str, offset: None | LogCursor = None) -> Tuple[str, LogCursor]: + def assert_log_contains( + self, pattern: str, offset: None | LogCursor = None + ) -> Tuple[str, LogCursor]: res = self.log_contains(pattern, offset=offset) assert res is not None return res diff --git a/test_runner/fixtures/pageserver/utils.py b/test_runner/fixtures/pageserver/utils.py index 1415038f699d..c600733e414b 100644 --- a/test_runner/fixtures/pageserver/utils.py +++ b/test_runner/fixtures/pageserver/utils.py @@ -20,7 +20,7 @@ def assert_tenant_state( tenant: TenantId, expected_state: str, message: Optional[str] = None, -): +) -> None: tenant_status = pageserver_http.tenant_status(tenant) log.info(f"tenant_status: {tenant_status}") assert tenant_status["state"]["slug"] == expected_state, message or tenant_status @@ -292,7 +292,7 @@ def timeline_delete_wait_completed( iterations: int = 20, interval: Optional[float] = None, **delete_args, -): +) -> None: pageserver_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id, **delete_args) wait_timeline_detail_404(pageserver_http, tenant_id, timeline_id, iterations, interval) @@ -302,7 +302,7 @@ def assert_prefix_empty( remote_storage: Optional[RemoteStorage], prefix: Optional[str] = None, allowed_postfix: Optional[str] = None, -): +) -> None: assert remote_storage is not None response = list_prefix(remote_storage, prefix) keys = response["KeyCount"] diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py index 91f33e1196ac..7fc3bae3afd6 100644 --- a/test_runner/fixtures/utils.py +++ b/test_runner/fixtures/utils.py @@ -369,7 +369,12 @@ def start_in_background( return spawned_process -def wait_until(number_of_iterations: int, interval: float, func: Fn): +WaitUntilRet = TypeVar("WaitUntilRet") + + +def wait_until( + number_of_iterations: int, interval: float, func: Callable[[], WaitUntilRet] +) -> WaitUntilRet: """ Wait until 'func' returns successfully, without exception. Returns the last return value from the function. 
@@ -387,6 +392,18 @@ def wait_until(number_of_iterations: int, interval: float, func: Fn): raise Exception("timed out while waiting for %s" % func) from last_exception +def assert_eq(a, b) -> None: + assert a == b + + +def assert_gt(a, b) -> None: + assert a > b + + +def assert_ge(a, b) -> None: + assert a >= b + + def run_pg_bench_small(pg_bin: "PgBin", connstr: str): """ Fast way to populate data. diff --git a/test_runner/regress/test_attach_tenant_config.py b/test_runner/regress/test_attach_tenant_config.py index 6cae66384249..7fbce6a10c38 100644 --- a/test_runner/regress/test_attach_tenant_config.py +++ b/test_runner/regress/test_attach_tenant_config.py @@ -63,10 +63,11 @@ def negative_env(neon_env_builder: NeonEnvBuilder) -> Generator[NegativeTests, N ] ) - def log_contains_bad_request(): - env.pageserver.log_contains(".*Error processing HTTP request: Bad request") - - wait_until(50, 0.1, log_contains_bad_request) + wait_until( + 50, + 0.1, + lambda: env.pageserver.assert_log_contains(".*Error processing HTTP request: Bad request"), + ) def test_null_body(negative_env: NegativeTests): diff --git a/test_runner/regress/test_disk_usage_eviction.py b/test_runner/regress/test_disk_usage_eviction.py index eb4e370ea796..b83545216d8a 100644 --- a/test_runner/regress/test_disk_usage_eviction.py +++ b/test_runner/regress/test_disk_usage_eviction.py @@ -200,7 +200,7 @@ def pageserver_start_with_disk_usage_eviction( tenant_ps.http_client().timeline_wait_logical_size(tenant_id, timeline_id) def statvfs_called(): - assert pageserver.log_contains(".*running mocked statvfs.*") + pageserver.assert_log_contains(".*running mocked statvfs.*") # we most likely have already completed multiple runs wait_until(10, 1, statvfs_called) @@ -533,7 +533,7 @@ def test_pageserver_falls_back_to_global_lru(eviction_env: EvictionEnv, order: E assert actual_change >= target, "eviction must always evict more than target" time.sleep(1) # give log time to flush - assert env.neon_env.pageserver.log_contains(GLOBAL_LRU_LOG_LINE) + env.neon_env.pageserver.assert_log_contains(GLOBAL_LRU_LOG_LINE) env.neon_env.pageserver.allowed_errors.append(".*" + GLOBAL_LRU_LOG_LINE) @@ -767,7 +767,7 @@ def test_statvfs_error_handling(eviction_env: EvictionEnv): eviction_order=EvictionOrder.ABSOLUTE_ORDER, ) - assert env.neon_env.pageserver.log_contains(".*statvfs failed.*EIO") + env.neon_env.pageserver.assert_log_contains(".*statvfs failed.*EIO") env.neon_env.pageserver.allowed_errors.append(".*statvfs failed.*EIO") @@ -801,10 +801,9 @@ def test_statvfs_pressure_usage(eviction_env: EvictionEnv): eviction_order=EvictionOrder.ABSOLUTE_ORDER, ) - def relieved_log_message(): - assert env.neon_env.pageserver.log_contains(".*disk usage pressure relieved") - - wait_until(10, 1, relieved_log_message) + wait_until( + 10, 1, lambda: env.neon_env.pageserver.assert_log_contains(".*disk usage pressure relieved") + ) def less_than_max_usage_pct(): post_eviction_total_size, _, _ = env.timelines_du(env.pageserver) @@ -845,10 +844,9 @@ def test_statvfs_pressure_min_avail_bytes(eviction_env: EvictionEnv): eviction_order=EvictionOrder.ABSOLUTE_ORDER, ) - def relieved_log_message(): - assert env.neon_env.pageserver.log_contains(".*disk usage pressure relieved") - - wait_until(10, 1, relieved_log_message) + wait_until( + 10, 1, lambda: env.neon_env.pageserver.assert_log_contains(".*disk usage pressure relieved") + ) def more_than_min_avail_bytes_freed(): post_eviction_total_size, _, _ = env.timelines_du(env.pageserver) diff --git 
a/test_runner/regress/test_duplicate_layers.py b/test_runner/regress/test_duplicate_layers.py index 224e6f50c725..cb4fa43be724 100644 --- a/test_runner/regress/test_duplicate_layers.py +++ b/test_runner/regress/test_duplicate_layers.py @@ -36,7 +36,7 @@ def test_duplicate_layers(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): pg_bin.run_capture(["pgbench", "-i", "-s1", connstr]) time.sleep(10) # let compaction to be performed - assert env.pageserver.log_contains("compact-level0-phase1-return-same") + env.pageserver.assert_log_contains("compact-level0-phase1-return-same") def test_actually_duplicated_l1(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): diff --git a/test_runner/regress/test_layers_from_future.py b/test_runner/regress/test_layers_from_future.py index 999e077e454b..9da47b9fd33d 100644 --- a/test_runner/regress/test_layers_from_future.py +++ b/test_runner/regress/test_layers_from_future.py @@ -184,10 +184,13 @@ def future_layer_is_gone_from_index_part(): # NB: the layer file is unlinked index part now, but, because we made the delete # operation stuck, the layer file itself is still in the remote_storage - def delete_at_pause_point(): - assert env.pageserver.log_contains(f".*{tenant_id}.*at failpoint.*{failpoint_name}") - - wait_until(10, 0.5, delete_at_pause_point) + wait_until( + 10, + 0.5, + lambda: env.pageserver.assert_log_contains( + f".*{tenant_id}.*at failpoint.*{failpoint_name}" + ), + ) future_layer_path = env.pageserver_remote_storage.remote_layer_path( tenant_id, timeline_id, future_layer.to_str(), generation=generation_before_detach ) diff --git a/test_runner/regress/test_logging.py b/test_runner/regress/test_logging.py index d62b5e531c2e..bfffad75722f 100644 --- a/test_runner/regress/test_logging.py +++ b/test_runner/regress/test_logging.py @@ -34,7 +34,7 @@ def test_logging_event_count(neon_env_builder: NeonEnvBuilder, level: str): def assert_logged(): if not log_expected: return - assert env.pageserver.log_contains(f".*{msg_id}.*") + env.pageserver.assert_log_contains(f".*{msg_id}.*") wait_until(10, 0.5, assert_logged) diff --git a/test_runner/regress/test_pageserver_generations.py b/test_runner/regress/test_pageserver_generations.py index 1070d06ed040..89fc48a49f82 100644 --- a/test_runner/regress/test_pageserver_generations.py +++ b/test_runner/regress/test_pageserver_generations.py @@ -432,7 +432,7 @@ def assert_header_written(): main_pageserver.start() - def assert_deletions_submitted(n: int): + def assert_deletions_submitted(n: int) -> None: assert ps_http.get_metric_value("pageserver_deletion_queue_submitted_total") == n # After restart, issue a flush to kick the deletion frontend to do recovery. 
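# Illustrative sketch, not part of this diff: the wait_until corrections above rely on
# wait_until() retrying only when the callback raises; per its docstring in
# fixtures/utils.py it waits until 'func' returns "without exception". A callback that
# merely returns False is therefore treated as success and nothing is actually waited for:
#
#     wait_until(2, 1, lambda: get_queued_count(file_kind="layer", op_kind="upload") == 0)
#     # the lambda returns False, wait_until returns immediately, the condition was never enforced
#
# The corrected form makes the callback raise AssertionError until the condition holds,
# using the small helpers added to fixtures/utils.py:
#
#     wait_until(2, 1, lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="upload"), 0))
#     wait_until(10, 0.1, lambda: assert_gt(get_queued_count(file_kind="layer", op_kind="delete"), 0))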
diff --git a/test_runner/regress/test_pageserver_getpage_throttle.py b/test_runner/regress/test_pageserver_getpage_throttle.py index e5b0a4f32280..42cc28efee2d 100644 --- a/test_runner/regress/test_pageserver_getpage_throttle.py +++ b/test_runner/regress/test_pageserver_getpage_throttle.py @@ -78,14 +78,9 @@ def run_pagebench_at_max_speed_and_get_total_requests_completed(duration_secs: i marker = uuid.uuid4().hex ps_http.post_tracing_event("info", marker) - - def marker_arrived_in_log(): - res = env.pageserver.log_contains(marker, offset=None) - assert res is not None - _, offset = res - return offset - - marker_offset = wait_until(10, 0.5, marker_arrived_in_log) + _, marker_offset = wait_until( + 10, 0.5, lambda: env.pageserver.assert_log_contains(marker, offset=None) + ) log.info("run pagebench") duration_secs = 10 @@ -104,16 +99,14 @@ def marker_arrived_in_log(): log.info("validate that we logged the throttling") - def throttling_log_message_present(): - assert ( - env.pageserver.log_contains( - f".*{tenant_id}.*shard was throttled in the last n_seconds.*", - offset=marker_offset, - ) - is not None - ) - - wait_until(10, compaction_period / 10, throttling_log_message_present) + wait_until( + 10, + compaction_period / 10, + lambda: env.pageserver.assert_log_contains( + f".*{tenant_id}.*shard was throttled in the last n_seconds.*", + offset=marker_offset, + ), + ) log.info("validate that the metric doesn't include throttle wait time") smgr_query_seconds_post = ps_http.get_metric_value(metric_name, metrics_query) diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index 73ebe0a76fa3..d322b4ffff73 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -28,7 +28,14 @@ available_remote_storages, ) from fixtures.types import Lsn, TenantId, TimelineId -from fixtures.utils import print_gc_result, query_scalar, wait_until +from fixtures.utils import ( + assert_eq, + assert_ge, + assert_gt, + print_gc_result, + query_scalar, + wait_until, +) from requests import ReadTimeout @@ -120,10 +127,10 @@ def test_remote_storage_backup_and_restore( log.info(f"upload of checkpoint {checkpoint_number} is done") # Check that we had to retry the uploads - assert env.pageserver.log_contains( + env.pageserver.assert_log_contains( ".*failed to perform remote task UploadLayer.*, will retry.*" ) - assert env.pageserver.log_contains( + env.pageserver.assert_log_contains( ".*failed to perform remote task UploadMetadata.*, will retry.*" ) @@ -292,9 +299,9 @@ def get_queued_count(file_kind, op_kind): print_gc_result(gc_result) assert gc_result["layers_removed"] > 0 - wait_until(2, 1, lambda: get_queued_count(file_kind="layer", op_kind="upload") == 0) - wait_until(2, 1, lambda: get_queued_count(file_kind="index", op_kind="upload") == 0) - wait_until(2, 1, lambda: get_queued_count(file_kind="layer", op_kind="delete") == 0) + wait_until(2, 1, lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="upload"), 0)) + wait_until(2, 1, lambda: assert_eq(get_queued_count(file_kind="index", op_kind="upload"), 0)) + wait_until(2, 1, lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="delete"), 0)) # let all future operations queue up configure_storage_sync_failpoints("return") @@ -322,17 +329,17 @@ def churn_while_failpoints_active(result): churn_while_failpoints_active_thread.start() # wait for churn thread's data to get stuck in the upload queue - wait_until(10, 0.1, lambda: get_queued_count(file_kind="layer", 
op_kind="upload") > 0) - wait_until(10, 0.1, lambda: get_queued_count(file_kind="index", op_kind="upload") >= 2) - wait_until(10, 0.1, lambda: get_queued_count(file_kind="layer", op_kind="delete") > 0) + wait_until(10, 0.1, lambda: assert_gt(get_queued_count(file_kind="layer", op_kind="upload"), 0)) + wait_until(10, 0.1, lambda: assert_ge(get_queued_count(file_kind="index", op_kind="upload"), 2)) + wait_until(10, 0.1, lambda: assert_gt(get_queued_count(file_kind="layer", op_kind="delete"), 0)) # unblock churn operations configure_storage_sync_failpoints("off") # ... and wait for them to finish. Exponential back-off in upload queue, so, gracious timeouts. - wait_until(30, 1, lambda: get_queued_count(file_kind="layer", op_kind="upload") == 0) - wait_until(30, 1, lambda: get_queued_count(file_kind="index", op_kind="upload") == 0) - wait_until(30, 1, lambda: get_queued_count(file_kind="layer", op_kind="delete") == 0) + wait_until(30, 1, lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="upload"), 0)) + wait_until(30, 1, lambda: assert_eq(get_queued_count(file_kind="index", op_kind="upload"), 0)) + wait_until(30, 1, lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="delete"), 0)) # The churn thread doesn't make progress once it blocks on the first wait_completion() call, # so, give it some time to wrap up. @@ -884,26 +891,23 @@ def wait_upload_queue_empty( wait_until( 2, 1, - lambda: get_queued_count( - client, tenant_id, timeline_id, file_kind="layer", op_kind="upload" - ) - == 0, + lambda: assert_eq( + get_queued_count(client, tenant_id, timeline_id, file_kind="layer", op_kind="upload"), 0 + ), ) wait_until( 2, 1, - lambda: get_queued_count( - client, tenant_id, timeline_id, file_kind="index", op_kind="upload" - ) - == 0, + lambda: assert_eq( + get_queued_count(client, tenant_id, timeline_id, file_kind="index", op_kind="upload"), 0 + ), ) wait_until( 2, 1, - lambda: get_queued_count( - client, tenant_id, timeline_id, file_kind="layer", op_kind="delete" - ) - == 0, + lambda: assert_eq( + get_queued_count(client, tenant_id, timeline_id, file_kind="layer", op_kind="delete"), 0 + ), ) diff --git a/test_runner/regress/test_sharding_service.py b/test_runner/regress/test_sharding_service.py index 6ed49d7fd6e9..c8224c1c67ff 100644 --- a/test_runner/regress/test_sharding_service.py +++ b/test_runner/regress/test_sharding_service.py @@ -116,7 +116,7 @@ def test_sharding_service_smoke( # Marking a pageserver offline should migrate tenants away from it. 
env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Offline"}) - def node_evacuated(node_id: int): + def node_evacuated(node_id: int) -> None: counts = get_node_shard_counts(env, tenant_ids) assert counts[node_id] == 0 @@ -405,7 +405,7 @@ def handler(request: Request): env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Offline"}) - def node_evacuated(node_id: int): + def node_evacuated(node_id: int) -> None: counts = get_node_shard_counts(env, [env.initial_tenant]) assert counts[node_id] == 0 diff --git a/test_runner/regress/test_tenant_delete.py b/test_runner/regress/test_tenant_delete.py index 8c7d332e1daf..c4b4e5fb7781 100644 --- a/test_runner/regress/test_tenant_delete.py +++ b/test_runner/regress/test_tenant_delete.py @@ -505,10 +505,10 @@ def delete_tenant(): return ps_http.tenant_delete(tenant_id) def hit_remove_failpoint(): - assert env.pageserver.log_contains(f"at failpoint {BEFORE_REMOVE_FAILPOINT}") + env.pageserver.assert_log_contains(f"at failpoint {BEFORE_REMOVE_FAILPOINT}") def hit_run_failpoint(): - assert env.pageserver.log_contains(f"at failpoint {BEFORE_RUN_FAILPOINT}") + env.pageserver.assert_log_contains(f"at failpoint {BEFORE_RUN_FAILPOINT}") with concurrent.futures.ThreadPoolExecutor() as executor: background_200_req = executor.submit(delete_tenant) @@ -612,12 +612,12 @@ def timeline_create(): Thread(target=timeline_create).start() def hit_initdb_upload_failpoint(): - assert env.pageserver.log_contains(f"at failpoint {BEFORE_INITDB_UPLOAD_FAILPOINT}") + env.pageserver.assert_log_contains(f"at failpoint {BEFORE_INITDB_UPLOAD_FAILPOINT}") wait_until(100, 0.1, hit_initdb_upload_failpoint) def creation_connection_timed_out(): - assert env.pageserver.log_contains( + env.pageserver.assert_log_contains( "POST.*/timeline.* request was dropped before completing" ) @@ -636,7 +636,7 @@ def tenant_delete_inner(): Thread(target=tenant_delete).start() def deletion_arrived(): - assert env.pageserver.log_contains( + env.pageserver.assert_log_contains( f"cfg failpoint: {DELETE_BEFORE_CLEANUP_FAILPOINT} pause" ) @@ -663,7 +663,7 @@ def deletion_arrived(): ) # Ensure that creation cancelled and deletion didn't end up in broken state or encountered the leftover temp file - assert env.pageserver.log_contains(CANCELLED_ERROR) + env.pageserver.assert_log_contains(CANCELLED_ERROR) assert not env.pageserver.log_contains( ".*ERROR.*delete_tenant.*Timelines directory is not empty after all timelines deletion" ) diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py index 4752699abb49..d3f24cb06e07 100644 --- a/test_runner/regress/test_tenant_detach.py +++ b/test_runner/regress/test_tenant_detach.py @@ -92,10 +92,10 @@ def test_tenant_reattach(neon_env_builder: NeonEnvBuilder, mode: str): wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn) # Check that we had to retry the uploads - assert env.pageserver.log_contains( + env.pageserver.assert_log_contains( ".*failed to perform remote task UploadLayer.*, will retry.*" ) - assert env.pageserver.log_contains( + env.pageserver.assert_log_contains( ".*failed to perform remote task UploadMetadata.*, will retry.*" ) diff --git a/test_runner/regress/test_tenant_relocation.py b/test_runner/regress/test_tenant_relocation.py index b70131472a1c..9def3ad1c243 100644 --- a/test_runner/regress/test_tenant_relocation.py +++ b/test_runner/regress/test_tenant_relocation.py @@ -495,7 +495,7 @@ def test_emergency_relocate_with_branches_slow_replay( 
assert cur.fetchall() == [("before pause",), ("after pause",)] # Sanity check that the failpoint was reached - assert env.pageserver.log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done') + env.pageserver.assert_log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done') assert time.time() - before_attach_time > 5 # Clean up @@ -632,7 +632,7 @@ def test_emergency_relocate_with_branches_createdb( assert query_scalar(cur, "SELECT count(*) FROM test_migrate_one") == 200 # Sanity check that the failpoint was reached - assert env.pageserver.log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done') + env.pageserver.assert_log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done') assert time.time() - before_attach_time > 5 # Clean up diff --git a/test_runner/regress/test_tenants_with_remote_storage.py b/test_runner/regress/test_tenants_with_remote_storage.py index 1c693a0df5ba..d16978d02ac0 100644 --- a/test_runner/regress/test_tenants_with_remote_storage.py +++ b/test_runner/regress/test_tenants_with_remote_storage.py @@ -147,10 +147,10 @@ def test_tenants_attached_after_download(neon_env_builder: NeonEnvBuilder): log.info(f"upload of checkpoint {checkpoint_number} is done") # Check that we had to retry the uploads - assert env.pageserver.log_contains( + env.pageserver.assert_log_contains( ".*failed to perform remote task UploadLayer.*, will retry.*" ) - assert env.pageserver.log_contains( + env.pageserver.assert_log_contains( ".*failed to perform remote task UploadMetadata.*, will retry.*" ) diff --git a/test_runner/regress/test_threshold_based_eviction.py b/test_runner/regress/test_threshold_based_eviction.py index 5f72cfd74713..f88c1e890b7e 100644 --- a/test_runner/regress/test_threshold_based_eviction.py +++ b/test_runner/regress/test_threshold_based_eviction.py @@ -179,6 +179,7 @@ def __repr__(self) -> str: assert len(post.remote_layers) > 0, "some layers should be evicted once it's stabilized" assert len(post.local_layers) > 0, "the imitate accesses should keep some layers resident" - assert env.pageserver.log_contains( - metrics_refused_log_line - ), "ensure the metrics collection worker ran" + ( + env.pageserver.assert_log_contains(metrics_refused_log_line), + "ensure the metrics collection worker ran", + ) diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py index a6a6fb47ccd6..795110d90be5 100644 --- a/test_runner/regress/test_timeline_delete.py +++ b/test_runner/regress/test_timeline_delete.py @@ -89,6 +89,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv): assert timeline_path.exists() # retry deletes when compaction or gc is running in pageserver + # TODO: review whether this wait_until is actually necessary, we do an await() internally wait_until( number_of_iterations=3, interval=0.2, @@ -531,7 +532,7 @@ def first_call(result_queue): try: def first_call_hit_failpoint(): - assert env.pageserver.log_contains( + env.pageserver.assert_log_contains( f".*{child_timeline_id}.*at failpoint {stuck_failpoint}" ) @@ -602,7 +603,7 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder): at_failpoint_log_message = f".*{child_timeline_id}.*at failpoint {failpoint_name}.*" def hit_failpoint(): - assert env.pageserver.log_contains(at_failpoint_log_message) + env.pageserver.assert_log_contains(at_failpoint_log_message) wait_until(50, 0.1, hit_failpoint) @@ -612,7 +613,7 @@ def hit_failpoint(): env.pageserver.allowed_errors.append(hangup_log_message) def 
got_hangup_log_message(): - assert env.pageserver.log_contains(hangup_log_message) + env.pageserver.assert_log_contains(hangup_log_message) wait_until(50, 0.1, got_hangup_log_message) @@ -624,7 +625,7 @@ def got_hangup_log_message(): def first_request_finished(): message = f".*DELETE.*{child_timeline_id}.*Cancelled request finished" - assert env.pageserver.log_contains(message) + env.pageserver.assert_log_contains(message) wait_until(50, 0.1, first_request_finished) @@ -759,7 +760,7 @@ def test_delete_orphaned_objects( for orphan in orphans: assert not orphan.exists() - assert env.pageserver.log_contains( + env.pageserver.assert_log_contains( f"deleting a file not referenced from index_part.json name={orphan.stem}" ) diff --git a/test_runner/regress/test_timeline_size.py b/test_runner/regress/test_timeline_size.py index 327e5abe26e7..fc3b7c4b6070 100644 --- a/test_runner/regress/test_timeline_size.py +++ b/test_runner/regress/test_timeline_size.py @@ -852,12 +852,9 @@ def delete_tenant(): # on-demand activation was triggered by the tenant deletion log_match = f".*attach{{tenant_id={delete_tenant_id} shard_id=0000 gen=[0-9a-f]+}}: Activating tenant \\(on-demand\\).*" - def activated_on_demand(): - assert env.pageserver.log_contains(log_match) is not None - log.info(f"Waiting for activation message '{log_match}'") try: - wait_until(10, 1, activated_on_demand) + wait_until(10, 1, lambda: env.pageserver.assert_log_contains(log_match)) finally: log.info("Clearing failpoint") pageserver_http.configure_failpoints(("timeline-calculate-logical-size-pause", "off")) From 0f3db7bb592fbc5d9f2b4dd57f26d68fdad2ce79 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 29 Feb 2024 16:41:01 +0000 Subject: [PATCH 14/21] implement LogCursor concept for log_contains --- test_runner/fixtures/neon_fixtures.py | 43 +++++++++------------------ 1 file changed, 14 insertions(+), 29 deletions(-) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 91045645b7ea..f624ceb64813 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2180,6 +2180,11 @@ def __exit__( self.stop(immediate=True) +@dataclass +class LogCursor: + _line_no: int + + class NeonPageserver(PgProtocol): """ An object representing a running pageserver. @@ -2344,12 +2349,9 @@ def assert_no_metric_errors(self): assert value == 0, f"Nonzero {metric} == {value}" def log_contains( - self, pattern: str, skip_until: "None | str | Callable[[str], bool]" = None - ) -> Optional[str]: - """ - Check that the pageserver log contains a line that matches the given regex. - Use `skip_until` to limit the search to the suffix of the log that follows the first line that matches `skip_until`. 
- """ + self, pattern: str, offset: None | LogCursor = None + ) -> Optional[Tuple[str, LogCursor]]: + """Check that the pageserver log contains a line that matches the given regex""" logfile = self.workdir / "pageserver.log" if not logfile.exists(): log.warning(f"Skipping log check: {logfile} does not exist") @@ -2357,39 +2359,22 @@ def log_contains( contains_re = re.compile(pattern) - skip_until_finder: Callable[[str], bool] - if skip_until is None: - - def always_true_finder(_line: str) -> bool: - return True - - skip_until_finder = always_true_finder - elif isinstance(skip_until, str): - skip_until_pattern_re = re.compile(skip_until) - - def re_finder(_line: str) -> bool: - return skip_until_pattern_re.search(line) is not None - - skip_until_finder = re_finder - else: - skip_until_finder = skip_until - # XXX: Our rust logging machinery buffers the messages, so if you # call this function immediately after it's been logged, there is # no guarantee it is already present in the log file. This hasn't # been a problem in practice, our python tests are not fast enough # to hit that race condition. - skip_until_pattern_found = False + skip_until_line_no = 0 if offset is None else offset._line_no + cur_line_no = 0 with logfile.open("r") as f: for line in f: - if not skip_until_pattern_found: - skip_until_pattern_found = skip_until_finder(line) - if not skip_until_pattern_found: + if cur_line_no < skip_until_line_no: + cur_line_no += 1 continue if contains_re.search(line): # found it! - return line - + cur_line_no += 1 + return (line, LogCursor(cur_line_no)) return None def tenant_attach( From 7b4f12e41a9efe5d202a81c8c5e3029184f44fb5 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 29 Feb 2024 16:43:30 +0000 Subject: [PATCH 15/21] add assert_log_contains --- test_runner/fixtures/neon_fixtures.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index f624ceb64813..ebcf244f8e8c 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2348,6 +2348,11 @@ def assert_no_metric_errors(self): value = self.http_client().get_metric_value(metric) assert value == 0, f"Nonzero {metric} == {value}" + def assert_log_contains(self, pattern: str, offset: None | LogCursor = None) -> Tuple[str, LogCursor]: + res = self.log_contains(pattern, offset=offset) + assert res is not None + return res + def log_contains( self, pattern: str, offset: None | LogCursor = None ) -> Optional[Tuple[str, LogCursor]]: From 1bf12fd367c6dd6ac19dfbf2c632e6d06e2af8c1 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 29 Feb 2024 17:17:56 +0000 Subject: [PATCH 16/21] wait_until usage corrections & fix potential truthiness issues in some callers of log_contains callers --- test_runner/fixtures/neon_fixtures.py | 4 +- test_runner/fixtures/pageserver/utils.py | 6 +-- test_runner/fixtures/utils.py | 19 ++++++- .../regress/test_attach_tenant_config.py | 9 ++-- .../regress/test_disk_usage_eviction.py | 20 ++++--- test_runner/regress/test_duplicate_layers.py | 2 +- .../regress/test_layers_from_future.py | 11 ++-- test_runner/regress/test_logging.py | 2 +- .../regress/test_pageserver_generations.py | 2 +- test_runner/regress/test_remote_storage.py | 52 ++++++++++--------- test_runner/regress/test_sharding_service.py | 4 +- test_runner/regress/test_tenant_delete.py | 12 ++--- test_runner/regress/test_tenant_detach.py | 4 +- test_runner/regress/test_tenant_relocation.py | 4 +- 
.../test_tenants_with_remote_storage.py | 4 +- .../regress/test_threshold_based_eviction.py | 7 +-- test_runner/regress/test_timeline_delete.py | 11 ++-- 17 files changed, 100 insertions(+), 73 deletions(-) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index ebcf244f8e8c..8837e0368f52 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2348,7 +2348,9 @@ def assert_no_metric_errors(self): value = self.http_client().get_metric_value(metric) assert value == 0, f"Nonzero {metric} == {value}" - def assert_log_contains(self, pattern: str, offset: None | LogCursor = None) -> Tuple[str, LogCursor]: + def assert_log_contains( + self, pattern: str, offset: None | LogCursor = None + ) -> Tuple[str, LogCursor]: res = self.log_contains(pattern, offset=offset) assert res is not None return res diff --git a/test_runner/fixtures/pageserver/utils.py b/test_runner/fixtures/pageserver/utils.py index 1415038f699d..c600733e414b 100644 --- a/test_runner/fixtures/pageserver/utils.py +++ b/test_runner/fixtures/pageserver/utils.py @@ -20,7 +20,7 @@ def assert_tenant_state( tenant: TenantId, expected_state: str, message: Optional[str] = None, -): +) -> None: tenant_status = pageserver_http.tenant_status(tenant) log.info(f"tenant_status: {tenant_status}") assert tenant_status["state"]["slug"] == expected_state, message or tenant_status @@ -292,7 +292,7 @@ def timeline_delete_wait_completed( iterations: int = 20, interval: Optional[float] = None, **delete_args, -): +) -> None: pageserver_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id, **delete_args) wait_timeline_detail_404(pageserver_http, tenant_id, timeline_id, iterations, interval) @@ -302,7 +302,7 @@ def assert_prefix_empty( remote_storage: Optional[RemoteStorage], prefix: Optional[str] = None, allowed_postfix: Optional[str] = None, -): +) -> None: assert remote_storage is not None response = list_prefix(remote_storage, prefix) keys = response["KeyCount"] diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py index 91f33e1196ac..7fc3bae3afd6 100644 --- a/test_runner/fixtures/utils.py +++ b/test_runner/fixtures/utils.py @@ -369,7 +369,12 @@ def start_in_background( return spawned_process -def wait_until(number_of_iterations: int, interval: float, func: Fn): +WaitUntilRet = TypeVar("WaitUntilRet") + + +def wait_until( + number_of_iterations: int, interval: float, func: Callable[[], WaitUntilRet] +) -> WaitUntilRet: """ Wait until 'func' returns successfully, without exception. Returns the last return value from the function. @@ -387,6 +392,18 @@ def wait_until(number_of_iterations: int, interval: float, func: Fn): raise Exception("timed out while waiting for %s" % func) from last_exception +def assert_eq(a, b) -> None: + assert a == b + + +def assert_gt(a, b) -> None: + assert a > b + + +def assert_ge(a, b) -> None: + assert a >= b + + def run_pg_bench_small(pg_bin: "PgBin", connstr: str): """ Fast way to populate data. 
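# Illustrative sketch, not part of this diff: with wait_until() now generic over the
# callback's return type, callers can capture what assert_log_contains returns, i.e. the
# matched line plus a LogCursor, and use that cursor to restrict later searches to log
# lines written after a known marker (the pattern used by the getpage-throttle test
# earlier in this series):
#
#     marker = uuid.uuid4().hex
#     ps_http.post_tracing_event("info", marker)
#     _, marker_offset = wait_until(10, 0.5, lambda: env.pageserver.assert_log_contains(marker))
#
#     env.pageserver.assert_log_contains(
#         f".*{tenant_id}.*shard was throttled in the last n_seconds.*",
#         offset=marker_offset,
#     )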
diff --git a/test_runner/regress/test_attach_tenant_config.py b/test_runner/regress/test_attach_tenant_config.py index 6cae66384249..7fbce6a10c38 100644 --- a/test_runner/regress/test_attach_tenant_config.py +++ b/test_runner/regress/test_attach_tenant_config.py @@ -63,10 +63,11 @@ def negative_env(neon_env_builder: NeonEnvBuilder) -> Generator[NegativeTests, N ] ) - def log_contains_bad_request(): - env.pageserver.log_contains(".*Error processing HTTP request: Bad request") - - wait_until(50, 0.1, log_contains_bad_request) + wait_until( + 50, + 0.1, + lambda: env.pageserver.assert_log_contains(".*Error processing HTTP request: Bad request"), + ) def test_null_body(negative_env: NegativeTests): diff --git a/test_runner/regress/test_disk_usage_eviction.py b/test_runner/regress/test_disk_usage_eviction.py index eb4e370ea796..b83545216d8a 100644 --- a/test_runner/regress/test_disk_usage_eviction.py +++ b/test_runner/regress/test_disk_usage_eviction.py @@ -200,7 +200,7 @@ def pageserver_start_with_disk_usage_eviction( tenant_ps.http_client().timeline_wait_logical_size(tenant_id, timeline_id) def statvfs_called(): - assert pageserver.log_contains(".*running mocked statvfs.*") + pageserver.assert_log_contains(".*running mocked statvfs.*") # we most likely have already completed multiple runs wait_until(10, 1, statvfs_called) @@ -533,7 +533,7 @@ def test_pageserver_falls_back_to_global_lru(eviction_env: EvictionEnv, order: E assert actual_change >= target, "eviction must always evict more than target" time.sleep(1) # give log time to flush - assert env.neon_env.pageserver.log_contains(GLOBAL_LRU_LOG_LINE) + env.neon_env.pageserver.assert_log_contains(GLOBAL_LRU_LOG_LINE) env.neon_env.pageserver.allowed_errors.append(".*" + GLOBAL_LRU_LOG_LINE) @@ -767,7 +767,7 @@ def test_statvfs_error_handling(eviction_env: EvictionEnv): eviction_order=EvictionOrder.ABSOLUTE_ORDER, ) - assert env.neon_env.pageserver.log_contains(".*statvfs failed.*EIO") + env.neon_env.pageserver.assert_log_contains(".*statvfs failed.*EIO") env.neon_env.pageserver.allowed_errors.append(".*statvfs failed.*EIO") @@ -801,10 +801,9 @@ def test_statvfs_pressure_usage(eviction_env: EvictionEnv): eviction_order=EvictionOrder.ABSOLUTE_ORDER, ) - def relieved_log_message(): - assert env.neon_env.pageserver.log_contains(".*disk usage pressure relieved") - - wait_until(10, 1, relieved_log_message) + wait_until( + 10, 1, lambda: env.neon_env.pageserver.assert_log_contains(".*disk usage pressure relieved") + ) def less_than_max_usage_pct(): post_eviction_total_size, _, _ = env.timelines_du(env.pageserver) @@ -845,10 +844,9 @@ def test_statvfs_pressure_min_avail_bytes(eviction_env: EvictionEnv): eviction_order=EvictionOrder.ABSOLUTE_ORDER, ) - def relieved_log_message(): - assert env.neon_env.pageserver.log_contains(".*disk usage pressure relieved") - - wait_until(10, 1, relieved_log_message) + wait_until( + 10, 1, lambda: env.neon_env.pageserver.assert_log_contains(".*disk usage pressure relieved") + ) def more_than_min_avail_bytes_freed(): post_eviction_total_size, _, _ = env.timelines_du(env.pageserver) diff --git a/test_runner/regress/test_duplicate_layers.py b/test_runner/regress/test_duplicate_layers.py index 224e6f50c725..cb4fa43be724 100644 --- a/test_runner/regress/test_duplicate_layers.py +++ b/test_runner/regress/test_duplicate_layers.py @@ -36,7 +36,7 @@ def test_duplicate_layers(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): pg_bin.run_capture(["pgbench", "-i", "-s1", connstr]) time.sleep(10) # let compaction to be performed - 
assert env.pageserver.log_contains("compact-level0-phase1-return-same") + env.pageserver.assert_log_contains("compact-level0-phase1-return-same") def test_actually_duplicated_l1(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): diff --git a/test_runner/regress/test_layers_from_future.py b/test_runner/regress/test_layers_from_future.py index 999e077e454b..9da47b9fd33d 100644 --- a/test_runner/regress/test_layers_from_future.py +++ b/test_runner/regress/test_layers_from_future.py @@ -184,10 +184,13 @@ def future_layer_is_gone_from_index_part(): # NB: the layer file is unlinked index part now, but, because we made the delete # operation stuck, the layer file itself is still in the remote_storage - def delete_at_pause_point(): - assert env.pageserver.log_contains(f".*{tenant_id}.*at failpoint.*{failpoint_name}") - - wait_until(10, 0.5, delete_at_pause_point) + wait_until( + 10, + 0.5, + lambda: env.pageserver.assert_log_contains( + f".*{tenant_id}.*at failpoint.*{failpoint_name}" + ), + ) future_layer_path = env.pageserver_remote_storage.remote_layer_path( tenant_id, timeline_id, future_layer.to_str(), generation=generation_before_detach ) diff --git a/test_runner/regress/test_logging.py b/test_runner/regress/test_logging.py index d62b5e531c2e..bfffad75722f 100644 --- a/test_runner/regress/test_logging.py +++ b/test_runner/regress/test_logging.py @@ -34,7 +34,7 @@ def test_logging_event_count(neon_env_builder: NeonEnvBuilder, level: str): def assert_logged(): if not log_expected: return - assert env.pageserver.log_contains(f".*{msg_id}.*") + env.pageserver.assert_log_contains(f".*{msg_id}.*") wait_until(10, 0.5, assert_logged) diff --git a/test_runner/regress/test_pageserver_generations.py b/test_runner/regress/test_pageserver_generations.py index 1070d06ed040..89fc48a49f82 100644 --- a/test_runner/regress/test_pageserver_generations.py +++ b/test_runner/regress/test_pageserver_generations.py @@ -432,7 +432,7 @@ def assert_header_written(): main_pageserver.start() - def assert_deletions_submitted(n: int): + def assert_deletions_submitted(n: int) -> None: assert ps_http.get_metric_value("pageserver_deletion_queue_submitted_total") == n # After restart, issue a flush to kick the deletion frontend to do recovery. 
diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index 73ebe0a76fa3..d322b4ffff73 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -28,7 +28,14 @@ available_remote_storages, ) from fixtures.types import Lsn, TenantId, TimelineId -from fixtures.utils import print_gc_result, query_scalar, wait_until +from fixtures.utils import ( + assert_eq, + assert_ge, + assert_gt, + print_gc_result, + query_scalar, + wait_until, +) from requests import ReadTimeout @@ -120,10 +127,10 @@ def test_remote_storage_backup_and_restore( log.info(f"upload of checkpoint {checkpoint_number} is done") # Check that we had to retry the uploads - assert env.pageserver.log_contains( + env.pageserver.assert_log_contains( ".*failed to perform remote task UploadLayer.*, will retry.*" ) - assert env.pageserver.log_contains( + env.pageserver.assert_log_contains( ".*failed to perform remote task UploadMetadata.*, will retry.*" ) @@ -292,9 +299,9 @@ def get_queued_count(file_kind, op_kind): print_gc_result(gc_result) assert gc_result["layers_removed"] > 0 - wait_until(2, 1, lambda: get_queued_count(file_kind="layer", op_kind="upload") == 0) - wait_until(2, 1, lambda: get_queued_count(file_kind="index", op_kind="upload") == 0) - wait_until(2, 1, lambda: get_queued_count(file_kind="layer", op_kind="delete") == 0) + wait_until(2, 1, lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="upload"), 0)) + wait_until(2, 1, lambda: assert_eq(get_queued_count(file_kind="index", op_kind="upload"), 0)) + wait_until(2, 1, lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="delete"), 0)) # let all future operations queue up configure_storage_sync_failpoints("return") @@ -322,17 +329,17 @@ def churn_while_failpoints_active(result): churn_while_failpoints_active_thread.start() # wait for churn thread's data to get stuck in the upload queue - wait_until(10, 0.1, lambda: get_queued_count(file_kind="layer", op_kind="upload") > 0) - wait_until(10, 0.1, lambda: get_queued_count(file_kind="index", op_kind="upload") >= 2) - wait_until(10, 0.1, lambda: get_queued_count(file_kind="layer", op_kind="delete") > 0) + wait_until(10, 0.1, lambda: assert_gt(get_queued_count(file_kind="layer", op_kind="upload"), 0)) + wait_until(10, 0.1, lambda: assert_ge(get_queued_count(file_kind="index", op_kind="upload"), 2)) + wait_until(10, 0.1, lambda: assert_gt(get_queued_count(file_kind="layer", op_kind="delete"), 0)) # unblock churn operations configure_storage_sync_failpoints("off") # ... and wait for them to finish. Exponential back-off in upload queue, so, gracious timeouts. - wait_until(30, 1, lambda: get_queued_count(file_kind="layer", op_kind="upload") == 0) - wait_until(30, 1, lambda: get_queued_count(file_kind="index", op_kind="upload") == 0) - wait_until(30, 1, lambda: get_queued_count(file_kind="layer", op_kind="delete") == 0) + wait_until(30, 1, lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="upload"), 0)) + wait_until(30, 1, lambda: assert_eq(get_queued_count(file_kind="index", op_kind="upload"), 0)) + wait_until(30, 1, lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="delete"), 0)) # The churn thread doesn't make progress once it blocks on the first wait_completion() call, # so, give it some time to wrap up. 
@@ -884,26 +891,23 @@ def wait_upload_queue_empty( wait_until( 2, 1, - lambda: get_queued_count( - client, tenant_id, timeline_id, file_kind="layer", op_kind="upload" - ) - == 0, + lambda: assert_eq( + get_queued_count(client, tenant_id, timeline_id, file_kind="layer", op_kind="upload"), 0 + ), ) wait_until( 2, 1, - lambda: get_queued_count( - client, tenant_id, timeline_id, file_kind="index", op_kind="upload" - ) - == 0, + lambda: assert_eq( + get_queued_count(client, tenant_id, timeline_id, file_kind="index", op_kind="upload"), 0 + ), ) wait_until( 2, 1, - lambda: get_queued_count( - client, tenant_id, timeline_id, file_kind="layer", op_kind="delete" - ) - == 0, + lambda: assert_eq( + get_queued_count(client, tenant_id, timeline_id, file_kind="layer", op_kind="delete"), 0 + ), ) diff --git a/test_runner/regress/test_sharding_service.py b/test_runner/regress/test_sharding_service.py index 6ed49d7fd6e9..c8224c1c67ff 100644 --- a/test_runner/regress/test_sharding_service.py +++ b/test_runner/regress/test_sharding_service.py @@ -116,7 +116,7 @@ def test_sharding_service_smoke( # Marking a pageserver offline should migrate tenants away from it. env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Offline"}) - def node_evacuated(node_id: int): + def node_evacuated(node_id: int) -> None: counts = get_node_shard_counts(env, tenant_ids) assert counts[node_id] == 0 @@ -405,7 +405,7 @@ def handler(request: Request): env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Offline"}) - def node_evacuated(node_id: int): + def node_evacuated(node_id: int) -> None: counts = get_node_shard_counts(env, [env.initial_tenant]) assert counts[node_id] == 0 diff --git a/test_runner/regress/test_tenant_delete.py b/test_runner/regress/test_tenant_delete.py index 8c7d332e1daf..c4b4e5fb7781 100644 --- a/test_runner/regress/test_tenant_delete.py +++ b/test_runner/regress/test_tenant_delete.py @@ -505,10 +505,10 @@ def delete_tenant(): return ps_http.tenant_delete(tenant_id) def hit_remove_failpoint(): - assert env.pageserver.log_contains(f"at failpoint {BEFORE_REMOVE_FAILPOINT}") + env.pageserver.assert_log_contains(f"at failpoint {BEFORE_REMOVE_FAILPOINT}") def hit_run_failpoint(): - assert env.pageserver.log_contains(f"at failpoint {BEFORE_RUN_FAILPOINT}") + env.pageserver.assert_log_contains(f"at failpoint {BEFORE_RUN_FAILPOINT}") with concurrent.futures.ThreadPoolExecutor() as executor: background_200_req = executor.submit(delete_tenant) @@ -612,12 +612,12 @@ def timeline_create(): Thread(target=timeline_create).start() def hit_initdb_upload_failpoint(): - assert env.pageserver.log_contains(f"at failpoint {BEFORE_INITDB_UPLOAD_FAILPOINT}") + env.pageserver.assert_log_contains(f"at failpoint {BEFORE_INITDB_UPLOAD_FAILPOINT}") wait_until(100, 0.1, hit_initdb_upload_failpoint) def creation_connection_timed_out(): - assert env.pageserver.log_contains( + env.pageserver.assert_log_contains( "POST.*/timeline.* request was dropped before completing" ) @@ -636,7 +636,7 @@ def tenant_delete_inner(): Thread(target=tenant_delete).start() def deletion_arrived(): - assert env.pageserver.log_contains( + env.pageserver.assert_log_contains( f"cfg failpoint: {DELETE_BEFORE_CLEANUP_FAILPOINT} pause" ) @@ -663,7 +663,7 @@ def deletion_arrived(): ) # Ensure that creation cancelled and deletion didn't end up in broken state or encountered the leftover temp file - assert env.pageserver.log_contains(CANCELLED_ERROR) + env.pageserver.assert_log_contains(CANCELLED_ERROR) assert not 
env.pageserver.log_contains( ".*ERROR.*delete_tenant.*Timelines directory is not empty after all timelines deletion" ) diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py index 4752699abb49..d3f24cb06e07 100644 --- a/test_runner/regress/test_tenant_detach.py +++ b/test_runner/regress/test_tenant_detach.py @@ -92,10 +92,10 @@ def test_tenant_reattach(neon_env_builder: NeonEnvBuilder, mode: str): wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn) # Check that we had to retry the uploads - assert env.pageserver.log_contains( + env.pageserver.assert_log_contains( ".*failed to perform remote task UploadLayer.*, will retry.*" ) - assert env.pageserver.log_contains( + env.pageserver.assert_log_contains( ".*failed to perform remote task UploadMetadata.*, will retry.*" ) diff --git a/test_runner/regress/test_tenant_relocation.py b/test_runner/regress/test_tenant_relocation.py index b70131472a1c..9def3ad1c243 100644 --- a/test_runner/regress/test_tenant_relocation.py +++ b/test_runner/regress/test_tenant_relocation.py @@ -495,7 +495,7 @@ def test_emergency_relocate_with_branches_slow_replay( assert cur.fetchall() == [("before pause",), ("after pause",)] # Sanity check that the failpoint was reached - assert env.pageserver.log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done') + env.pageserver.assert_log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done') assert time.time() - before_attach_time > 5 # Clean up @@ -632,7 +632,7 @@ def test_emergency_relocate_with_branches_createdb( assert query_scalar(cur, "SELECT count(*) FROM test_migrate_one") == 200 # Sanity check that the failpoint was reached - assert env.pageserver.log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done') + env.pageserver.assert_log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done') assert time.time() - before_attach_time > 5 # Clean up diff --git a/test_runner/regress/test_tenants_with_remote_storage.py b/test_runner/regress/test_tenants_with_remote_storage.py index 1c693a0df5ba..d16978d02ac0 100644 --- a/test_runner/regress/test_tenants_with_remote_storage.py +++ b/test_runner/regress/test_tenants_with_remote_storage.py @@ -147,10 +147,10 @@ def test_tenants_attached_after_download(neon_env_builder: NeonEnvBuilder): log.info(f"upload of checkpoint {checkpoint_number} is done") # Check that we had to retry the uploads - assert env.pageserver.log_contains( + env.pageserver.assert_log_contains( ".*failed to perform remote task UploadLayer.*, will retry.*" ) - assert env.pageserver.log_contains( + env.pageserver.assert_log_contains( ".*failed to perform remote task UploadMetadata.*, will retry.*" ) diff --git a/test_runner/regress/test_threshold_based_eviction.py b/test_runner/regress/test_threshold_based_eviction.py index 5f72cfd74713..f88c1e890b7e 100644 --- a/test_runner/regress/test_threshold_based_eviction.py +++ b/test_runner/regress/test_threshold_based_eviction.py @@ -179,6 +179,7 @@ def __repr__(self) -> str: assert len(post.remote_layers) > 0, "some layers should be evicted once it's stabilized" assert len(post.local_layers) > 0, "the imitate accesses should keep some layers resident" - assert env.pageserver.log_contains( - metrics_refused_log_line - ), "ensure the metrics collection worker ran" + ( + env.pageserver.assert_log_contains(metrics_refused_log_line), + "ensure the metrics collection worker ran", + ) diff --git a/test_runner/regress/test_timeline_delete.py 
b/test_runner/regress/test_timeline_delete.py index a6a6fb47ccd6..795110d90be5 100644 --- a/test_runner/regress/test_timeline_delete.py +++ b/test_runner/regress/test_timeline_delete.py @@ -89,6 +89,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv): assert timeline_path.exists() # retry deletes when compaction or gc is running in pageserver + # TODO: review whether this wait_until is actually necessary, we do an await() internally wait_until( number_of_iterations=3, interval=0.2, @@ -531,7 +532,7 @@ def first_call(result_queue): try: def first_call_hit_failpoint(): - assert env.pageserver.log_contains( + env.pageserver.assert_log_contains( f".*{child_timeline_id}.*at failpoint {stuck_failpoint}" ) @@ -602,7 +603,7 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder): at_failpoint_log_message = f".*{child_timeline_id}.*at failpoint {failpoint_name}.*" def hit_failpoint(): - assert env.pageserver.log_contains(at_failpoint_log_message) + env.pageserver.assert_log_contains(at_failpoint_log_message) wait_until(50, 0.1, hit_failpoint) @@ -612,7 +613,7 @@ def hit_failpoint(): env.pageserver.allowed_errors.append(hangup_log_message) def got_hangup_log_message(): - assert env.pageserver.log_contains(hangup_log_message) + env.pageserver.assert_log_contains(hangup_log_message) wait_until(50, 0.1, got_hangup_log_message) @@ -624,7 +625,7 @@ def got_hangup_log_message(): def first_request_finished(): message = f".*DELETE.*{child_timeline_id}.*Cancelled request finished" - assert env.pageserver.log_contains(message) + env.pageserver.assert_log_contains(message) wait_until(50, 0.1, first_request_finished) @@ -759,7 +760,7 @@ def test_delete_orphaned_objects( for orphan in orphans: assert not orphan.exists() - assert env.pageserver.log_contains( + env.pageserver.assert_log_contains( f"deleting a file not referenced from index_part.json name={orphan.stem}" ) From 6018e232adf705c73c194788d3d11df1c39e141b Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 29 Feb 2024 17:30:54 +0000 Subject: [PATCH 17/21] self-review --- test_runner/fixtures/neon_fixtures.py | 2 ++ test_runner/regress/test_threshold_based_eviction.py | 7 +++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 8837e0368f52..b933d391abb8 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2351,6 +2351,8 @@ def assert_no_metric_errors(self): def assert_log_contains( self, pattern: str, offset: None | LogCursor = None ) -> Tuple[str, LogCursor]: + """Convenient for use inside wait_until()""" + res = self.log_contains(pattern, offset=offset) assert res is not None return res diff --git a/test_runner/regress/test_threshold_based_eviction.py b/test_runner/regress/test_threshold_based_eviction.py index f88c1e890b7e..7bf49a0874df 100644 --- a/test_runner/regress/test_threshold_based_eviction.py +++ b/test_runner/regress/test_threshold_based_eviction.py @@ -179,7 +179,6 @@ def __repr__(self) -> str: assert len(post.remote_layers) > 0, "some layers should be evicted once it's stabilized" assert len(post.local_layers) > 0, "the imitate accesses should keep some layers resident" - ( - env.pageserver.assert_log_contains(metrics_refused_log_line), - "ensure the metrics collection worker ran", - ) + assert ( + env.pageserver.log_contains(metrics_refused_log_line) is not None + ), "ensure the metrics collection worker ran" From d86df728e621036135823177926f3b074d9205bd Mon Sep 17 
00:00:00 2001 From: Christian Schwarz Date: Thu, 29 Feb 2024 18:02:59 +0000 Subject: [PATCH 18/21] test_remote_storage_upload_queue_retries: need to raise timeout to make tests pass, maybe this was root cause of its flakiness --- test_runner/regress/test_remote_storage.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index d322b4ffff73..f8a0bef954e0 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -329,9 +329,9 @@ def churn_while_failpoints_active(result): churn_while_failpoints_active_thread.start() # wait for churn thread's data to get stuck in the upload queue - wait_until(10, 0.1, lambda: assert_gt(get_queued_count(file_kind="layer", op_kind="upload"), 0)) - wait_until(10, 0.1, lambda: assert_ge(get_queued_count(file_kind="index", op_kind="upload"), 2)) - wait_until(10, 0.1, lambda: assert_gt(get_queued_count(file_kind="layer", op_kind="delete"), 0)) + wait_until(10, 0.5, lambda: assert_gt(get_queued_count(file_kind="layer", op_kind="upload"), 0)) + wait_until(10, 0.5, lambda: assert_ge(get_queued_count(file_kind="index", op_kind="upload"), 2)) + wait_until(10, 0.5, lambda: assert_gt(get_queued_count(file_kind="layer", op_kind="delete"), 0)) # unblock churn operations configure_storage_sync_failpoints("off") From d9c3fa71d728920e81d0f13f95dca409e8c03e7f Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 1 Mar 2024 09:48:11 +0000 Subject: [PATCH 19/21] adjust log messages --- pageserver/src/metrics.rs | 4 ++-- pageserver/src/tenant/throttle.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 69a10ca08f4f..5661e01a8873 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1033,7 +1033,7 @@ impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> { let mut guard = LOGGED.lock().unwrap(); let rate_limit = &mut guard[self.op]; rate_limit.call(|| { - warn!(op=?self.op, error, "error deducting time spent throttled, this message is only logged once per process lifetime"); + warn!(op=?self.op, error, "error deducting time spent throttled; this message is logged at a global rate limit"); }); elapsed } @@ -1176,7 +1176,7 @@ impl SmgrQueryTimePerTimeline { let mut guard = LOGGED.lock().unwrap(); let rate_limit = &mut guard[op]; rate_limit.call(|| { - warn!(?op, error, "error opening micros_spent_throttled, this message is only logged once per process lifetime"); + warn!(?op, error, "error opening micros_spent_throttled; this message is logged at a global rate limit"); }); } } diff --git a/pageserver/src/tenant/throttle.rs b/pageserver/src/tenant/throttle.rs index 0b39bcd20a01..280773e9c305 100644 --- a/pageserver/src/tenant/throttle.rs +++ b/pageserver/src/tenant/throttle.rs @@ -166,7 +166,7 @@ where Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10)))); let mut guard = WARN_RATE_LIMIT.lock().unwrap(); guard.call(move || { - warn!(error, "error adding time spent throttled; this message is rate-limited globally"); + warn!(error, "error adding time spent throttled; this message is logged at a global rate limit"); }); } } From 169065f7a1acb78a172546923de0a478c9110f36 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 5 Mar 2024 11:31:45 +0100 Subject: [PATCH 20/21] DO NOT MERGE: run test 100 times to ensure it's not flaky --- .github/workflows/build_and_test.yml | 2 +- 
test_runner/regress/test_pageserver_getpage_throttle.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 2e52e7c28f27..dd2ca9b03829 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -451,7 +451,7 @@ jobs: fail-fast: false matrix: build_type: [ debug, release ] - pg_version: [ v14, v15, v16 ] + pg_version: [ v15 ] steps: - name: Checkout uses: actions/checkout@v4 diff --git a/test_runner/regress/test_pageserver_getpage_throttle.py b/test_runner/regress/test_pageserver_getpage_throttle.py index 42cc28efee2d..8fc280dfcecf 100644 --- a/test_runner/regress/test_pageserver_getpage_throttle.py +++ b/test_runner/regress/test_pageserver_getpage_throttle.py @@ -1,6 +1,7 @@ import json import uuid +import pytest from anyio import Path from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnvBuilder, PgBin @@ -9,6 +10,7 @@ from fixtures.utils import wait_until +@pytest.mark.repeat(100) def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): env = neon_env_builder.init_start() From 2dcf5d8585a8efdf4f23199bbd432e5a51796700 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 5 Mar 2024 12:19:30 +0000 Subject: [PATCH 21/21] Revert "DO NOT MERGE: run test 100 times to ensure it's not flaky" This reverts commit 169065f7a1acb78a172546923de0a478c9110f36. --- .github/workflows/build_and_test.yml | 2 +- test_runner/regress/test_pageserver_getpage_throttle.py | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index dd2ca9b03829..2e52e7c28f27 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -451,7 +451,7 @@ jobs: fail-fast: false matrix: build_type: [ debug, release ] - pg_version: [ v15 ] + pg_version: [ v14, v15, v16 ] steps: - name: Checkout uses: actions/checkout@v4 diff --git a/test_runner/regress/test_pageserver_getpage_throttle.py b/test_runner/regress/test_pageserver_getpage_throttle.py index 8fc280dfcecf..42cc28efee2d 100644 --- a/test_runner/regress/test_pageserver_getpage_throttle.py +++ b/test_runner/regress/test_pageserver_getpage_throttle.py @@ -1,7 +1,6 @@ import json import uuid -import pytest from anyio import Path from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnvBuilder, PgBin @@ -10,7 +9,6 @@ from fixtures.utils import wait_until -@pytest.mark.repeat(100) def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): env = neon_env_builder.init_start()
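
The assert_eq, assert_gt, and assert_ge helpers imported into test_remote_storage.py earlier in this series come from fixtures.utils; their definitions are not part of these patches. The motivation for using them is that a lambda such as "lambda: get_queued_count(...) == 0" never raises, so a retry loop that only treats exceptions as failure would accept it on the first call regardless of the count; asserting inside a helper makes the comparison actually enforced. A plausible shape for those helpers, written here as a hypothetical sketch rather than the real fixtures.utils code, is:

from typing import Any

def assert_eq(a: Any, b: Any) -> None:
    # Raise on mismatch so wait_until() retries instead of passing vacuously.
    assert a == b, f"assert_eq failed: {a!r} != {b!r}"

def assert_gt(a: Any, b: Any) -> None:
    assert a > b, f"assert_gt failed: {a!r} <= {b!r}"

def assert_ge(a: Any, b: Any) -> None:
    assert a >= b, f"assert_ge failed: {a!r} < {b!r}"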