test: Add helper function for importing a Postgres cluster #8025

Merged
merged 1 commit on Jun 26, 2024
29 changes: 5 additions & 24 deletions control_plane/src/bin/neon_local.rs
@@ -600,13 +600,9 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
Some(("import", import_match)) => {
let tenant_id = get_tenant_id(import_match, env)?;
let timeline_id = parse_timeline_id(import_match)?.expect("No timeline id provided");
let name = import_match
.get_one::<String>("node-name")
.ok_or_else(|| anyhow!("No node name provided"))?;
let update_catalog = import_match
.get_one::<bool>("update-catalog")
.cloned()
.unwrap_or_default();
let branch_name = import_match
.get_one::<String>("branch-name")
.ok_or_else(|| anyhow!("No branch name provided"))?;

// Parse base inputs
let base_tarfile = import_match
@@ -633,24 +629,11 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
.copied()
.context("Failed to parse postgres version from the argument string")?;

let mut cplane = ComputeControlPlane::load(env.clone())?;
println!("Importing timeline into pageserver ...");
pageserver
.timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version)
.await?;
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;

println!("Creating endpoint for imported timeline ...");
cplane.new_endpoint(
name,
tenant_id,
timeline_id,
None,
None,
pg_version,
ComputeMode::Primary,
!update_catalog,
)?;
env.register_branch_mapping(branch_name.to_string(), tenant_id, timeline_id)?;
println!("Done");
}
Some(("branch", branch_match)) => {
@@ -1487,8 +1470,7 @@ fn cli() -> Command {
.about("Import timeline from basebackup directory")
.arg(tenant_id_arg.clone())
.arg(timeline_id_arg.clone())
.arg(Arg::new("node-name").long("node-name")
.help("Name to assign to the imported timeline"))
.arg(branch_name_arg.clone())
.arg(Arg::new("base-tarfile")
.long("base-tarfile")
.value_parser(value_parser!(PathBuf))
@@ -1504,7 +1486,6 @@
.arg(Arg::new("end-lsn").long("end-lsn")
.help("Lsn the basebackup ends at"))
.arg(pg_version_arg.clone())
.arg(update_catalog.clone())
)
).subcommand(
Command::new("tenant")
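With this change, `neon_local timeline import` no longer creates a compute endpoint: it imports the data, registers the branch mapping, and stops there. A minimal sketch of the resulting two-step flow in a test, assuming the usual `NeonEnv` fixtures (the branch name, tarball paths, and LSN variables are illustrative):

```python
# Step 1: import the basebackup tarballs as a new timeline, registered
# under a branch name (this is what `timeline import` still does).
env.neon_cli.raw_cli(
    [
        "timeline", "import",
        "--tenant-id", str(tenant_id),
        "--timeline-id", str(timeline_id),
        "--branch-name", "my_branch",
        "--base-lsn", start_lsn,
        "--base-tarfile", str(base_tar),
        "--end-lsn", end_lsn,
        "--wal-tarfile", str(wal_tar),
        "--pg-version", env.pg_version,
    ]
)

# Step 2: create the endpoint explicitly. Previously `timeline import`
# did this itself via --node-name / --update-catalog.
endpoint = env.endpoints.create_start("my_branch", tenant_id=tenant_id)
```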
64 changes: 64 additions & 0 deletions test_runner/fixtures/neon_fixtures.py
@@ -4659,6 +4659,70 @@ def fork_at_current_lsn(
return env.neon_cli.create_branch(new_branch_name, ancestor_branch_name, tenant_id, current_lsn)


def import_timeline_from_vanilla_postgres(
test_output_dir: Path,
env: NeonEnv,
pg_bin: PgBin,
tenant_id: TenantId,
timeline_id: TimelineId,
branch_name: str,
vanilla_pg_connstr: str,
):
"""
Create a new timeline, by importing an existing PostgreSQL cluster.

This works by taking a physical backup of the running PostgreSQL cluster, and importing that.
"""

# Take a backup of the existing PostgreSQL server with pg_basebackup
basebackup_dir = os.path.join(test_output_dir, "basebackup")
base_tar = os.path.join(basebackup_dir, "base.tar")
wal_tar = os.path.join(basebackup_dir, "pg_wal.tar")
os.mkdir(basebackup_dir)
pg_bin.run(
[
"pg_basebackup",
"-F",
"tar",
"-d",
vanilla_pg_connstr,
"-D",
basebackup_dir,
]
)

# Extract start_lsn and end_lsn from the backup manifest file
with open(os.path.join(basebackup_dir, "backup_manifest")) as f:
manifest = json.load(f)
start_lsn = manifest["WAL-Ranges"][0]["Start-LSN"]
end_lsn = manifest["WAL-Ranges"][0]["End-LSN"]

# Import the backup tarballs into the pageserver
env.neon_cli.raw_cli(
[
"timeline",
"import",
"--tenant-id",
str(tenant_id),
"--timeline-id",
str(timeline_id),
"--branch-name",
branch_name,
"--base-lsn",
start_lsn,
"--base-tarfile",
base_tar,
"--end-lsn",
end_lsn,
"--wal-tarfile",
wal_tar,
"--pg-version",
env.pg_version,
]
)
wait_for_last_record_lsn(env.pageserver.http_client(), tenant_id, timeline_id, Lsn(end_lsn))


def last_flush_lsn_upload(
env: NeonEnv,
endpoint: Endpoint,
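For reference, a sketch of how a test can use the new helper, modeled on the `test_next_xid.py` changes later in this diff (the branch name is illustrative; `env`, `vanilla_pg`, `pg_bin`, and `test_output_dir` are the standard fixtures):

```python
# Import a running vanilla PostgreSQL cluster as a new Neon timeline.
tenant_id = TenantId.generate()
env.pageserver.tenant_create(tenant_id)
timeline_id = TimelineId.generate()

import_timeline_from_vanilla_postgres(
    test_output_dir,
    env,
    pg_bin,
    tenant_id,
    timeline_id,
    "imported_branch",  # branch name to register for the new timeline
    vanilla_pg.connstr(),
)
vanilla_pg.stop()  # the original server is no longer needed

# Start an endpoint on the imported branch and verify the data is there.
endpoint = env.endpoints.create_start("imported_branch", tenant_id=tenant_id)
```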
16 changes: 8 additions & 8 deletions test_runner/regress/test_import.py
@@ -76,7 +76,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
start_lsn = manifest["WAL-Ranges"][0]["Start-LSN"]
end_lsn = manifest["WAL-Ranges"][0]["End-LSN"]

endpoint_id = "ep-import_from_vanilla"
branch_name = "import_from_vanilla"
tenant = TenantId.generate()
timeline = TimelineId.generate()

@@ -106,8 +106,8 @@ def import_tar(base, wal):
str(tenant),
"--timeline-id",
str(timeline),
"--node-name",
endpoint_id,
"--branch-name",
branch_name,
"--base-lsn",
start_lsn,
"--base-tarfile",
Expand Down Expand Up @@ -146,7 +146,7 @@ def import_tar(base, wal):
wait_for_upload(client, tenant, timeline, Lsn(end_lsn))

# Check it worked
endpoint = env.endpoints.create_start(endpoint_id, tenant_id=tenant)
endpoint = env.endpoints.create_start(branch_name, tenant_id=tenant)
assert endpoint.safe_psql("select count(*) from t") == [(300000,)]

vanilla_pg.stop()
@@ -265,7 +265,7 @@ def _import(
tenant = TenantId.generate()

# Import to pageserver
endpoint_id = "ep-import_from_pageserver"
branch_name = "import_from_pageserver"
client = env.pageserver.http_client()
env.pageserver.tenant_create(tenant)
env.neon_cli.raw_cli(
@@ -276,8 +276,8 @@
str(tenant),
"--timeline-id",
str(timeline),
"--node-name",
endpoint_id,
"--branch-name",
branch_name,
"--base-lsn",
str(lsn),
"--base-tarfile",
@@ -292,7 +292,7 @@
wait_for_upload(client, tenant, timeline, lsn)

# Check it worked
endpoint = env.endpoints.create_start(endpoint_id, tenant_id=tenant, lsn=lsn)
endpoint = env.endpoints.create_start(branch_name, tenant_id=tenant, lsn=lsn)
assert endpoint.safe_psql("select count(*) from tbl") == [(expected_num_rows,)]

# Take another fullbackup
91 changes: 25 additions & 66 deletions test_runner/regress/test_next_xid.py
@@ -1,13 +1,14 @@
import json
import os
import time
from pathlib import Path

from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_wal_insert_lsn
from fixtures.pageserver.utils import (
wait_for_last_record_lsn,
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
import_timeline_from_vanilla_postgres,
wait_for_wal_insert_lsn,
)
from fixtures.remote_storage import RemoteStorageKind
from fixtures.utils import query_scalar
@@ -76,7 +77,6 @@ def test_import_at_2bil(
):
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start()
ps_http = env.pageserver.http_client()

# Reset the vanilla Postgres instance to somewhat before 2 billion transactions.
pg_resetwal_path = os.path.join(pg_bin.pg_bin_path, "pg_resetwal")
@@ -92,76 +92,35 @@
assert vanilla_pg.safe_psql("select count(*) from tt") == [(300000,)]
vanilla_pg.safe_psql("CREATE TABLE t (t text);")
vanilla_pg.safe_psql("INSERT INTO t VALUES ('inserted in vanilla')")

endpoint_id = "ep-import_from_vanilla"
tenant = TenantId.generate()
timeline = TimelineId.generate()

env.pageserver.tenant_create(tenant)

# Take basebackup
basebackup_dir = os.path.join(test_output_dir, "basebackup")
base_tar = os.path.join(basebackup_dir, "base.tar")
wal_tar = os.path.join(basebackup_dir, "pg_wal.tar")
os.mkdir(basebackup_dir)
vanilla_pg.safe_psql("CHECKPOINT")
pg_bin.run(
[
"pg_basebackup",
"-F",
"tar",
"-d",
vanilla_pg.connstr(),
"-D",
basebackup_dir,
]
)

# Get start_lsn and end_lsn
with open(os.path.join(basebackup_dir, "backup_manifest")) as f:
manifest = json.load(f)
start_lsn = manifest["WAL-Ranges"][0]["Start-LSN"]
end_lsn = manifest["WAL-Ranges"][0]["End-LSN"]

def import_tar(base, wal):
env.neon_cli.raw_cli(
[
"timeline",
"import",
"--tenant-id",
str(tenant),
"--timeline-id",
str(timeline),
"--node-name",
endpoint_id,
"--base-lsn",
start_lsn,
"--base-tarfile",
base,
"--end-lsn",
end_lsn,
"--wal-tarfile",
wal,
"--pg-version",
env.pg_version,
]
)

# Importing correct backup works
import_tar(base_tar, wal_tar)
wait_for_last_record_lsn(ps_http, tenant, timeline, Lsn(end_lsn))
tenant_id = TenantId.generate()
env.pageserver.tenant_create(tenant_id)
timeline_id = TimelineId.generate()

# Import the cluster to Neon
import_timeline_from_vanilla_postgres(
test_output_dir,
env,
pg_bin,
tenant_id,
timeline_id,
"imported_2bil_xids",
vanilla_pg.connstr(),
)
vanilla_pg.stop() # don't need the original server anymore

# Check that it works
endpoint = env.endpoints.create_start(
endpoint_id,
tenant_id=tenant,
"imported_2bil_xids",
tenant_id=tenant_id,
config_lines=[
"log_autovacuum_min_duration = 0",
"autovacuum_naptime='5 s'",
],
)
assert endpoint.safe_psql("select count(*) from t") == [(1,)]

# Ok, consume
conn = endpoint.connect()
cur = conn.cursor()

@@ -213,7 +172,7 @@ def import_tar(base, wal):
cur.execute("checkpoint")

# wait until pageserver receives that data
wait_for_wal_insert_lsn(env, endpoint, tenant, timeline)
wait_for_wal_insert_lsn(env, endpoint, tenant_id, timeline_id)

# Restart endpoint
endpoint.stop()
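The one setup step the new helper does not cover is the XID manipulation itself: the test still runs `pg_resetwal` against the stopped vanilla cluster to push the next transaction ID close to 2 billion before the backup is taken. A rough sketch of that step, assuming the `vanilla_pg` fixture exposes its data directory as `pgdatadir` (the XID value is illustrative, and the matching `pg_xact` segment bookkeeping that `pg_resetwal` expects is elided here):

```python
import os

# Advance the next transaction ID; the cluster must be stopped first.
pg_resetwal_path = os.path.join(pg_bin.pg_bin_path, "pg_resetwal")
pg_bin.run(
    [
        pg_resetwal_path,
        "--next-transaction-id=1900000000",  # somewhat before 2 billion
        "-D",
        str(vanilla_pg.pgdatadir),  # assumed fixture attribute
    ]
)
```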