Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add simple test of pageserver recovery after crash. #1324

Merged
merged 1 commit into from
May 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ jobs:
export RUSTC_WRAPPER=cachepot
export AWS_ACCESS_KEY_ID="${CACHEPOT_AWS_ACCESS_KEY_ID}"
export AWS_SECRET_ACCESS_KEY="${CACHEPOT_AWS_SECRET_ACCESS_KEY}"
"${cov_prefix[@]}" mold -run cargo build $CARGO_FLAGS --bins --tests
"${cov_prefix[@]}" mold -run cargo build $CARGO_FLAGS --features failpoints --bins --tests
cachepot -s

- save_cache:
Expand Down
30 changes: 28 additions & 2 deletions pageserver/src/bin/pageserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use anyhow::{bail, Context, Result};
use clap::{App, Arg};
use daemonize::Daemonize;

use fail::FailScenario;
use pageserver::{
config::{defaults::*, PageServerConf},
http, page_cache, page_service, profiling, tenant_mgr, thread_mgr,
Expand Down Expand Up @@ -84,8 +85,23 @@ fn main() -> anyhow::Result<()> {
.help("Additional configuration overrides of the ones from the toml config file (or new ones to add there).
Any option has to be a valid toml document, example: `-c=\"foo='hey'\"` `-c=\"foo={value=1}\"`"),
)
.arg(
Arg::new("enabled-features")
.long("enabled-features")
.takes_value(false)
.help("Show enabled compile time features"),
)
.get_matches();

if arg_matches.is_present("enabled-features") {
let features: &[&str] = &[
#[cfg(feature = "failpoints")]
"failpoints",
];
println!("{{\"features\": {features:?} }}");
return Ok(());
}

let workdir = Path::new(arg_matches.value_of("workdir").unwrap_or(".zenith"));
let workdir = workdir
.canonicalize()
Expand Down Expand Up @@ -166,6 +182,14 @@ fn main() -> anyhow::Result<()> {
// as a ref.
let conf: &'static PageServerConf = Box::leak(Box::new(conf));

// If failpoints are used, terminate the whole pageserver process if they are hit.
let scenario = FailScenario::setup();
if fail::has_failpoints() {
std::panic::set_hook(Box::new(|_| {
std::process::exit(1);
}));
}

// Basic initialization of things that don't change after startup
virtual_file::init(conf.max_file_descriptors);
page_cache::init(conf.page_cache_size);
Expand All @@ -181,10 +205,12 @@ fn main() -> anyhow::Result<()> {
cfg_file_path.display()
)
})?;
Ok(())
} else {
start_pageserver(conf, daemonize).context("Failed to start pageserver")
start_pageserver(conf, daemonize).context("Failed to start pageserver")?;
}

scenario.teardown();
Ok(())
}

fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()> {
Expand Down
2 changes: 2 additions & 0 deletions pageserver/src/layered_repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1703,6 +1703,7 @@ impl LayeredTimeline {
new_delta_path.clone(),
self.conf.timeline_path(&self.timelineid, &self.tenantid),
])?;
fail_point!("checkpoint-before-sync");

fail_point!("flush-frozen");

Expand All @@ -1727,6 +1728,7 @@ impl LayeredTimeline {
// TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
// *all* the layers, to avoid fsyncing the file multiple times.
let disk_consistent_lsn = Lsn(frozen_layer.get_lsn_range().end.0 - 1);
fail_point!("checkpoint-after-sync");

// If we were able to advance 'disk_consistent_lsn', save it to the metadata file.
// After crash, we will restart WAL streaming and processing from that point.
Expand Down
64 changes: 64 additions & 0 deletions test_runner/batch_others/test_recovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import os
import time
import psycopg2.extras
import json
from ast import Assert
from contextlib import closing
from fixtures.zenith_fixtures import ZenithEnvBuilder
from fixtures.log_helper import log


#
# Test pageserver recovery after crash
#
def test_pageserver_recovery(zenith_env_builder: ZenithEnvBuilder):
    """
    Crash the pageserver in the middle of a checkpoint and verify that the
    data is still readable after the pageserver is restarted.

    The crash is injected with the 'checkpoint-before-sync' (sleep) and
    'checkpoint-after-sync' (panic) failpoints, so the pageserver binary has
    to be built with the 'failpoints' feature; the test asserts that up front.
    """
    zenith_env_builder.num_safekeepers = 1
    # Override default checkpointer settings to run it more often
    zenith_env_builder.pageserver_config_override = "tenant_config={checkpoint_distance = 1048576}"

    env = zenith_env_builder.init()

    # Check that failpoints are enabled. Otherwise the test doesn't make sense
    f = env.zenith_cli.pageserver_enabled_features()

    assert "failpoints" in f["features"], "Build pageserver with --features=failpoints option to run this test"
    zenith_env_builder.start()

    # Create a branch for us
    env.zenith_cli.create_branch("test_pageserver_recovery", "main")

    pg = env.postgres.create_start('test_pageserver_recovery')
    log.info("postgres is running on 'test_pageserver_recovery' branch")

    with closing(pg.connect()) as conn:
        with conn.cursor() as cur:
            with closing(env.pageserver.connect()) as psconn:
                with psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur:
                    # Create and initialize test table
                    cur.execute("CREATE TABLE foo(x bigint)")
                    cur.execute("INSERT INTO foo VALUES (generate_series(1,100000))")

                    # Sleep for some time to let checkpoint create image layers
                    time.sleep(2)

                    # Configure failpoints: delay before the sync, panic right after it
                    pscur.execute(
                        "failpoints checkpoint-before-sync=sleep(2000);checkpoint-after-sync=panic")

                    # Do some updates until pageserver is crashed.
                    # The loop ends only when a statement raises, which is
                    # what we expect once the pageserver has gone away.
                    try:
                        while True:
                            cur.execute("update foo set x=x+1")
                    except Exception as err:
                        log.info(f"Expected server crash {err}")

    log.info("Wait before server restart")
    env.pageserver.stop()
    env.pageserver.start()

    # The updates only change values, never the number of rows, so the
    # row count must match the initial insert after recovery.
    with closing(pg.connect()) as conn:
        with conn.cursor() as cur:
            cur.execute("select count(*) from foo")
            assert cur.fetchone() == (100000, )
13 changes: 13 additions & 0 deletions test_runner/fixtures/zenith_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,19 @@ def init(self,
res.check_returncode()
return res

def pageserver_enabled_features(self) -> Any:
    """
    Ask the pageserver binary which compile-time features it was built with.

    Runs `pageserver --enabled-features` from the zenith binary directory and
    returns its stdout parsed as JSON. Raises subprocess.CalledProcessError
    if the binary exits with a non-zero status (check=True).
    """
    pageserver_path = os.path.join(str(zenith_binpath), 'pageserver')
    cmd = [pageserver_path, '--enabled-features']
    log.info('Running command "{}"'.format(' '.join(cmd)))

    # Capture both streams as text; only stdout carries the JSON payload.
    completed = subprocess.run(
        cmd,
        check=True,
        universal_newlines=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    features_json = completed.stdout
    log.info(f"pageserver_enabled_features success: {features_json}")
    return json.loads(features_json)

def pageserver_start(self, overrides=()) -> 'subprocess.CompletedProcess[str]':
start_args = ['pageserver', 'start', *overrides]
append_pageserver_param_overrides(start_args,
Expand Down