diff --git a/.circleci/config.yml b/.circleci/config.yml index 2ed079f03148..864246ad2e70 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -121,7 +121,7 @@ jobs: export RUSTC_WRAPPER=cachepot export AWS_ACCESS_KEY_ID="${CACHEPOT_AWS_ACCESS_KEY_ID}" export AWS_SECRET_ACCESS_KEY="${CACHEPOT_AWS_SECRET_ACCESS_KEY}" - "${cov_prefix[@]}" mold -run cargo build $CARGO_FLAGS --bins --tests + "${cov_prefix[@]}" mold -run cargo build $CARGO_FLAGS --features failpoints --bins --tests cachepot -s - save_cache: diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 2139bea37ef7..6a5d4533d0f7 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -8,6 +8,7 @@ use anyhow::{bail, Context, Result}; use clap::{App, Arg}; use daemonize::Daemonize; +use fail::FailScenario; use pageserver::{ config::{defaults::*, PageServerConf}, http, page_cache, page_service, profiling, tenant_mgr, thread_mgr, @@ -84,8 +85,23 @@ fn main() -> anyhow::Result<()> { .help("Additional configuration overrides of the ones from the toml config file (or new ones to add there). Any option has to be a valid toml document, example: `-c=\"foo='hey'\"` `-c=\"foo={value=1}\"`"), ) + .arg( + Arg::new("enabled-features") + .long("enabled-features") + .takes_value(false) + .help("Show enabled compile time features"), + ) .get_matches(); + if arg_matches.is_present("enabled-features") { + let features: &[&str] = &[ + #[cfg(feature = "failpoints")] + "failpoints", + ]; + println!("{{\"features\": {features:?} }}"); + return Ok(()); + } + let workdir = Path::new(arg_matches.value_of("workdir").unwrap_or(".zenith")); let workdir = workdir .canonicalize() @@ -166,6 +182,14 @@ fn main() -> anyhow::Result<()> { // as a ref. let conf: &'static PageServerConf = Box::leak(Box::new(conf)); + // If failpoints are used, terminate the whole pageserver process if they are hit. 
+ let scenario = FailScenario::setup(); + if fail::has_failpoints() { + std::panic::set_hook(Box::new(|_| { + std::process::exit(1); + })); + } + // Basic initialization of things that don't change after startup virtual_file::init(conf.max_file_descriptors); page_cache::init(conf.page_cache_size); @@ -181,10 +205,12 @@ fn main() -> anyhow::Result<()> { cfg_file_path.display() ) })?; - Ok(()) } else { - start_pageserver(conf, daemonize).context("Failed to start pageserver") + start_pageserver(conf, daemonize).context("Failed to start pageserver")?; } + + scenario.teardown(); + Ok(()) } fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()> { diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 1205f8d86757..e678c8f4cb80 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -1703,6 +1703,7 @@ impl LayeredTimeline { new_delta_path.clone(), self.conf.timeline_path(&self.timelineid, &self.tenantid), ])?; + fail_point!("checkpoint-before-sync"); fail_point!("flush-frozen"); @@ -1727,6 +1728,7 @@ impl LayeredTimeline { // TODO: This perhaps should be done in 'flush_frozen_layers', after flushing // *all* the layers, to avoid fsyncing the file multiple times. let disk_consistent_lsn = Lsn(frozen_layer.get_lsn_range().end.0 - 1); + fail_point!("checkpoint-after-sync"); // If we were able to advance 'disk_consistent_lsn', save it the metadata file. // After crash, we will restart WAL streaming and processing from that point. 
diff --git a/test_runner/batch_others/test_recovery.py b/test_runner/batch_others/test_recovery.py new file mode 100644 index 000000000000..dbfa943a7ade --- /dev/null +++ b/test_runner/batch_others/test_recovery.py @@ -0,0 +1,64 @@ +import os +import time +import psycopg2.extras +import json +from ast import Assert +from contextlib import closing +from fixtures.zenith_fixtures import ZenithEnvBuilder +from fixtures.log_helper import log + + +# +# Test pageserver recovery after crash +# +def test_pageserver_recovery(zenith_env_builder: ZenithEnvBuilder): + zenith_env_builder.num_safekeepers = 1 + # Override default checkpointer settings to run it more often + zenith_env_builder.pageserver_config_override = "tenant_config={checkpoint_distance = 1048576}" + + env = zenith_env_builder.init() + + # Check if failpoints are enabled. Otherwise the test doesn't make sense + f = env.zenith_cli.pageserver_enabled_features() + + assert "failpoints" in f["features"], "Build pageserver with --features=failpoints option to run this test" + zenith_env_builder.start() + + # Create a branch for us + env.zenith_cli.create_branch("test_pageserver_recovery", "main") + + pg = env.postgres.create_start('test_pageserver_recovery') + log.info("postgres is running on 'test_pageserver_recovery' branch") + + connstr = pg.connstr() + + with closing(pg.connect()) as conn: + with conn.cursor() as cur: + with closing(env.pageserver.connect()) as psconn: + with psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur: + # Create and initialize test table + cur.execute("CREATE TABLE foo(x bigint)") + cur.execute("INSERT INTO foo VALUES (generate_series(1,100000))") + + # Sleep for some time to let checkpoint create image layers + time.sleep(2) + + # Configure failpoints + pscur.execute( + "failpoints checkpoint-before-sync=sleep(2000);checkpoint-after-sync=panic") + + # Do some updates until pageserver is crashed + try: + while True: + cur.execute("update foo set x=x+1") + except Exception as 
err: + log.info(f"Expected server crash {err}") + + log.info("Wait before server restart") + env.pageserver.stop() + env.pageserver.start() + + with closing(pg.connect()) as conn: + with conn.cursor() as cur: + cur.execute("select count(*) from foo") + assert cur.fetchone() == (100000, ) diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index 5b25b1c457a7..9319a53778db 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -980,6 +980,19 @@ def init(self, res.check_returncode() return res + def pageserver_enabled_features(self) -> Any: + bin_pageserver = os.path.join(str(zenith_binpath), 'pageserver') + args = [bin_pageserver, '--enabled-features'] + log.info('Running command "{}"'.format(' '.join(args))) + + res = subprocess.run(args, + check=True, + universal_newlines=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + log.info(f"pageserver_enabled_features success: {res.stdout}") + return json.loads(res.stdout) + def pageserver_start(self, overrides=()) -> 'subprocess.CompletedProcess[str]': start_args = ['pageserver', 'start', *overrides] append_pageserver_param_overrides(start_args,