diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs
index c2b29e739722..91ad2ba805dc 100644
--- a/control_plane/src/compute.rs
+++ b/control_plane/src/compute.rs
@@ -73,8 +73,8 @@ impl ComputeControlPlane {
         }
     }
 
-    // Connect to a page server, get base backup, and untar it to initialize a
-    // new data directory
+    /// Connect to a page server, get base backup, and untar it to initialize a
+    /// new data directory
    pub fn new_from_page_server(&mut self, is_test: bool, timelineid: ZTimelineId) -> Result<Arc<PostgresNode>> {
         let node_id = self.nodes.len() as u32 + 1;
 
@@ -215,7 +215,7 @@ impl PostgresNode {
 
         println!(
             "Extracting base backup to create postgres instance: path={} port={}",
-            pgdata.to_str().unwrap(),
+            pgdata.display(),
             self.address.port()
         );
 
@@ -225,66 +225,64 @@ impl PostgresNode {
         }
 
         let sql = format!("basebackup {}", self.timelineid);
-        let mut client = self.pageserver.page_server_psql_client()?;
-        println!("connected to page server");
+        let mut client = self.pageserver.page_server_psql_client().with_context(|| "connecting to page server failed")?;
 
-        fs::create_dir_all(&pgdata)?;
-        fs::set_permissions(pgdata.as_path(), fs::Permissions::from_mode(0o700)).unwrap();
+        fs::create_dir_all(&pgdata)
+            .with_context(|| format!("could not create data directory {}", pgdata.display()))?;
+        fs::set_permissions(pgdata.as_path(), fs::Permissions::from_mode(0o700))
+            .with_context(|| format!("could not set permissions in data directory {}", pgdata.display()))?;
 
-        // Also create pg_wal directory, it's not included in the tarball
-        // FIXME: actually, it is currently.
+        // FIXME: The compute node should be able to stream the WAL it needs from the WAL safekeepers or archive.
+        // But that's not implemented yet. For now, 'pg_wal' is included in the base backup tarball that
+        // we receive from the Page Server, so we don't need to create the empty 'pg_wal' directory here.
         //fs::create_dir_all(pgdata.join("pg_wal"))?;
 
-        let mut copyreader = client.copy_out(sql.as_str())?;
+        let mut copyreader = client.copy_out(sql.as_str())
+            .with_context(|| "page server 'basebackup' command failed")?;
 
         // FIXME: Currently, we slurp the whole tarball into memory, and then extract it,
         // but we really should do this:
         //let mut ar = tar::Archive::new(copyreader);
         let mut buf = vec![];
-        copyreader.read_to_end(&mut buf)?;
-        println!("got tarball of size {}", buf.len());
+        copyreader.read_to_end(&mut buf)
+            .with_context(|| "reading base backup from page server failed")?;
         let mut ar = tar::Archive::new(buf.as_slice());
-        ar.unpack(&pgdata)?;
+        ar.unpack(&pgdata)
+            .with_context(|| "extracting base backup failed")?;
 
         // listen for selected port
         self.append_conf(
             "postgresql.conf",
-            format!(
+            &format!(
                 "max_wal_senders = 10\n\
-                 max_replication_slots = 10\n\
-                 hot_standby = on\n\
-                 shared_buffers = 1MB\n\
-                 max_connections = 100\n\
-                 wal_level = replica\n\
-                 listen_addresses = '{address}'\n\
-                 port = {port}\n",
+                max_replication_slots = 10\n\
+                hot_standby = on\n\
+                shared_buffers = 1MB\n\
+                max_connections = 100\n\
+                wal_level = replica\n\
+                listen_addresses = '{address}'\n\
+                port = {port}\n",
                 address = self.address.ip(),
                 port = self.address.port()
-            )
-            .as_str(),
-        );
+            ));
 
         // Never clean up old WAL. TODO: We should use a replication
         // slot or something proper, to prevent the compute node
         // from removing WAL that hasn't been streamed to the safekeepr or
         // page server yet. But this will do for now.
         self.append_conf("postgresql.conf",
-                         format!("wal_keep_size='10TB'\n")
-                         .as_str(),
-        );
+                         &format!("wal_keep_size='10TB'\n"));
 
         // Connect it to the page server.
         // Configure that node to take pages from pageserver
         self.append_conf("postgresql.conf",
-                         format!("page_server_connstring = 'host={} port={}'\n\
-                                  zenith_timeline='{}'\n",
-                                 self.pageserver.address().ip(),
-                                 self.pageserver.address().port(),
-                                 self.timelineid,
-                         )
-                         .as_str(),
-        );
+                         &format!("page_server_connstring = 'host={} port={}'\n\
+                                   zenith_timeline='{}'\n",
+                                  self.pageserver.address().ip(),
+                                  self.pageserver.address().port(),
+                                  self.timelineid
+                         ));
 
         Ok(())
     }
 
