Skip to content

Commit

Permalink
Add FullAccessTimeline guard in safekeepers (#7887)
Browse files Browse the repository at this point in the history
This is a preparation for
#6337.

The idea is to add FullAccessTimeline, which will act as a guard for
tasks requiring access to WAL files. Eviction will be blocked on these
tasks and WAL won't be deleted from disk as long as there is at least one
active FullAccessTimeline.

To get a FullAccessTimeline, tasks call `tli.full_access_guard().await?`.
After eviction is implemented, this function will be responsible for
downloading any missing WAL files and waiting until the download finishes.

This commit also contains other small refactorings:
- Separate `get_tenant_dir` and `get_timeline_dir` functions for
building a local path. This is useful for looking at usages and finding
tasks requiring access to the local filesystem.
- `timeline_manager` is now responsible for spawning all background
tasks
- The WAL removal task is now spawned immediately after the horizon is updated
  • Loading branch information
petuhovskiy authored and a-masterov committed Jun 3, 2024
1 parent 6ee8257 commit e647bb5
Show file tree
Hide file tree
Showing 23 changed files with 724 additions and 574 deletions.
4 changes: 2 additions & 2 deletions libs/remote_storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ impl RemotePath {
self.0.file_name()
}

pub fn join(&self, segment: &Utf8Path) -> Self {
Self(self.0.join(segment))
pub fn join(&self, path: impl AsRef<Utf8Path>) -> Self {
Self(self.0.join(path))
}

pub fn get_path(&self) -> &Utf8PathBuf {
Expand Down
2 changes: 1 addition & 1 deletion pageserver/src/deletion_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ impl DeletionList {
result.extend(
timeline_layers
.into_iter()
.map(|l| timeline_remote_path.join(&Utf8PathBuf::from(l))),
.map(|l| timeline_remote_path.join(Utf8PathBuf::from(l))),
);
}
}
Expand Down
11 changes: 1 addition & 10 deletions safekeeper/src/bin/safekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,12 @@ use safekeeper::defaults::{
DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR,
};
use safekeeper::remove_wal;
use safekeeper::http;
use safekeeper::wal_service;
use safekeeper::GlobalTimelines;
use safekeeper::SafeKeeperConf;
use safekeeper::{broker, WAL_SERVICE_RUNTIME};
use safekeeper::{control_file, BROKER_RUNTIME};
use safekeeper::{http, WAL_REMOVER_RUNTIME};
use safekeeper::{wal_backup, HTTP_RUNTIME};
use storage_broker::DEFAULT_ENDPOINT;
use utils::auth::{JwtAuth, Scope, SwappableJwtAuth};
Expand Down Expand Up @@ -441,14 +440,6 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
.map(|res| ("broker main".to_owned(), res));
tasks_handles.push(Box::pin(broker_task_handle));

let conf_ = conf.clone();
let wal_remover_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| WAL_REMOVER_RUNTIME.handle())
.spawn(remove_wal::task_main(conf_))
.map(|res| ("WAL remover".to_owned(), res));
tasks_handles.push(Box::pin(wal_remover_handle));

set_build_info_metric(GIT_VERSION, BUILD_TAG);

// TODO: update tokio-stream, convert to real async Stream with
Expand Down
37 changes: 17 additions & 20 deletions safekeeper/src/control_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use anyhow::{bail, ensure, Context, Result};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use camino::Utf8PathBuf;
use camino::{Utf8Path, Utf8PathBuf};
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use utils::crashsafe::durable_rename;
Expand All @@ -12,9 +12,9 @@ use std::ops::Deref;
use std::path::Path;
use std::time::Instant;

use crate::control_file_upgrade::upgrade_control_file;
use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
use crate::state::TimelinePersistentState;
use crate::{control_file_upgrade::upgrade_control_file, timeline::get_timeline_dir};
use utils::{bin_ser::LeSer, id::TenantTimelineId};

use crate::SafeKeeperConf;
Expand Down Expand Up @@ -43,7 +43,7 @@ pub trait Storage: Deref<Target = TimelinePersistentState> {
pub struct FileStorage {
// save timeline dir to avoid reconstructing it every time
timeline_dir: Utf8PathBuf,
conf: SafeKeeperConf,
no_sync: bool,

/// Last state persisted to disk.
state: TimelinePersistentState,
Expand All @@ -54,13 +54,12 @@ pub struct FileStorage {
impl FileStorage {
/// Initialize storage by loading state from disk.
pub fn restore_new(ttid: &TenantTimelineId, conf: &SafeKeeperConf) -> Result<FileStorage> {
let timeline_dir = conf.timeline_dir(ttid);

let state = Self::load_control_file_conf(conf, ttid)?;
let timeline_dir = get_timeline_dir(conf, ttid);
let state = Self::load_control_file_from_dir(&timeline_dir)?;

Ok(FileStorage {
timeline_dir,
conf: conf.clone(),
no_sync: conf.no_sync,
state,
last_persist_at: Instant::now(),
})
Expand All @@ -74,7 +73,7 @@ impl FileStorage {
) -> Result<FileStorage> {
let store = FileStorage {
timeline_dir,
conf: conf.clone(),
no_sync: conf.no_sync,
state,
last_persist_at: Instant::now(),
};
Expand Down Expand Up @@ -102,12 +101,9 @@ impl FileStorage {
upgrade_control_file(buf, version)
}

/// Load control file for given ttid at path specified by conf.
pub fn load_control_file_conf(
conf: &SafeKeeperConf,
ttid: &TenantTimelineId,
) -> Result<TimelinePersistentState> {
let path = conf.timeline_dir(ttid).join(CONTROL_FILE_NAME);
/// Load control file from given directory.
pub fn load_control_file_from_dir(timeline_dir: &Utf8Path) -> Result<TimelinePersistentState> {
let path = timeline_dir.join(CONTROL_FILE_NAME);
Self::load_control_file(path)
}

Expand Down Expand Up @@ -203,7 +199,7 @@ impl Storage for FileStorage {
})?;

let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
durable_rename(&control_partial_path, &control_path, !self.conf.no_sync).await?;
durable_rename(&control_partial_path, &control_path, !self.no_sync).await?;

// update internal state
self.state = s.clone();
Expand Down Expand Up @@ -233,24 +229,25 @@ mod test {
conf: &SafeKeeperConf,
ttid: &TenantTimelineId,
) -> Result<(FileStorage, TimelinePersistentState)> {
fs::create_dir_all(conf.timeline_dir(ttid))
let timeline_dir = get_timeline_dir(conf, ttid);
fs::create_dir_all(&timeline_dir)
.await
.expect("failed to create timeline dir");
Ok((
FileStorage::restore_new(ttid, conf)?,
FileStorage::load_control_file_conf(conf, ttid)?,
FileStorage::load_control_file_from_dir(&timeline_dir)?,
))
}

async fn create(
conf: &SafeKeeperConf,
ttid: &TenantTimelineId,
) -> Result<(FileStorage, TimelinePersistentState)> {
fs::create_dir_all(conf.timeline_dir(ttid))
let timeline_dir = get_timeline_dir(conf, ttid);
fs::create_dir_all(&timeline_dir)
.await
.expect("failed to create timeline dir");
let state = TimelinePersistentState::empty();
let timeline_dir = conf.timeline_dir(ttid);
let storage = FileStorage::create_new(timeline_dir, conf, state.clone())?;
Ok((storage, state))
}
Expand Down Expand Up @@ -291,7 +288,7 @@ mod test {
.await
.expect("failed to persist state");
}
let control_path = conf.timeline_dir(&ttid).join(CONTROL_FILE_NAME);
let control_path = get_timeline_dir(&conf, &ttid).join(CONTROL_FILE_NAME);
let mut data = fs::read(&control_path).await.unwrap();
data[0] += 1; // change the first byte of the file to fail checksum validation
fs::write(&control_path, &data)
Expand Down
26 changes: 9 additions & 17 deletions safekeeper/src/copy_timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ use crate::{
control_file::{FileStorage, Storage},
pull_timeline::{create_temp_timeline_dir, load_temp_timeline, validate_temp_timeline},
state::TimelinePersistentState,
timeline::{Timeline, TimelineError},
timeline::{FullAccessTimeline, Timeline, TimelineError},
wal_backup::copy_s3_segments,
wal_storage::{wal_file_paths, WalReader},
GlobalTimelines, SafeKeeperConf,
GlobalTimelines,
};

// we don't want to have more than 10 segments on disk after copy, because they take space
Expand Down Expand Up @@ -46,12 +46,14 @@ pub async fn handle_request(request: Request) -> Result<()> {
}
}

let source_tli = request.source.full_access_guard().await?;

let conf = &GlobalTimelines::get_global_config();
let ttid = request.destination_ttid;

let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;

let (mem_state, state) = request.source.get_state().await;
let (mem_state, state) = source_tli.get_state().await;
let start_lsn = state.timeline_start_lsn;
if start_lsn == Lsn::INVALID {
bail!("timeline is not initialized");
Expand All @@ -60,7 +62,7 @@ pub async fn handle_request(request: Request) -> Result<()> {

{
let commit_lsn = mem_state.commit_lsn;
let flush_lsn = request.source.get_flush_lsn().await;
let flush_lsn = source_tli.get_flush_lsn().await;

info!(
"collected info about source timeline: start_lsn={}, backup_lsn={}, commit_lsn={}, flush_lsn={}",
Expand Down Expand Up @@ -127,10 +129,8 @@ pub async fn handle_request(request: Request) -> Result<()> {
.await?;

copy_disk_segments(
conf,
&state,
&source_tli,
wal_seg_size,
&request.source.ttid,
new_backup_lsn,
request.until_lsn,
&tli_dir_path,
Expand Down Expand Up @@ -159,21 +159,13 @@ pub async fn handle_request(request: Request) -> Result<()> {
}

async fn copy_disk_segments(
conf: &SafeKeeperConf,
persisted_state: &TimelinePersistentState,
tli: &FullAccessTimeline,
wal_seg_size: usize,
source_ttid: &TenantTimelineId,
start_lsn: Lsn,
end_lsn: Lsn,
tli_dir_path: &Utf8PathBuf,
) -> Result<()> {
let mut wal_reader = WalReader::new(
conf.workdir.clone(),
conf.timeline_dir(source_ttid),
persisted_state,
start_lsn,
true,
)?;
let mut wal_reader = tli.get_walreader(start_lsn).await?;

let mut buf = [0u8; MAX_SEND_SIZE];

Expand Down
38 changes: 20 additions & 18 deletions safekeeper/src/debug_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::sync::Arc;
use anyhow::bail;
use anyhow::Result;
use camino::Utf8Path;
use camino::Utf8PathBuf;
use chrono::{DateTime, Utc};
use postgres_ffi::XLogSegNo;
use postgres_ffi::MAX_SEND_SIZE;
Expand All @@ -26,7 +27,8 @@ use crate::safekeeper::TermHistory;
use crate::send_wal::WalSenderState;
use crate::state::TimelineMemState;
use crate::state::TimelinePersistentState;
use crate::wal_storage::WalReader;
use crate::timeline::get_timeline_dir;
use crate::timeline::FullAccessTimeline;
use crate::GlobalTimelines;
use crate::SafeKeeperConf;

Expand Down Expand Up @@ -68,6 +70,7 @@ pub struct Response {
pub struct TimelineDumpSer {
pub tli: Arc<crate::timeline::Timeline>,
pub args: Args,
pub timeline_dir: Utf8PathBuf,
pub runtime: Arc<tokio::runtime::Runtime>,
}

Expand All @@ -85,14 +88,20 @@ impl Serialize for TimelineDumpSer {
where
S: serde::Serializer,
{
let dump = self
.runtime
.block_on(build_from_tli_dump(self.tli.clone(), self.args.clone()));
let dump = self.runtime.block_on(build_from_tli_dump(
&self.tli,
&self.args,
&self.timeline_dir,
));
dump.serialize(serializer)
}
}

async fn build_from_tli_dump(timeline: Arc<crate::timeline::Timeline>, args: Args) -> Timeline {
async fn build_from_tli_dump(
timeline: &Arc<crate::timeline::Timeline>,
args: &Args,
timeline_dir: &Utf8Path,
) -> Timeline {
let control_file = if args.dump_control_file {
let mut state = timeline.get_state().await.1;
if !args.dump_term_history {
Expand All @@ -112,7 +121,8 @@ async fn build_from_tli_dump(timeline: Arc<crate::timeline::Timeline>, args: Arg
let disk_content = if args.dump_disk_content {
// build_disk_content can fail, but we don't want to fail the whole
// request because of that.
build_disk_content(&timeline.timeline_dir).ok()
// Note: timeline can be in offloaded state, this is not a problem.
build_disk_content(timeline_dir).ok()
} else {
None
};
Expand Down Expand Up @@ -186,6 +196,7 @@ pub struct FileInfo {
pub async fn build(args: Args) -> Result<Response> {
let start_time = Utc::now();
let timelines_count = GlobalTimelines::timelines_count();
let config = GlobalTimelines::get_global_config();

let ptrs_snapshot = if args.tenant_id.is_some() && args.timeline_id.is_some() {
// If both tenant_id and timeline_id are specified, we can just get the
Expand Down Expand Up @@ -223,12 +234,11 @@ pub async fn build(args: Args) -> Result<Response> {
timelines.push(TimelineDumpSer {
tli,
args: args.clone(),
timeline_dir: get_timeline_dir(&config, &ttid),
runtime: runtime.clone(),
});
}

let config = GlobalTimelines::get_global_config();

Ok(Response {
start_time,
finish_time: Utc::now(),
Expand Down Expand Up @@ -316,27 +326,19 @@ pub struct TimelineDigest {
}

pub async fn calculate_digest(
tli: &Arc<crate::timeline::Timeline>,
tli: &FullAccessTimeline,
request: TimelineDigestRequest,
) -> Result<TimelineDigest> {
if request.from_lsn > request.until_lsn {
bail!("from_lsn is greater than until_lsn");
}

let conf = GlobalTimelines::get_global_config();
let (_, persisted_state) = tli.get_state().await;

if persisted_state.timeline_start_lsn > request.from_lsn {
bail!("requested LSN is before the start of the timeline");
}

let mut wal_reader = WalReader::new(
conf.workdir.clone(),
tli.timeline_dir.clone(),
&persisted_state,
request.from_lsn,
true,
)?;
let mut wal_reader = tli.get_walreader(request.from_lsn).await?;

let mut hasher = Sha256::new();
let mut buf = [0u8; MAX_SEND_SIZE];
Expand Down
Loading

0 comments on commit e647bb5

Please sign in to comment.