Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Evict WAL files from disk #8022

Merged
merged 32 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
274fbc1
Spawn partial backup on demand
petuhovskiy Jun 11, 2024
2a98f64
Implement ManagerCtl
petuhovskiy Jun 12, 2024
619c96a
Add StateSK
petuhovskiy Jun 12, 2024
0abaf14
Fix tests
petuhovskiy Jun 13, 2024
dc394ae
Fix guards usage in manager
petuhovskiy Jun 13, 2024
24227e3
WIP: add test_s3_eviction
petuhovskiy Jun 14, 2024
87c9a65
Implement sk state switch
petuhovskiy Jun 17, 2024
97b5f78
Enable offloading
petuhovskiy Jun 17, 2024
1a2fcab
Enable automatic offloading switch
petuhovskiy Jun 17, 2024
5e5c397
Make eviction mostly work
petuhovskiy Jun 18, 2024
d0e9049
Rebase
petuhovskiy Jun 18, 2024
17e61b2
Restructure code
petuhovskiy Jun 19, 2024
200b302
Fix test
petuhovskiy Jun 19, 2024
e682738
Fix full_access_guard in start_snapshot
petuhovskiy Jun 20, 2024
87bfd3c
Rebase
petuhovskiy Jun 21, 2024
c9cfef4
Add debug status to manager
petuhovskiy Jun 22, 2024
ab8c74d
Fix deadlock in backup task
petuhovskiy Jun 22, 2024
2e82866
Add atomic rename
petuhovskiy Jun 23, 2024
c29454a
Rename FullAccessTimeline to WalResidentTimeline
petuhovskiy Jun 23, 2024
a15f54f
Fix clippy warnings
petuhovskiy Jun 24, 2024
026868e
Update remote_consistent_lsn in offloaded state
petuhovskiy Jun 24, 2024
d226955
Rename timeline_access.rs
petuhovskiy Jun 24, 2024
6bef78a
Fix test_s3_eviction
petuhovskiy Jun 24, 2024
e88a8c7
Add control_file_save_interval
petuhovskiy Jun 24, 2024
02077ad
Add comments
petuhovskiy Jun 24, 2024
63af675
Add comments after self-review
petuhovskiy Jun 24, 2024
a1fb21d
Fix misc
petuhovskiy Jun 24, 2024
1f0ed2e
Fix test flakiness
petuhovskiy Jun 24, 2024
1864721
Revert testing conf
petuhovskiy Jun 24, 2024
8be0efe
Fix review comments
petuhovskiy Jun 25, 2024
4f6feb6
Assert uneviction in test
petuhovskiy Jun 26, 2024
9767942
Fix review comments
petuhovskiy Jun 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions safekeeper/src/bin/safekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use utils::pid_file;

use metrics::set_build_info_metric;
use safekeeper::defaults::{
DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR,
DEFAULT_CONTROL_FILE_SAVE_INTERVAL, DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR,
DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR,
};
use safekeeper::http;
use safekeeper::wal_service;
Expand Down Expand Up @@ -172,6 +172,7 @@ struct Args {
walsenders_keep_horizon: bool,
/// Enable partial backup. If disabled, safekeeper will not upload partial
/// segments to remote storage.
/// TODO: now partial backup is always enabled, remove this flag.
#[arg(long)]
partial_backup_enabled: bool,
/// Controls how long backup will wait until uploading the partial segment.
Expand All @@ -181,6 +182,15 @@ struct Args {
/// be used in tests.
#[arg(long)]
disable_periodic_broker_push: bool,
/// Enable automatic switching to offloaded state.
#[arg(long)]
enable_offload: bool,
petuhovskiy marked this conversation as resolved.
Show resolved Hide resolved
/// Delete local WAL files after offloading. When disabled, they will be left on disk.
#[arg(long)]
delete_offloaded_wal: bool,
/// Pending updates to control file will be automatically saved after this interval.
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_CONTROL_FILE_SAVE_INTERVAL)]
control_file_save_interval: Duration,
}

