Skip to content

Commit

Permalink
Fixes, per Eric's and Konstantin's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
hlinnaka committed Apr 20, 2021
1 parent 20a3960 commit 63d3db7
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 204 deletions.
77 changes: 37 additions & 40 deletions control_plane/src/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ impl ComputeControlPlane {
}
}

// Connect to a page server, get base backup, and untar it to initialize a
// new data directory
/// Connect to a page server, get base backup, and untar it to initialize a
/// new data directory
pub fn new_from_page_server(&mut self, is_test: bool, timelineid: ZTimelineId) -> Result<Arc<PostgresNode>> {
let node_id = self.nodes.len() as u32 + 1;

Expand Down Expand Up @@ -215,7 +215,7 @@ impl PostgresNode {

println!(
"Extracting base backup to create postgres instance: path={} port={}",
pgdata.to_str().unwrap(),
pgdata.display(),
self.address.port()
);

Expand All @@ -225,66 +225,64 @@ impl PostgresNode {
}

let sql = format!("basebackup {}", self.timelineid);
let mut client = self.pageserver.page_server_psql_client()?;
println!("connected to page server");
let mut client = self.pageserver.page_server_psql_client().with_context(|| "connecting to page server failed")?;

fs::create_dir_all(&pgdata)?;
fs::set_permissions(pgdata.as_path(), fs::Permissions::from_mode(0o700)).unwrap();
fs::create_dir_all(&pgdata)
.with_context(|| format!("could not create data directory {}", pgdata.display()))?;
fs::set_permissions(pgdata.as_path(), fs::Permissions::from_mode(0o700))
.with_context(|| format!("could not set permissions in data directory {}", pgdata.display()))?;

// Also create pg_wal directory, it's not included in the tarball
// FIXME: actually, it is currently.
// FIXME: The compute node should be able to stream the WAL it needs from the WAL safekeepers or archive.
// But that's not implemented yet. For now, 'pg_wal' is included in the base backup tarball that
// we receive from the Page Server, so we don't need to create the empty 'pg_wal' directory here.
//fs::create_dir_all(pgdata.join("pg_wal"))?;

let mut copyreader = client.copy_out(sql.as_str())?;
let mut copyreader = client.copy_out(sql.as_str())
.with_context(|| "page server 'basebackup' command failed")?;

// FIXME: Currently, we slurp the whole tarball into memory, and then extract it,
// but we really should do this:
//let mut ar = tar::Archive::new(copyreader);
let mut buf = vec![];
copyreader.read_to_end(&mut buf)?;
println!("got tarball of size {}", buf.len());
copyreader.read_to_end(&mut buf)
.with_context(|| "reading base backup from page server failed")?;
let mut ar = tar::Archive::new(buf.as_slice());
ar.unpack(&pgdata)?;
ar.unpack(&pgdata)
.with_context(|| "extracting page backup failed")?;

// listen for selected port
self.append_conf(
"postgresql.conf",
format!(
&format!(
"max_wal_senders = 10\n\
max_replication_slots = 10\n\
hot_standby = on\n\
shared_buffers = 1MB\n\
max_connections = 100\n\
wal_level = replica\n\
listen_addresses = '{address}'\n\
port = {port}\n",
max_replication_slots = 10\n\
hot_standby = on\n\
shared_buffers = 1MB\n\
max_connections = 100\n\
wal_level = replica\n\
listen_addresses = '{address}'\n\
port = {port}\n",
address = self.address.ip(),
port = self.address.port()
)
.as_str(),
);
));

// Never clean up old WAL. TODO: We should use a replication
// slot or something proper, to prevent the compute node
// from removing WAL that hasn't been streamed to the safekeepr or
// page server yet. But this will do for now.
self.append_conf("postgresql.conf",
format!("wal_keep_size='10TB'\n")
.as_str(),
);
&format!("wal_keep_size='10TB'\n"));

// Connect it to the page server.

// Configure that node to take pages from pageserver
self.append_conf("postgresql.conf",
format!("page_server_connstring = 'host={} port={}'\n\
zenith_timeline='{}'\n",
self.pageserver.address().ip(),
self.pageserver.address().port(),
self.timelineid,
)
.as_str(),
);
&format!("page_server_connstring = 'host={} port={}'\n\
zenith_timeline='{}'\n",
self.pageserver.address().ip(),
self.pageserver.address().port(),
self.timelineid
));

