test: Add helper function for importing a Postgres cluster (#8025)
Also, modify the "neon_local timeline import" command so that it no longer
creates an endpoint. I don't see any reason to bundle that into the same
command; the "timeline create" and "timeline branch" commands don't do that
either.

I plan to add more tests similar to 'test_import_at_2bil'; this helper will
help reduce the copy-pasting.
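
For illustration, a minimal sketch of the resulting test pattern, based on the
updated 'test_import_at_2bil' below. The branch name "imported" is a
placeholder, and env, pg_bin, test_output_dir and vanilla_pg are the usual
test fixtures:

    # Sketch only: "imported" is a placeholder branch name.
    tenant_id = TenantId.generate()
    env.pageserver.tenant_create(tenant_id)
    timeline_id = TimelineId.generate()

    # Import the running vanilla cluster as a new Neon timeline.
    import_timeline_from_vanilla_postgres(
        test_output_dir,
        env,
        pg_bin,
        tenant_id,
        timeline_id,
        "imported",
        vanilla_pg.connstr(),
    )

    # "timeline import" no longer creates an endpoint; start one explicitly.
    endpoint = env.endpoints.create_start("imported", tenant_id=tenant_id)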
hlinnaka committed Jun 18, 2024
1 parent 6c6a7f9 commit 377db4a
Showing 4 changed files with 102 additions and 98 deletions.
29 changes: 5 additions & 24 deletions control_plane/src/bin/neon_local.rs
@@ -597,13 +597,9 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
Some(("import", import_match)) => {
let tenant_id = get_tenant_id(import_match, env)?;
let timeline_id = parse_timeline_id(import_match)?.expect("No timeline id provided");
let name = import_match
.get_one::<String>("node-name")
.ok_or_else(|| anyhow!("No node name provided"))?;
let update_catalog = import_match
.get_one::<bool>("update-catalog")
.cloned()
.unwrap_or_default();
let branch_name = import_match
.get_one::<String>("branch-name")
.ok_or_else(|| anyhow!("No branch name provided"))?;

// Parse base inputs
let base_tarfile = import_match
@@ -630,24 +626,11 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
.copied()
.context("Failed to parse postgres version from the argument string")?;

let mut cplane = ComputeControlPlane::load(env.clone())?;
println!("Importing timeline into pageserver ...");
pageserver
.timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version)
.await?;
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;

println!("Creating endpoint for imported timeline ...");
cplane.new_endpoint(
name,
tenant_id,
timeline_id,
None,
None,
pg_version,
ComputeMode::Primary,
!update_catalog,
)?;
env.register_branch_mapping(branch_name.to_string(), tenant_id, timeline_id)?;
println!("Done");
}
Some(("branch", branch_match)) => {
@@ -1456,8 +1439,7 @@ fn cli() -> Command {
.about("Import timeline from basebackup directory")
.arg(tenant_id_arg.clone())
.arg(timeline_id_arg.clone())
.arg(Arg::new("node-name").long("node-name")
.help("Name to assign to the imported timeline"))
.arg(branch_name_arg.clone())
.arg(Arg::new("base-tarfile")
.long("base-tarfile")
.value_parser(value_parser!(PathBuf))
@@ -1473,7 +1455,6 @@ fn cli() -> Command {
.arg(Arg::new("end-lsn").long("end-lsn")
.help("Lsn the basebackup ends at"))
.arg(pg_version_arg.clone())
.arg(update_catalog.clone())
)
).subcommand(
Command::new("tenant")
64 changes: 64 additions & 0 deletions test_runner/fixtures/neon_fixtures.py
@@ -4550,6 +4550,70 @@ def fork_at_current_lsn(
return env.neon_cli.create_branch(new_branch_name, ancestor_branch_name, tenant_id, current_lsn)


def import_timeline_from_vanilla_postgres(
test_output_dir: Path,
env: NeonEnv,
pg_bin: PgBin,
tenant_id: TenantId,
timeline_id: TimelineId,
branch_name: str,
vanilla_pg_connstr: str,
):
"""
Create a new timeline by importing an existing PostgreSQL cluster.
This works by taking a physical backup of the running cluster and importing it.
"""

# Take backup of the existing PostgreSQL server with pg_basebackup
basebackup_dir = os.path.join(test_output_dir, "basebackup")
base_tar = os.path.join(basebackup_dir, "base.tar")
wal_tar = os.path.join(basebackup_dir, "pg_wal.tar")
os.mkdir(basebackup_dir)
pg_bin.run(
[
"pg_basebackup",
"-F",
"tar",
"-d",
vanilla_pg_connstr,
"-D",
basebackup_dir,
]
)

# Extract start_lsn and end_lsn from the backup manifest file
with open(os.path.join(basebackup_dir, "backup_manifest")) as f:
manifest = json.load(f)
start_lsn = manifest["WAL-Ranges"][0]["Start-LSN"]
end_lsn = manifest["WAL-Ranges"][0]["End-LSN"]

# Import the backup tarballs into the pageserver
env.neon_cli.raw_cli(
[
"timeline",
"import",
"--tenant-id",
str(tenant_id),
"--timeline-id",
str(timeline_id),
"--branch-name",
branch_name,
"--base-lsn",
start_lsn,
"--base-tarfile",
base_tar,
"--end-lsn",
end_lsn,
"--wal-tarfile",
wal_tar,
"--pg-version",
env.pg_version,
]
)
wait_for_last_record_lsn(env.pageserver.http_client(), tenant_id, timeline_id, Lsn(end_lsn))


def last_flush_lsn_upload(
env: NeonEnv,
endpoint: Endpoint,
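For reference, an abridged, illustrative sketch of the part of pg_basebackup's
backup_manifest that the helper consults; the LSN values here are made up, and
real manifests also carry a "Files" list and a checksum:

    # Illustrative structure; only the first "WAL-Ranges" entry is read.
    manifest = {
        "PostgreSQL-Backup-Manifest-Version": 1,
        "WAL-Ranges": [
            {"Timeline": 1, "Start-LSN": "0/2000028", "End-LSN": "0/2000100"},
        ],
    }
    start_lsn = manifest["WAL-Ranges"][0]["Start-LSN"]  # "0/2000028"
    end_lsn = manifest["WAL-Ranges"][0]["End-LSN"]      # "0/2000100"
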
16 changes: 8 additions & 8 deletions test_runner/regress/test_import.py
@@ -76,7 +76,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
start_lsn = manifest["WAL-Ranges"][0]["Start-LSN"]
end_lsn = manifest["WAL-Ranges"][0]["End-LSN"]

endpoint_id = "ep-import_from_vanilla"
branch_name = "import_from_vanilla"
tenant = TenantId.generate()
timeline = TimelineId.generate()

@@ -106,8 +106,8 @@ def import_tar(base, wal):
str(tenant),
"--timeline-id",
str(timeline),
"--node-name",
endpoint_id,
"--branch-name",
branch_name,
"--base-lsn",
start_lsn,
"--base-tarfile",
Expand Down Expand Up @@ -146,7 +146,7 @@ def import_tar(base, wal):
wait_for_upload(client, tenant, timeline, Lsn(end_lsn))

# Check it worked
endpoint = env.endpoints.create_start(endpoint_id, tenant_id=tenant)
endpoint = env.endpoints.create_start(branch_name, tenant_id=tenant)
assert endpoint.safe_psql("select count(*) from t") == [(300000,)]

vanilla_pg.stop()
@@ -265,7 +265,7 @@ def _import(
tenant = TenantId.generate()

# Import to pageserver
endpoint_id = "ep-import_from_pageserver"
branch_name = "import_from_pageserver"
client = env.pageserver.http_client()
env.pageserver.tenant_create(tenant)
env.neon_cli.raw_cli(
@@ -276,8 +276,8 @@ def _import(
str(tenant),
"--timeline-id",
str(timeline),
"--node-name",
endpoint_id,
"--branch-name",
branch_name,
"--base-lsn",
str(lsn),
"--base-tarfile",
@@ -292,7 +292,7 @@ def _import(
wait_for_upload(client, tenant, timeline, lsn)

# Check it worked
endpoint = env.endpoints.create_start(endpoint_id, tenant_id=tenant, lsn=lsn)
endpoint = env.endpoints.create_start(branch_name, tenant_id=tenant, lsn=lsn)
assert endpoint.safe_psql("select count(*) from tbl") == [(expected_num_rows,)]

# Take another fullbackup
91 changes: 25 additions & 66 deletions test_runner/regress/test_next_xid.py
@@ -1,13 +1,14 @@
import json
import os
import time
from pathlib import Path

from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_wal_insert_lsn
from fixtures.pageserver.utils import (
wait_for_last_record_lsn,
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
import_timeline_from_vanilla_postgres,
wait_for_wal_insert_lsn,
)
from fixtures.remote_storage import RemoteStorageKind
from fixtures.utils import query_scalar
@@ -76,7 +77,6 @@ def test_import_at_2bil(
):
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start()
ps_http = env.pageserver.http_client()

# Reset the vanilla Postgres instance to somewhat before 2 billion transactions.
pg_resetwal_path = os.path.join(pg_bin.pg_bin_path, "pg_resetwal")
@@ -92,76 +92,35 @@ def test_import_at_2bil(
assert vanilla_pg.safe_psql("select count(*) from tt") == [(300000,)]
vanilla_pg.safe_psql("CREATE TABLE t (t text);")
vanilla_pg.safe_psql("INSERT INTO t VALUES ('inserted in vanilla')")

endpoint_id = "ep-import_from_vanilla"
tenant = TenantId.generate()
timeline = TimelineId.generate()

env.pageserver.tenant_create(tenant)

# Take basebackup
basebackup_dir = os.path.join(test_output_dir, "basebackup")
base_tar = os.path.join(basebackup_dir, "base.tar")
wal_tar = os.path.join(basebackup_dir, "pg_wal.tar")
os.mkdir(basebackup_dir)
vanilla_pg.safe_psql("CHECKPOINT")
pg_bin.run(
[
"pg_basebackup",
"-F",
"tar",
"-d",
vanilla_pg.connstr(),
"-D",
basebackup_dir,
]
)

# Get start_lsn and end_lsn
with open(os.path.join(basebackup_dir, "backup_manifest")) as f:
manifest = json.load(f)
start_lsn = manifest["WAL-Ranges"][0]["Start-LSN"]
end_lsn = manifest["WAL-Ranges"][0]["End-LSN"]

def import_tar(base, wal):
env.neon_cli.raw_cli(
[
"timeline",
"import",
"--tenant-id",
str(tenant),
"--timeline-id",
str(timeline),
"--node-name",
endpoint_id,
"--base-lsn",
start_lsn,
"--base-tarfile",
base,
"--end-lsn",
end_lsn,
"--wal-tarfile",
wal,
"--pg-version",
env.pg_version,
]
)

# Importing correct backup works
import_tar(base_tar, wal_tar)
wait_for_last_record_lsn(ps_http, tenant, timeline, Lsn(end_lsn))
tenant_id = TenantId.generate()
env.pageserver.tenant_create(tenant_id)
timeline_id = TimelineId.generate()

# Import the cluster to Neon
import_timeline_from_vanilla_postgres(
test_output_dir,
env,
pg_bin,
tenant_id,
timeline_id,
"imported_2bil_xids",
vanilla_pg.connstr(),
)
vanilla_pg.stop() # don't need the original server anymore

# Check that it works
endpoint = env.endpoints.create_start(
endpoint_id,
tenant_id=tenant,
"imported_2bil_xids",
tenant_id=tenant_id,
config_lines=[
"log_autovacuum_min_duration = 0",
"autovacuum_naptime='5 s'",
],
)
assert endpoint.safe_psql("select count(*) from t") == [(1,)]

# Ok, consume
conn = endpoint.connect()
cur = conn.cursor()

@@ -213,7 +172,7 @@ def import_tar(base, wal):
cur.execute("checkpoint")

# wait until pageserver receives that data
wait_for_wal_insert_lsn(env, endpoint, tenant, timeline)
wait_for_wal_insert_lsn(env, endpoint, tenant_id, timeline_id)

# Restart endpoint
endpoint.stop()