@@ -317,6 +315,7 @@ impl PostgresNode {
 
     fn pg_ctl(&self, args: &[&str]) -> Result<()> {
         let pg_ctl_path = self.env.pg_bin_dir().join("pg_ctl");
+
         let pg_ctl = Command::new(pg_ctl_path)
             .args(
                 [
@@ -332,13 +331,11 @@ impl PostgresNode {
             )
             .env_clear()
             .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
-            .status()?;
-
+            .status().with_context(|| "pg_ctl failed")?;
         if !pg_ctl.success() {
             anyhow::bail!("pg_ctl failed");
-        } else {
-            Ok(())
         }
+        Ok(())
     }
 
     pub fn start(&self) -> Result<()> {
@@ -404,7 +401,7 @@ impl PostgresNode {
     pub fn start_proxy(&self, wal_acceptors: &str) -> WalProposerNode {
         let proxy_path = self.env.pg_bin_dir().join("safekeeper_proxy");
         match Command::new(proxy_path.as_path())
-            .args(&["--ztimelineid", &self.timelineid.to_str()])
+            .args(&["--ztimelineid", &self.timelineid.to_string()])
             .args(&["-s", wal_acceptors])
             .args(&["-h", &self.address.ip().to_string()])
             .args(&["-p", &self.address.port().to_string()])
diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs
index 5ac5cb8fd2df..e2c310f733f3 100644
--- a/control_plane/src/local_env.rs
+++ b/control_plane/src/local_env.rs
@@ -12,7 +12,6 @@ use bytes::Bytes;
 use rand::Rng;
 
 use anyhow::Context;
-use hex;
 use serde_derive::{Deserialize, Serialize};
 use anyhow::Result;
 
@@ -53,10 +52,10 @@ impl LocalEnv {
     }
 }
 
-fn zenith_repo_dir() -> String {
+fn zenith_repo_dir() -> PathBuf {
     // Find repository path
     match std::env::var_os("ZENITH_REPO_DIR") {
-        Some(val) => String::from(val.to_str().unwrap()),
+        Some(val) => PathBuf::from(val.to_str().unwrap()),
         None => ".zenith".into(),
     }
 }
@@ -66,7 +65,7 @@ fn zenith_repo_dir() -> String {
 //
 pub fn init() -> Result<()> {
     // check if config already exists
-    let repo_path = PathBuf::from(zenith_repo_dir());
+    let repo_path = zenith_repo_dir();
     if repo_path.exists() {
         anyhow::bail!("{} already exists. Perhaps already initialized?", repo_path.to_str().unwrap());
@@ -113,19 +112,19 @@ pub fn init() -> Result<()> {
 
 pub fn init_repo(local_env: &mut LocalEnv) -> Result<()>
 {
-    let repopath = String::from(local_env.repo_path.to_str().unwrap());
-    fs::create_dir(&repopath).with_context(|| format!("could not create directory {}", repopath))?;
-    fs::create_dir(repopath.clone() + "/pgdatadirs")?;
-    fs::create_dir(repopath.clone() + "/timelines")?;
-    fs::create_dir(repopath.clone() + "/refs")?;
-    fs::create_dir(repopath.clone() + "/refs/branches")?;
-    fs::create_dir(repopath.clone() + "/refs/tags")?;
-    println!("created directory structure in {}", repopath);
+    let repopath = &local_env.repo_path;
+    fs::create_dir(&repopath).with_context(|| format!("could not create directory {}", repopath.display()))?;
+    fs::create_dir(repopath.join("pgdatadirs"))?;
+    fs::create_dir(repopath.join("timelines"))?;
+    fs::create_dir(repopath.join("refs"))?;
+    fs::create_dir(repopath.join("refs").join("branches"))?;
+    fs::create_dir(repopath.join("refs").join("tags"))?;
+    println!("created directory structure in {}", repopath.display());
 
     // Create initial timeline
     let tli = create_timeline(&local_env, None)?;
-    let timelinedir = format!("{}/timelines/{}", repopath, &hex::encode(tli));
-    println!("created initial timeline {}", timelinedir);
+    let timelinedir = repopath.join("timelines").join(tli.to_string());
+    println!("created initial timeline {}", timelinedir.display());
 
     // Run initdb
     //
@@ -151,7 +150,7 @@ pub fn init_repo(local_env: &mut LocalEnv) -> Result<()>
     let lsnstr = format!("{:016X}", lsn);
 
     // Move the initial WAL file
-    fs::rename("tmp/pg_wal/000000010000000000000001", timelinedir.clone() + "/wal/000000010000000000000001.partial")?;
+    fs::rename("tmp/pg_wal/000000010000000000000001", timelinedir.join("wal").join("000000010000000000000001.partial"))?;
     println!("moved initial WAL file");
 
     // Remove pg_wal
@@ -161,13 +160,13 @@ pub fn init_repo(local_env: &mut LocalEnv) -> Result<()>
     force_crash_recovery(&PathBuf::from("tmp"))?;
     println!("updated pg_control");
 
-    let target = timelinedir.clone() + "/snapshots/" + &lsnstr;
+    let target = timelinedir.join("snapshots").join(&lsnstr);
     fs::rename("tmp", &target)?;
-    println!("moved 'tmp' to {}", &target);
+    println!("moved 'tmp' to {}", target.display());
 
     // Create 'main' branch to refer to the initial timeline
-    let data = hex::encode(tli);
-    fs::write(repopath.clone() + "/refs/branches/main", data)?;
+    let data = tli.to_string();
+    fs::write(repopath.join("refs").join("branches").join("main"), data)?;
     println!("created main branch");
 
     // Also update the system id in the LocalEnv
@@ -175,9 +174,9 @@ pub fn init_repo(local_env: &mut LocalEnv) -> Result<()>
 
     // write config
     let toml = toml::to_string(&local_env)?;
-    fs::write(repopath.clone() + "/config", toml)?;
+    fs::write(repopath.join("config"), toml)?;
 
-    println!("new zenith repository was created in {}", &repopath);
+    println!("new zenith repository was created in {}", repopath.display());
 
     Ok(())
 }
@@ -195,9 +194,7 @@ pub fn init_repo(local_env: &mut LocalEnv) -> Result<()>
 
 fn force_crash_recovery(datadir: &Path) -> Result<()> {
 
     // Read in the control file
-    let mut controlfilepath = datadir.to_path_buf();
-    controlfilepath.push("global");
-    controlfilepath.push("pg_control");
+    let controlfilepath = datadir.to_path_buf().join("global").join("pg_control");
     let mut controlfile = postgres_ffi::decode_pg_control(
         Bytes::from(fs::read(controlfilepath.as_path())?))?;
 
@@ -258,28 +255,29 @@ pub struct PointInTime {
     pub lsn: u64
 }
 
-fn create_timeline(local_env: &LocalEnv, ancestor: Option<PointInTime>) -> Result<[u8; 16]> {
-    let repopath = String::from(local_env.repo_path.to_str().unwrap());
+fn create_timeline(local_env: &LocalEnv, ancestor: Option<PointInTime>) -> Result<ZTimelineId> {
+    let repopath = &local_env.repo_path;
 
     // Create initial timeline
-    let mut tli = [0u8; 16];
-    rand::thread_rng().fill(&mut tli);
+    let mut tli_buf = [0u8; 16];
+    rand::thread_rng().fill(&mut tli_buf);
+    let timelineid = ZTimelineId::from(tli_buf);
 
-    let timelinedir = format!("{}/timelines/{}", repopath, &hex::encode(tli));
+    let timelinedir = repopath.join("timelines").join(timelineid.to_string());
 
-    fs::create_dir(timelinedir.clone())?;
-    fs::create_dir(timelinedir.clone() + "/snapshots")?;
-    fs::create_dir(timelinedir.clone() + "/wal")?;
+    fs::create_dir(&timelinedir)?;
+    fs::create_dir(&timelinedir.join("snapshots"))?;
+    fs::create_dir(&timelinedir.join("wal"))?;
 
     if let Some(ancestor) = ancestor {
         let data = format!("{}@{:X}/{:X}",
-                           hex::encode(ancestor.timelineid.to_str()),
+                           ancestor.timelineid,
                            ancestor.lsn >> 32,
                            ancestor.lsn & 0xffffffff);
-        fs::write(timelinedir + "/ancestor", data)?;
+        fs::write(timelinedir.join("ancestor"), data)?;
     }
 
-    Ok(tli)
+    Ok(timelineid)
 }
 
 // Parse an LSN in the format used in filenames
@@ -292,26 +290,26 @@ fn parse_lsn(s: &str) -> std::result::Result<u64, ParseIntError> {
 
 // Create a new branch in the repository (for the "zenith branch" subcommand)
 pub fn create_branch(local_env: &LocalEnv, branchname: &str, startpoint: PointInTime) -> Result<()> {
-    let repopath = String::from(local_env.repo_path.to_str().unwrap());
+    let repopath = &local_env.repo_path;
 
     // create a new timeline for it
     let newtli = create_timeline(local_env, Some(startpoint))?;
-    let newtimelinedir = format!("{}/timelines/{}", repopath, &hex::encode(newtli));
+    let newtimelinedir = repopath.join("timelines").join(newtli.to_string());
 
-    let data = hex::encode(newtli);
-    fs::write(format!("{}/refs/branches/{}", repopath, branchname), data)?;
+    let data = newtli.to_string();
+    fs::write(repopath.join("refs").join("branches").join(branchname), data)?;
 
     // Copy the latest snapshot (TODO: before the startpoint) and all WAL
     // TODO: be smarter and avoid the copying...
     let (_maxsnapshot, oldsnapshotdir) = find_latest_snapshot(local_env, startpoint.timelineid)?;
     let copy_opts = fs_extra::dir::CopyOptions::new();
-    fs_extra::dir::copy(oldsnapshotdir, newtimelinedir.clone() + "/snapshots", &copy_opts)?;
+    fs_extra::dir::copy(oldsnapshotdir, newtimelinedir.join("snapshots"), &copy_opts)?;
 
-    let oldtimelinedir = format!("{}/timelines/{}", &repopath, startpoint.timelineid.to_str());
+    let oldtimelinedir = repopath.join("timelines").join(startpoint.timelineid.to_string());
     let mut copy_opts = fs_extra::dir::CopyOptions::new();
     copy_opts.content_only = true;
-    fs_extra::dir::copy(oldtimelinedir + "/wal/",
-                        newtimelinedir.clone() + "/wal",
+    fs_extra::dir::copy(oldtimelinedir.join("wal"),
+                        newtimelinedir.join("wal"),
                         &copy_opts)?;
 
     Ok(())
@@ -319,8 +317,8 @@ pub fn create_branch(local_env: &LocalEnv, branchname: &str, startpoint: PointIn
 
 // Find the end of valid WAL in a wal directory
 pub fn find_end_of_wal(local_env: &LocalEnv, timeline: ZTimelineId) -> Result<u64> {
-    let repopath = String::from(local_env.repo_path.to_str().unwrap());
-    let waldir = PathBuf::from(format!("{}/timelines/{}/wal", repopath, timeline.to_str()));
+    let repopath = &local_env.repo_path;
+    let waldir = repopath.join("timelines").join(timeline.to_string()).join("wal");
 
     let (lsn, _tli) = xlog_utils::find_end_of_wal(&waldir, 16 * 1024 * 1024, true);
 
@@ -329,15 +327,14 @@ pub fn find_end_of_wal(local_env: &LocalEnv, timeline: ZTimelineId) -> Result<u
 
     Ok(lsn)
 }
 
 fn find_latest_snapshot(local_env: &LocalEnv, timeline: ZTimelineId) -> Result<(u64, PathBuf)> {
-    let repopath = String::from(local_env.repo_path.to_str().unwrap());
+    let repopath = &local_env.repo_path;
 
-    let timelinedir = repopath + "/timelines/" + &timeline.to_str();
-    let snapshotsdir = timelinedir.clone() + "/snapshots";
-    let paths = fs::read_dir(&snapshotsdir).unwrap();
+    let snapshotsdir = repopath.join("timelines").join(timeline.to_string()).join("snapshots");
+    let paths = fs::read_dir(&snapshotsdir)?;
     let mut maxsnapshot: u64 = 0;
     let mut snapshotdir: Option<PathBuf> = None;
     for path in paths {
-        let path = path.unwrap();
+        let path = path?;
         let filename = path.file_name().to_str().unwrap().to_owned();
         if let Ok(lsn) = parse_lsn(&filename) {
             maxsnapshot = std::cmp::max(lsn, maxsnapshot);
@@ -346,7 +343,7 @@ fn find_latest_snapshot(local_env: &LocalEnv, timeline: ZTimelineId) -> Result<(
     }
     if maxsnapshot == 0 {
         // TODO: check ancestor timeline
-        anyhow::bail!("no snapshot found in {}", snapshotsdir);
+        anyhow::bail!("no snapshot found in {}", snapshotsdir.display());
     }
 
     Ok((maxsnapshot, snapshotdir.unwrap()))
diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs
index f2dbf8dc1a6a..5e5e2bff5183 100644
--- a/control_plane/src/storage.rs
+++ b/control_plane/src/storage.rs
@@ -4,6 +4,7 @@ use std::net::SocketAddr;
 use std::net::TcpStream;
 use std::path::{Path, PathBuf};
 use std::process::Command;
+use std::str::FromStr;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::thread;
diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs
index c9b547896cdd..c738f90f41fb 100644
--- a/pageserver/src/lib.rs
+++ b/pageserver/src/lib.rs
@@ -1,4 +1,6 @@
 use std::net::SocketAddr;
+use std::str::FromStr;
+use std::fmt;
 
 pub mod page_cache;
 pub mod page_service;
@@ -23,9 +25,10 @@ pub struct PageServerConf {
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub struct ZTimelineId([u8; 16]);
 
-impl ZTimelineId {
+impl FromStr for ZTimelineId {
+    type Err = hex::FromHexError;
 
-    pub fn from_str(s: &str) -> Result<ZTimelineId> {
+    fn from_str(s: &str) -> Result<ZTimelineId, Self::Err> {
         let timelineid = hex::decode(s)?;
 
         let mut buf: [u8; 16] = [0u8; 16];
@@ -33,6 +36,9 @@ impl ZTimelineId {
         Ok(ZTimelineId(buf))
     }
+}
+
+impl ZTimelineId {
 
     pub fn from(b: [u8; 16]) -> ZTimelineId {
         ZTimelineId(b)
     }
@@ -46,14 +52,11 @@ impl ZTimelineId {
     pub fn as_arr(&self) -> [u8; 16] {
         self.0
     }
-
-    pub fn to_str(self: &ZTimelineId) -> String {
-        hex::encode(self.0)
-    }
 }
 
-impl std::fmt::Display for ZTimelineId {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        write!(f, "{}", self.to_str())
+impl fmt::Display for ZTimelineId {
+
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.write_str(&hex::encode(self.0))
     }
 }
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index b534d5db6d1c..27cad6090bae 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -15,6 +15,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
 use log::*;
 use std::io;
 use std::thread;
+use std::str::FromStr;
 use std::sync::Arc;
 use regex::Regex;
 use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
@@ -247,14 +248,13 @@ impl FeDescribeMessage {
         }
         */
 
-        if kind != 0x53 { // 'S'
+        if kind != b'S' {
             return Err(io::Error::new(
                 io::ErrorKind::InvalidInput,
                 "only prepared statmement Describe is implemented",
             ));
         }
-
         Ok(FeMessage::Describe(FeDescribeMessage {kind}))
     }
 }
@@ -262,7 +262,8 @@ impl FeDescribeMessage {
 // we only support unnamed prepared stmt or portal
 #[derive(Debug)]
 struct FeExecuteMessage {
-    maxrows: i32// max # of rows
+    /// max # of rows
+    maxrows: i32
 }
 
 impl FeExecuteMessage {
@@ -469,7 +470,7 @@ impl Connection {
             buffer: BytesMut::with_capacity(10 * 1024),
             init_done: false,
             conf,
-            runtime: runtime.clone(),
+            runtime: Arc::clone(runtime),
         }
     }
diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs
index c53c04ef9258..e7418ac09d47 100644
--- a/pageserver/src/restore_local_repo.rs
+++ b/pageserver/src/restore_local_repo.rs
@@ -45,19 +45,20 @@ const GLOBALTABLESPACE_OID: u32 = 1664;
 //
 pub fn restore_timeline(conf: &PageServerConf, pcache: &PageCache, timeline: ZTimelineId) -> Result<()> {
-    let timelinepath = PathBuf::from("timelines").join(&timeline.to_str());
+    let timelinepath = PathBuf::from("timelines").join(timeline.to_string());
 
     if !timelinepath.exists() {
         anyhow::bail!("timeline {} does not exist in the page server's repository");
     }
 
     // Scan .zenith/timelines/<timelineid>/snapshots
-    let snapshotspath = "timelines/".to_owned() + &timeline.to_str() + "/snapshots";
+    let snapshotspath = PathBuf::from("timelines").join(timeline.to_string()).join("snapshots");
 
     let mut last_snapshot_lsn: u64 = 0;
 
     for direntry in fs::read_dir(&snapshotspath).unwrap() {
-        let filename = direntry.unwrap().file_name().to_str().unwrap().to_owned();
+        let direntry = direntry?;
+        let filename = direntry.file_name().to_str().unwrap().to_owned();
 
         let lsn = u64::from_str_radix(&filename, 16)?;
         last_snapshot_lsn = max(lsn, last_snapshot_lsn);
@@ -67,7 +68,7 @@ pub fn restore_timeline(conf: &PageServerConf, pcache: &PageCache, timeline: ZTi
     }
 
     if last_snapshot_lsn == 0 {
-        error!("could not find valid snapshot in {}", &snapshotspath);
+        error!("could not find valid snapshot in {}", snapshotspath.display());
         // TODO return error?
     }
     pcache.init_valid_lsn(last_snapshot_lsn);
@@ -98,54 +99,42 @@ pub fn find_latest_snapshot(_conf: &PageServerConf, timeline: ZTimelineId) -> Re
 
 fn restore_snapshot(conf: &PageServerConf, pcache: &PageCache, timeline: ZTimelineId, snapshot: &str) -> Result<()> {
-    let snapshotpath = "timelines/".to_owned() + &timeline.to_str() + "/snapshots/" + snapshot;
+    let snapshotpath = PathBuf::from("timelines").join(timeline.to_string()).join("snapshots").join(snapshot);
 
     // Scan 'global'
-    let paths = fs::read_dir(snapshotpath.clone() + "/global").unwrap();
+    for direntry in fs::read_dir(snapshotpath.join("global"))? {
+        let direntry = direntry?;
+        match direntry.file_name().to_str() {
+            None => continue,
 
-    for direntry in paths {
-        let path = direntry.unwrap().path();
-        let filename = path.file_name();
-        if filename.is_none() {
-            continue;
-        }
-        let filename = filename.unwrap().to_str();
+            // These special files appear in the snapshot, but are not needed by the page server
+            Some("pg_control") => continue,
+            Some("pg_filenode.map") => continue,
 
-        if filename == Some("pg_control") {
-            continue;
+            // Load any relation files into the page server
+            _ => restore_relfile(conf, pcache, timeline, snapshot, GLOBALTABLESPACE_OID, 0, &direntry.path())?,
         }
-        if filename == Some("pg_filenode.map") {
-            continue;
-        }
-
-        restore_relfile(conf, pcache, timeline, snapshot, GLOBALTABLESPACE_OID, 0, &path)?;
     }
 
-    // Scan 'base'
-    let paths = fs::read_dir(snapshotpath.clone() + "/base").unwrap();
-    for path in paths {
-        let path = path.unwrap();
-        let filename = path.file_name().to_str().unwrap().to_owned();
-
-        // Scan database dirs
-        let dboid = u32::from_str_radix(&filename, 10)?;
-
-        let paths = fs::read_dir(path.path()).unwrap();
-        for direntry in paths {
-            let path = direntry.unwrap().path();
-            let filename = path.file_name();
-            if filename.is_none() {
-                continue;
-            }
-            let filename = filename.unwrap().to_str();
-            if filename == Some("PG_VERSION") {
-                continue;
-            }
-            if filename == Some("pg_filenode.map") {
-                continue;
-            }
+    // Scan 'base'. It contains database dirs, the database OID is the filename.
+    // E.g. 'base/12345', where 12345 is the database OID.
+    for direntry in fs::read_dir(snapshotpath.join("base"))? {
+        let direntry = direntry?;
+
+        let dboid = u32::from_str_radix(direntry.file_name().to_str().unwrap(), 10)?;
 
-            restore_relfile(conf, pcache, timeline, snapshot, DEFAULTTABLESPACE_OID, dboid, &path)?;
+        for direntry in fs::read_dir(direntry.path())? {
+            let direntry = direntry?;
+            match direntry.file_name().to_str() {
+                None => continue,
+
+                // These special files appear in the snapshot, but are not needed by the page server
+                Some("PG_VERSION") => continue,
+                Some("pg_filenode.map") => continue,
+
+                // Load any relation files into the page server
+                _ => restore_relfile(conf, pcache, timeline, snapshot, DEFAULTTABLESPACE_OID, dboid, &direntry.path())?,
+            }
         }
     }
diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs
index 22ab546d5e19..8172b875dafd 100644
--- a/pageserver/src/waldecoder.rs
+++ b/pageserver/src/waldecoder.rs
@@ -1,10 +1,7 @@
 use bytes::{Buf, BufMut, Bytes, BytesMut};
-
-use std::cmp::min;
-use std::error::Error;
-use std::fmt;
-
 use log::*;
+use std::cmp::min;
+use thiserror::Error;
 
 const XLOG_BLCKSZ: u32 = 8192;
 
@@ -54,28 +51,11 @@ pub struct WalStreamDecoder {
 }
 
-#[derive(Debug, Clone)]
+#[derive(Error, Debug, Clone)]
+#[error("{msg} at {lsn}")]
 pub struct WalDecodeError {
     msg: String,
-}
-
-impl Error for WalDecodeError {
-    fn description(&self) -> &str {
-        &self.msg
-    }
-}
-impl WalDecodeError {
-    fn new(msg: &str) -> WalDecodeError {
-        WalDecodeError {
-            msg: msg.to_string(),
-        }
-    }
-}
-
-impl fmt::Display for WalDecodeError {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "WAL decoding error: {}", self.msg)
-    }
+    lsn: u64
 }
 
 //
@@ -100,8 +80,14 @@ impl WalStreamDecoder {
         self.inputbuf.extend_from_slice(buf);
     }
 
-    // Returns a tuple:
-    // (end LSN, record)
+    /// Attempt to decode another WAL record from the input that has been fed to the
+    /// decoder so far.
+    ///
+    /// Returns one of the following:
+    ///     Ok((u64, Bytes)): a tuple containing the LSN of the next record, and the record itself
+    ///     Ok(None): there is not enough data in the input buffer. Feed more by calling the `feed_bytes` function
+    ///     Err(WalDecodeError): an error occurred while decoding, meaning the input was invalid.
+    ///
     pub fn poll_decode(&mut self) -> Result<Option<(u64, Bytes)>, WalDecodeError> {
         loop {
             // parse and verify page boundaries as we go
@@ -114,9 +100,7 @@ impl WalStreamDecoder {
                 let hdr = self.decode_XLogLongPageHeaderData();
 
                 if hdr.std.xlp_pageaddr != self.lsn {
-                    return Err(WalDecodeError::new(&format!("invalid xlog segment header at {:X}/{:X}",
-                                                            self.lsn >> 32,
-                                                            self.lsn & 0xffffffff)));
+                    return Err(WalDecodeError { msg: "invalid xlog segment header".into(), lsn: self.lsn });
                 }
                 // TODO: verify the remaining fields in the header
 
@@ -131,9 +115,7 @@ impl WalStreamDecoder {
                 let hdr = self.decode_XLogPageHeaderData();
 
                 if hdr.xlp_pageaddr != self.lsn {
-                    return Err(WalDecodeError::new(&format!("invalid xlog page header at {:X}/{:X}: {:?}",
-                                                            self.lsn >> 32,
-                                                            self.lsn & 0xffffffff, hdr)));
+                    return Err(WalDecodeError { msg: "invalid xlog page header".into(), lsn: self.lsn });
                 }
                 // TODO: verify the remaining fields in the header
 
@@ -159,10 +141,7 @@ impl WalStreamDecoder {
 
                 self.startlsn = self.lsn;
                 let xl_tot_len = self.inputbuf.get_u32_le();
                 if xl_tot_len < SizeOfXLogRecord {
-                    return Err(WalDecodeError::new(&format!("invalid xl_tot_len {} at {:X}/{:X}",
-                                                            xl_tot_len,
-                                                            self.lsn >> 32,
-                                                            self.lsn & 0xffffffff)));
+                    return Err(WalDecodeError {msg: format!("invalid xl_tot_len {}", xl_tot_len), lsn: self.lsn });
                 }
                 self.lsn += 4;
diff --git a/walkeeper/src/bin/wal_acceptor.rs b/walkeeper/src/bin/wal_acceptor.rs
index 38a32bb730b1..fcc475826da0 100644
--- a/walkeeper/src/bin/wal_acceptor.rs
+++ b/walkeeper/src/bin/wal_acceptor.rs
@@ -65,7 +65,7 @@ fn main() -> Result<()> {
         .get_matches();
 
     let systemid_str = arg_matches.value_of("systemid").unwrap();
-    let systemid = u64::from_str_radix(systemid_str, 10)?;
+    let systemid: u64 = systemid_str.parse()?;
 
     let mut conf = WalAcceptorConf {
         data_dir: PathBuf::from("./"),
diff --git a/walkeeper/src/pq_protocol.rs b/walkeeper/src/pq_protocol.rs
index 8179a734b9d4..f6e18d9aa4df 100644
--- a/walkeeper/src/pq_protocol.rs
+++ b/walkeeper/src/pq_protocol.rs
@@ -3,6 +3,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
 use pageserver::ZTimelineId;
 use std::io;
 use std::str;
+use std::str::FromStr;
 
 pub type Oid = u32;
 pub type SystemId = u64;
diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs
index 3dc873e27b73..54e28cda5bcb 100644
--- a/walkeeper/src/wal_service.rs
+++ b/walkeeper/src/wal_service.rs
@@ -601,9 +601,8 @@ impl Connection {
     fn set_timeline(&mut self, timelineid: ZTimelineId) -> Result<()> {
         let mut timelines = TIMELINES.lock().unwrap();
         if !timelines.contains_key(&timelineid) {
-            let timeline_dir = timelineid.to_str();
-            info!("creating timeline dir {}", &timeline_dir);
-            fs::create_dir_all(&timeline_dir)?;
+            info!("creating timeline dir {}", timelineid);
+            fs::create_dir_all(timelineid.to_string())?;
             timelines.insert(timelineid, Arc::new(Timeline::new(timelineid)));
         }
         self.timeline = Some(timelines.get(&timelineid).unwrap().clone());
@@ -1112,12 +1111,12 @@ impl Connection {
         let wal_file_path = self
             .conf
             .data_dir
-            .join(self.timeline().timelineid.to_str())
+            .join(self.timeline().timelineid.to_string())
             .join(wal_file_name.clone());
         let wal_file_partial_path = self
             .conf
             .data_dir
-            .join(self.timeline().timelineid.to_str())
+            .join(self.timeline().timelineid.to_string())
             .join(wal_file_name.clone() + ".partial");
 
         {
diff --git a/zenith/src/main.rs b/zenith/src/main.rs
index de29f386a04e..1d0b5b73d42f 100644
--- a/zenith/src/main.rs
+++ b/zenith/src/main.rs
@@ -1,10 +1,11 @@
 use std::fs;
 use std::path::{Path, PathBuf};
 use std::process::exit;
+use std::str::FromStr;
 
 use clap::{App, Arg, ArgMatches, SubCommand};
 use anyhow::Result;
-use anyhow::*;
+use anyhow::{anyhow, bail};
 
 use control_plane::{compute::ComputeControlPlane, local_env, storage};
 use control_plane::local_env::LocalEnv;
@@ -12,10 +13,10 @@ use control_plane::storage::PageServerNode;
 
 use pageserver::ZTimelineId;
 
-fn zenith_repo_dir() -> String {
+fn zenith_repo_dir() -> PathBuf {
     // Find repository path
     match std::env::var_os("ZENITH_REPO_DIR") {
-        Some(val) => String::from(val.to_str().unwrap()),
+        Some(val) => PathBuf::from(val.to_str().unwrap()),
         None => ".zenith".into(),
     }
 }
@@ -239,19 +240,20 @@ fn run_branch_cmd(local_env: &LocalEnv, args: ArgMatches) -> Result<()> {
         }
     } else {
         // No arguments, list branches
-        list_branches();
+        list_branches()?;
     }
     Ok(())
 }
 
-fn list_branches() {
+fn list_branches() -> Result<()> {
     // list branches
-    let paths = fs::read_dir(zenith_repo_dir() + "/refs/branches").unwrap();
+    let paths = fs::read_dir(zenith_repo_dir().join("refs").join("branches"))?;
     for path in paths {
-        let filename = path.unwrap().file_name().to_str().unwrap().to_owned();
-        println!(" {}", filename);
+        println!(" {}", path?.file_name().to_str().unwrap());
     }
+
+    Ok(())
 }
 
 //
@@ -281,8 +283,8 @@ fn parse_point_in_time(s: &str) -> Result<PointInTime> {
     let lsn: Option<u64>;
     if let Some(lsnstr) = strings.next() {
         let mut s = lsnstr.split("/");
-        let lsn_hi: u64 = s.next().unwrap().parse()?;
-        let lsn_lo: u64 = s.next().unwrap().parse()?;
+        let lsn_hi: u64 = s.next().ok_or(anyhow!("invalid LSN in point-in-time specification"))?.parse()?;
+        let lsn_lo: u64 = s.next().ok_or(anyhow!("invalid LSN in point-in-time specification"))?.parse()?;
         lsn = Some(lsn_hi << 32 | lsn_lo);
     } else {
@@ -291,7 +293,7 @@ fn parse_point_in_time(s: &str) -> Result<PointInTime> {
 
     // Check if it's a tag
     if lsn.is_none() {
-        let tagpath:PathBuf = PathBuf::from(zenith_repo_dir() + "/refs/tags/" + name);
+        let tagpath = zenith_repo_dir().join("refs").join("tags").join(name);
         if tagpath.exists() {
             let pointstr = fs::read_to_string(tagpath)?;
 
@@ -300,7 +302,7 @@ fn parse_point_in_time(s: &str) -> Result<PointInTime> {
     }
     // Check if it's a branch
     // Check if it's branch @ LSN
-    let branchpath:PathBuf = PathBuf::from(zenith_repo_dir() + "/refs/branches/" + name);
+    let branchpath = zenith_repo_dir().join("refs").join("branches").join(name);
     if branchpath.exists() {
         let pointstr = fs::read_to_string(branchpath)?;
 
@@ -315,7 +317,7 @@ fn parse_point_in_time(s: &str) -> Result<PointInTime> {
 
     // Check if it's a timelineid
     // Check if it's timelineid @ LSN
-    let tlipath:PathBuf = PathBuf::from(zenith_repo_dir() + "/timelines/" + name);
+    let tlipath = zenith_repo_dir().join("timelines").join(name);
     if tlipath.exists() {
         let result = local_env::PointInTime {
             timelineid: ZTimelineId::from_str(name)?,