// Like PathBufValueParser, but allows empty string.
Expand Down Expand Up @@ -328,9 +338,12 @@ async fn main() -> anyhow::Result<()> {
sk_auth_token,
current_thread_runtime: args.current_thread_runtime,
walsenders_keep_horizon: args.walsenders_keep_horizon,
partial_backup_enabled: args.partial_backup_enabled,
partial_backup_enabled: true,
partial_backup_timeout: args.partial_backup_timeout,
disable_periodic_broker_push: args.disable_periodic_broker_push,
enable_offload: args.enable_offload,
delete_offloaded_wal: args.delete_offloaded_wal,
control_file_save_interval: args.control_file_save_interval,
};

// initialize sentry if SENTRY_DSN is provided
Expand Down
5 changes: 4 additions & 1 deletion safekeeper/src/control_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ impl FileStorage {
conf: &SafeKeeperConf,
state: TimelinePersistentState,
) -> Result<FileStorage> {
// we don't support creating new timelines in offloaded state
assert!(matches!(state.eviction_state, EvictionState::Present));

let store = FileStorage {
timeline_dir,
no_sync: conf.no_sync,
Expand Down Expand Up @@ -103,7 +106,7 @@ impl FileStorage {
}

/// Load control file from given directory.
pub fn load_control_file_from_dir(timeline_dir: &Utf8Path) -> Result<TimelinePersistentState> {
fn load_control_file_from_dir(timeline_dir: &Utf8Path) -> Result<TimelinePersistentState> {
let path = timeline_dir.join(CONTROL_FILE_NAME);
Self::load_control_file(path)
}
Expand Down
8 changes: 4 additions & 4 deletions safekeeper/src/copy_timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
control_file::{FileStorage, Storage},
pull_timeline::{create_temp_timeline_dir, load_temp_timeline, validate_temp_timeline},
state::TimelinePersistentState,
timeline::{FullAccessTimeline, Timeline, TimelineError},
timeline::{Timeline, TimelineError, WalResidentTimeline},
wal_backup::copy_s3_segments,
wal_storage::{wal_file_paths, WalReader},
GlobalTimelines,
Expand Down Expand Up @@ -46,7 +46,7 @@ pub async fn handle_request(request: Request) -> Result<()> {
}
}

let source_tli = request.source.full_access_guard().await?;
let source_tli = request.source.wal_residence_guard().await?;

let conf = &GlobalTimelines::get_global_config();
let ttid = request.destination_ttid;
Expand Down Expand Up @@ -159,7 +159,7 @@ pub async fn handle_request(request: Request) -> Result<()> {
}

async fn copy_disk_segments(
tli: &FullAccessTimeline,
tli: &WalResidentTimeline,
wal_seg_size: usize,
start_lsn: Lsn,
end_lsn: Lsn,
Expand All @@ -183,7 +183,7 @@ async fn copy_disk_segments(
let copy_end = copy_end - segment_start;

let wal_file_path = {
let (normal, partial) = wal_file_paths(tli_dir_path, segment, wal_seg_size)?;
let (normal, partial) = wal_file_paths(tli_dir_path, segment, wal_seg_size);

if segment == last_segment {
partial
Expand Down
6 changes: 4 additions & 2 deletions safekeeper/src/debug_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ use crate::send_wal::WalSenderState;
use crate::state::TimelineMemState;
use crate::state::TimelinePersistentState;
use crate::timeline::get_timeline_dir;
use crate::timeline::FullAccessTimeline;
use crate::timeline::WalResidentTimeline;
use crate::timeline_manager;
use crate::GlobalTimelines;
use crate::SafeKeeperConf;

Expand Down Expand Up @@ -168,6 +169,7 @@ pub struct Memory {
pub last_removed_segno: XLogSegNo,
pub epoch_start_lsn: Lsn,
pub mem_state: TimelineMemState,
pub mgr_status: timeline_manager::Status,

// PhysicalStorage state.
pub write_lsn: Lsn,
Expand Down Expand Up @@ -326,7 +328,7 @@ pub struct TimelineDigest {
}

pub async fn calculate_digest(
tli: &FullAccessTimeline,
tli: &WalResidentTimeline,
request: TimelineDigestRequest,
) -> Result<TimelineDigest> {
if request.from_lsn > request.until_lsn {
Expand Down
8 changes: 4 additions & 4 deletions safekeeper/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,10 @@ async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Bo
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
// Note: with evicted timelines it should work better than de-evicting them and
// stream; probably start_snapshot would copy partial s3 file to dest path
// and stream control file, or return FullAccessTimeline if timeline is not
// and stream control file, or return WalResidentTimeline if timeline is not
// evicted.
let tli = tli
.full_access_guard()
.wal_residence_guard()
.await
.map_err(ApiError::InternalServerError)?;

Expand Down Expand Up @@ -283,7 +283,7 @@ async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body

let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
let tli = tli
.full_access_guard()
.wal_residence_guard()
.await
.map_err(ApiError::InternalServerError)?;

Expand All @@ -306,7 +306,7 @@ async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<
tli.write_shared_state()
.await
.sk
.state
.state_mut()
.flush()
.await
.map_err(ApiError::InternalServerError)?;
Expand Down
10 changes: 5 additions & 5 deletions safekeeper/src/json_ctrl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::safekeeper::{
};
use crate::safekeeper::{Term, TermHistory, TermLsn};
use crate::state::TimelinePersistentState;
use crate::timeline::FullAccessTimeline;
use crate::timeline::WalResidentTimeline;
use crate::GlobalTimelines;
use postgres_backend::PostgresBackend;
use postgres_ffi::encode_logical_message;
Expand Down Expand Up @@ -102,7 +102,7 @@ pub async fn handle_json_ctrl<IO: AsyncRead + AsyncWrite + Unpin>(
async fn prepare_safekeeper(
ttid: TenantTimelineId,
pg_version: u32,
) -> anyhow::Result<FullAccessTimeline> {
) -> anyhow::Result<WalResidentTimeline> {
let tli = GlobalTimelines::create(
ttid,
ServerInfo {
Expand All @@ -115,11 +115,11 @@ async fn prepare_safekeeper(
)
.await?;

tli.full_access_guard().await
tli.wal_residence_guard().await
}

async fn send_proposer_elected(
tli: &FullAccessTimeline,
tli: &WalResidentTimeline,
term: Term,
lsn: Lsn,
) -> anyhow::Result<()> {
Expand Down Expand Up @@ -151,7 +151,7 @@ pub struct InsertedWAL {
/// Extend local WAL with new LogicalMessage record. To do that,
/// create AppendRequest with new WAL and pass it to safekeeper.
pub async fn append_logical_message(
tli: &FullAccessTimeline,
tli: &WalResidentTimeline,
msg: &AppendLogicalMessage,
) -> anyhow::Result<InsertedWAL> {
let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message);
Expand Down
9 changes: 9 additions & 0 deletions safekeeper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ pub mod safekeeper;
pub mod send_wal;
pub mod state;
pub mod timeline;
pub mod timeline_eviction;
pub mod timeline_guard;
pub mod timeline_manager;
pub mod timelines_set;
pub mod wal_backup;
Expand All @@ -49,6 +51,7 @@ pub mod defaults {
pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms";
pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
pub const DEFAULT_PARTIAL_BACKUP_TIMEOUT: &str = "15m";
pub const DEFAULT_CONTROL_FILE_SAVE_INTERVAL: &str = "300s";
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -85,6 +88,9 @@ pub struct SafeKeeperConf {
pub partial_backup_enabled: bool,
pub partial_backup_timeout: Duration,
pub disable_periodic_broker_push: bool,
pub enable_offload: bool,
pub delete_offloaded_wal: bool,
pub control_file_save_interval: Duration,
}

impl SafeKeeperConf {
Expand Down Expand Up @@ -124,6 +130,9 @@ impl SafeKeeperConf {
partial_backup_enabled: false,
partial_backup_timeout: Duration::from_secs(0),
disable_periodic_broker_push: false,
enable_offload: false,
delete_offloaded_wal: false,
control_file_save_interval: Duration::from_secs(1),
}
}
}
Expand Down
35 changes: 20 additions & 15 deletions safekeeper/src/pull_timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::{
routes::TimelineStatus,
},
safekeeper::Term,
timeline::{get_tenant_dir, get_timeline_dir, FullAccessTimeline, Timeline, TimelineError},
timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError, WalResidentTimeline},
wal_storage::{self, open_wal_file, Storage},
GlobalTimelines, SafeKeeperConf,
};
Expand All @@ -46,7 +46,7 @@ use utils::{

/// Stream tar archive of timeline to tx.
#[instrument(name = "snapshot", skip_all, fields(ttid = %tli.ttid))]
pub async fn stream_snapshot(tli: FullAccessTimeline, tx: mpsc::Sender<Result<Bytes>>) {
pub async fn stream_snapshot(tli: WalResidentTimeline, tx: mpsc::Sender<Result<Bytes>>) {
if let Err(e) = stream_snapshot_guts(tli, tx.clone()).await {
// Error type/contents don't matter as they can't reach the client
// (hyper likely doesn't do anything with it), but http stream will be
Expand All @@ -66,7 +66,7 @@ pub struct SnapshotContext {
pub flush_lsn: Lsn,
pub wal_seg_size: usize,
// used to remove WAL hold off in Drop.
pub tli: FullAccessTimeline,
pub tli: WalResidentTimeline,
}

impl Drop for SnapshotContext {
Expand All @@ -80,7 +80,7 @@ impl Drop for SnapshotContext {
}

pub async fn stream_snapshot_guts(
tli: FullAccessTimeline,
tli: WalResidentTimeline,
tx: mpsc::Sender<Result<Bytes>>,
) -> Result<()> {
// tokio-tar wants Write implementor, but we have mpsc tx <Result<Bytes>>;
Expand Down Expand Up @@ -135,7 +135,7 @@ pub async fn stream_snapshot_guts(
Ok(())
}

impl FullAccessTimeline {
impl WalResidentTimeline {
/// Start streaming tar archive with timeline:
/// 1) stream control file under lock;
/// 2) hold off WAL removal;
Expand All @@ -160,6 +160,7 @@ impl FullAccessTimeline {
ar: &mut tokio_tar::Builder<W>,
) -> Result<SnapshotContext> {
let mut shared_state = self.write_shared_state().await;
let wal_seg_size = shared_state.get_wal_seg_size();

let cf_path = self.get_timeline_dir().join(CONTROL_FILE_NAME);
let mut cf = File::open(cf_path).await?;
Expand All @@ -173,19 +174,19 @@ impl FullAccessTimeline {
// lock and setting `wal_removal_on_hold` later, it guarantees that WAL
// won't be removed until we're done.
let from_lsn = min(
shared_state.sk.state.remote_consistent_lsn,
shared_state.sk.state.backup_lsn,
shared_state.sk.state().remote_consistent_lsn,
shared_state.sk.state().backup_lsn,
);
if from_lsn == Lsn::INVALID {
// this is possible if snapshot is called before handling first
// elected message
bail!("snapshot is called on uninitialized timeline");
}
let from_segno = from_lsn.segment_number(shared_state.get_wal_seg_size());
let term = shared_state.sk.get_term();
let last_log_term = shared_state.sk.get_last_log_term();
let from_segno = from_lsn.segment_number(wal_seg_size);
let term = shared_state.sk.state().acceptor_state.term;
let last_log_term = shared_state.sk.last_log_term();
let flush_lsn = shared_state.sk.flush_lsn();
let upto_segno = flush_lsn.segment_number(shared_state.get_wal_seg_size());
let upto_segno = flush_lsn.segment_number(wal_seg_size);
// have some limit on max number of segments as a sanity check
const MAX_ALLOWED_SEGS: u64 = 1000;
let num_segs = upto_segno - from_segno + 1;
Expand All @@ -206,14 +207,18 @@ impl FullAccessTimeline {
}
shared_state.wal_removal_on_hold = true;

// Drop shared_state to release the lock, before calling wal_residence_guard().
drop(shared_state);

let tli_copy = self.wal_residence_guard().await?;
let bctx = SnapshotContext {
from_segno,
upto_segno,
term,
last_log_term,
flush_lsn,
wal_seg_size: shared_state.get_wal_seg_size(),
tli: self.clone(),
wal_seg_size,
tli: tli_copy,
};

Ok(bctx)
Expand All @@ -225,8 +230,8 @@ impl FullAccessTimeline {
/// forget this if snapshotting fails mid the way.
pub async fn finish_snapshot(&self, bctx: &SnapshotContext) -> Result<()> {
let shared_state = self.read_shared_state().await;
let term = shared_state.sk.get_term();
let last_log_term = shared_state.sk.get_last_log_term();
let term = shared_state.sk.state().acceptor_state.term;
let last_log_term = shared_state.sk.last_log_term();
// There are some cases to relax this check (e.g. last_log_term might
// change, but as long as older history is strictly part of new that's
// fine), but there is no need to do it.
Expand Down
Loading
Loading