Ok(())
}
Expand Down Expand Up @@ -317,6 +315,7 @@ impl PostgresNode {

fn pg_ctl(&self, args: &[&str]) -> Result<()> {
let pg_ctl_path = self.env.pg_bin_dir().join("pg_ctl");

let pg_ctl = Command::new(pg_ctl_path)
.args(
[
Expand All @@ -332,13 +331,11 @@ impl PostgresNode {
)
.env_clear()
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.status()?;

.status().with_context(|| "pg_ctl failed")?;
if !pg_ctl.success() {
anyhow::bail!("pg_ctl failed");
} else {
Ok(())
}
Ok(())
}

pub fn start(&self) -> Result<()> {
Expand Down Expand Up @@ -404,7 +401,7 @@ impl PostgresNode {
pub fn start_proxy(&self, wal_acceptors: &str) -> WalProposerNode {
let proxy_path = self.env.pg_bin_dir().join("safekeeper_proxy");
match Command::new(proxy_path.as_path())
.args(&["--ztimelineid", &self.timelineid.to_str()])
.args(&["--ztimelineid", &self.timelineid.to_string()])
.args(&["-s", wal_acceptors])
.args(&["-h", &self.address.ip().to_string()])
.args(&["-p", &self.address.port().to_string()])
Expand Down
99 changes: 48 additions & 51 deletions control_plane/src/local_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use bytes::Bytes;
use rand::Rng;
use anyhow::Context;

use hex;
use serde_derive::{Deserialize, Serialize};
use anyhow::Result;

Expand Down Expand Up @@ -53,10 +52,10 @@ impl LocalEnv {
}
}

fn zenith_repo_dir() -> String {
fn zenith_repo_dir() -> PathBuf {
// Find repository path
match std::env::var_os("ZENITH_REPO_DIR") {
Some(val) => String::from(val.to_str().unwrap()),
Some(val) => PathBuf::from(val.to_str().unwrap()),
None => ".zenith".into(),
}
}
Expand All @@ -66,7 +65,7 @@ fn zenith_repo_dir() -> String {
//
pub fn init() -> Result<()> {
// check if config already exists
let repo_path = PathBuf::from(zenith_repo_dir());
let repo_path = zenith_repo_dir();
if repo_path.exists() {
anyhow::bail!("{} already exists. Perhaps already initialized?",
repo_path.to_str().unwrap());
Expand Down Expand Up @@ -113,19 +112,19 @@ pub fn init() -> Result<()> {

pub fn init_repo(local_env: &mut LocalEnv) -> Result<()>
{
let repopath = String::from(local_env.repo_path.to_str().unwrap());
fs::create_dir(&repopath).with_context(|| format!("could not create directory {}", repopath))?;
fs::create_dir(repopath.clone() + "/pgdatadirs")?;
fs::create_dir(repopath.clone() + "/timelines")?;
fs::create_dir(repopath.clone() + "/refs")?;
fs::create_dir(repopath.clone() + "/refs/branches")?;
fs::create_dir(repopath.clone() + "/refs/tags")?;
println!("created directory structure in {}", repopath);
let repopath = &local_env.repo_path;
fs::create_dir(&repopath).with_context(|| format!("could not create directory {}", repopath.display()))?;
fs::create_dir(repopath.join("pgdatadirs"))?;
fs::create_dir(repopath.join("timelines"))?;
fs::create_dir(repopath.join("refs"))?;
fs::create_dir(repopath.join("refs").join("branches"))?;
fs::create_dir(repopath.join("refs").join("tags"))?;
println!("created directory structure in {}", repopath.display());

// Create initial timeline
let tli = create_timeline(&local_env, None)?;
let timelinedir = format!("{}/timelines/{}", repopath, &hex::encode(tli));
println!("created initial timeline {}", timelinedir);
let timelinedir = repopath.join("timelines").join(tli.to_string());
println!("created initial timeline {}", timelinedir.display());

// Run initdb
//
Expand All @@ -151,7 +150,7 @@ pub fn init_repo(local_env: &mut LocalEnv) -> Result<()>
let lsnstr = format!("{:016X}", lsn);

// Move the initial WAL file
fs::rename("tmp/pg_wal/000000010000000000000001", timelinedir.clone() + "/wal/000000010000000000000001.partial")?;
fs::rename("tmp/pg_wal/000000010000000000000001", timelinedir.join("wal").join("000000010000000000000001.partial"))?;
println!("moved initial WAL file");

// Remove pg_wal
Expand All @@ -161,23 +160,23 @@ pub fn init_repo(local_env: &mut LocalEnv) -> Result<()>
force_crash_recovery(&PathBuf::from("tmp"))?;
println!("updated pg_control");

let target = timelinedir.clone() + "/snapshots/" + &lsnstr;
let target = timelinedir.join("snapshots").join(&lsnstr);
fs::rename("tmp", &target)?;
println!("moved 'tmp' to {}", &target);
println!("moved 'tmp' to {}", target.display());

// Create 'main' branch to refer to the initial timeline
let data = hex::encode(tli);
fs::write(repopath.clone() + "/refs/branches/main", data)?;
let data = tli.to_string();
fs::write(repopath.join("refs").join("branches").join("main"), data)?;
println!("created main branch");

// Also update the system id in the LocalEnv
local_env.systemid = systemid;

// write config
let toml = toml::to_string(&local_env)?;
fs::write(repopath.clone() + "/config", toml)?;
fs::write(repopath.join("config"), toml)?;

println!("new zenith repository was created in {}", &repopath);
println!("new zenith repository was created in {}", repopath.display());

Ok(())
}
Expand All @@ -195,9 +194,7 @@ pub fn init_repo(local_env: &mut LocalEnv) -> Result<()>
fn force_crash_recovery(datadir: &Path) -> Result<()> {

// Read in the control file
let mut controlfilepath = datadir.to_path_buf();
controlfilepath.push("global");
controlfilepath.push("pg_control");
let controlfilepath = datadir.to_path_buf().join("global").join("pg_control");
let mut controlfile = postgres_ffi::decode_pg_control(
Bytes::from(fs::read(controlfilepath.as_path())?))?;

Expand Down Expand Up @@ -258,28 +255,29 @@ pub struct PointInTime {
pub lsn: u64
}

fn create_timeline(local_env: &LocalEnv, ancestor: Option<PointInTime>) -> Result<[u8; 16]> {
let repopath = String::from(local_env.repo_path.to_str().unwrap());
fn create_timeline(local_env: &LocalEnv, ancestor: Option<PointInTime>) -> Result<ZTimelineId> {
let repopath = &local_env.repo_path;

// Create initial timeline
let mut tli = [0u8; 16];
rand::thread_rng().fill(&mut tli);
let mut tli_buf = [0u8; 16];
rand::thread_rng().fill(&mut tli_buf);
let timelineid = ZTimelineId::from(tli_buf);

let timelinedir = format!("{}/timelines/{}", repopath, &hex::encode(tli));
let timelinedir = repopath.join("timelines").join(timelineid.to_string());

fs::create_dir(timelinedir.clone())?;
fs::create_dir(timelinedir.clone() + "/snapshots")?;
fs::create_dir(timelinedir.clone() + "/wal")?;
fs::create_dir(&timelinedir)?;
fs::create_dir(&timelinedir.join("snapshots"))?;
fs::create_dir(&timelinedir.join("wal"))?;

if let Some(ancestor) = ancestor {
let data = format!("{}@{:X}/{:X}",
hex::encode(ancestor.timelineid.to_str()),
ancestor.timelineid,
ancestor.lsn >> 32,
ancestor.lsn & 0xffffffff);
fs::write(timelinedir + "/ancestor", data)?;
fs::write(timelinedir.join("ancestor"), data)?;
}

Ok(tli)
Ok(timelineid)
}

// Parse an LSN in the format used in filenames
Expand All @@ -292,35 +290,35 @@ fn parse_lsn(s: &str) -> std::result::Result<u64, std::num::ParseIntError> {

// Create a new branch in the repository (for the "zenith branch" subcommand)
pub fn create_branch(local_env: &LocalEnv, branchname: &str, startpoint: PointInTime) -> Result<()> {
let repopath = String::from(local_env.repo_path.to_str().unwrap());
let repopath = &local_env.repo_path;

// create a new timeline for it
let newtli = create_timeline(local_env, Some(startpoint))?;
let newtimelinedir = format!("{}/timelines/{}", repopath, &hex::encode(newtli));
let newtimelinedir = repopath.join("timelines").join(newtli.to_string());

let data = hex::encode(newtli);
fs::write(format!("{}/refs/branches/{}", repopath, branchname), data)?;
let data = newtli.to_string();
fs::write(repopath.join("refs").join("branches").join(branchname), data)?;

// Copy the latest snapshot (TODO: before the startpoint) and all WAL
// TODO: be smarter and avoid the copying...
let (_maxsnapshot, oldsnapshotdir) = find_latest_snapshot(local_env, startpoint.timelineid)?;
let copy_opts = fs_extra::dir::CopyOptions::new();
fs_extra::dir::copy(oldsnapshotdir, newtimelinedir.clone() + "/snapshots", &copy_opts)?;
fs_extra::dir::copy(oldsnapshotdir, newtimelinedir.join("snapshots"), &copy_opts)?;

let oldtimelinedir = format!("{}/timelines/{}", &repopath, startpoint.timelineid.to_str());
let oldtimelinedir = repopath.join("timelines").join(startpoint.timelineid.to_string());
let mut copy_opts = fs_extra::dir::CopyOptions::new();
copy_opts.content_only = true;
fs_extra::dir::copy(oldtimelinedir + "/wal/",
newtimelinedir.clone() + "/wal",
fs_extra::dir::copy(oldtimelinedir.join("wal"),
newtimelinedir.join("wal"),
&copy_opts)?;

Ok(())
}

// Find the end of valid WAL in a wal directory
pub fn find_end_of_wal(local_env: &LocalEnv, timeline: ZTimelineId) -> Result<u64> {
let repopath = String::from(local_env.repo_path.to_str().unwrap());
let waldir = PathBuf::from(format!("{}/timelines/{}/wal", repopath, timeline.to_str()));
let repopath = &local_env.repo_path;
let waldir = repopath.join("timelines").join(timeline.to_string()).join("wal");

let (lsn, _tli) = xlog_utils::find_end_of_wal(&waldir, 16 * 1024 * 1024, true);

Expand All @@ -329,15 +327,14 @@ pub fn find_end_of_wal(local_env: &LocalEnv, timeline: ZTimelineId) -> Result<u6

// Find the latest snapshot for a timeline
fn find_latest_snapshot(local_env: &LocalEnv, timeline: ZTimelineId) -> Result<(u64, PathBuf)> {
let repopath = String::from(local_env.repo_path.to_str().unwrap());
let repopath = &local_env.repo_path;

let timelinedir = repopath + "/timelines/" + &timeline.to_str();
let snapshotsdir = timelinedir.clone() + "/snapshots";
let paths = fs::read_dir(&snapshotsdir).unwrap();
let snapshotsdir = repopath.join("timelines").join(timeline.to_string()).join("snapshots");
let paths = fs::read_dir(&snapshotsdir)?;
let mut maxsnapshot: u64 = 0;
let mut snapshotdir: Option<PathBuf> = None;
for path in paths {
let path = path.unwrap();
let path = path?;
let filename = path.file_name().to_str().unwrap().to_owned();
if let Ok(lsn) = parse_lsn(&filename) {
maxsnapshot = std::cmp::max(lsn, maxsnapshot);
Expand All @@ -346,7 +343,7 @@ fn find_latest_snapshot(local_env: &LocalEnv, timeline: ZTimelineId) -> Result<(
}
if maxsnapshot == 0 {
// TODO: check ancestor timeline
anyhow::bail!("no snapshot found in {}", snapshotsdir);
anyhow::bail!("no snapshot found in {}", snapshotsdir.display());
}

Ok((maxsnapshot, snapshotdir.unwrap()))
Expand Down
1 change: 1 addition & 0 deletions control_plane/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::net::SocketAddr;
use std::net::TcpStream;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
Expand Down
Loading

0 comments on commit 63d3db7

Please sign in to comment.