Evict WAL files from disk (#8022)

Fixes #6337

Add safekeeper support for switching between the `Present` and
`Offloaded(flush_lsn)` states. Offloading is disabled by default but can be
controlled with new command-line arguments:

```
      --enable-offload
          Enable automatic switching to offloaded state
      --delete-offloaded-wal
          Delete local WAL files after offloading. When disabled, they will be left on disk
      --control-file-save-interval <CONTROL_FILE_SAVE_INTERVAL>
          Pending updates to control file will be automatically saved after this interval [default: 300s]
```
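
For orientation, the two states could be modeled along these lines (a minimal sketch; the actual `EvictionState` in the safekeeper's `state.rs` may differ in naming and derives):

```
/// Stand-in for `utils::lsn::Lsn` from the Neon codebase.
type Lsn = u64;

/// Hypothetical sketch of the eviction state persisted in the control file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EvictionState {
    /// WAL files for the timeline are resident on local disk.
    Present,
    /// Local WAL was evicted; the LSN records how far WAL had been
    /// flushed when the timeline was offloaded.
    Offloaded(Lsn),
}
```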

The manager watches state updates and detects when there is no activity on
the timeline and the partial backup in remote storage is up to date. When
all conditions are met, the state can be switched to offloaded.
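
The eligibility check might look roughly like this (hypothetical names and fields; the real manager in `timeline_manager.rs` tracks more conditions):

```
/// Stand-in for `utils::lsn::Lsn`.
type Lsn = u64;

/// Hypothetical snapshot of the facts the manager tracks for one timeline.
struct TimelineSnapshot {
    /// Guards currently handed out for on-disk WAL access.
    active_guards: usize,
    /// Whether compute connections / WAL senders are active.
    is_active: bool,
    /// How far WAL has been flushed locally.
    flush_lsn: Lsn,
    /// End of the partial segment uploaded to remote storage.
    uploaded_partial_lsn: Lsn,
}

/// A timeline may be offloaded only when nothing needs local WAL and the
/// partial upload in remote storage already covers everything flushed.
fn ready_to_offload(s: &TimelineSnapshot) -> bool {
    s.active_guards == 0 && !s.is_active && s.uploaded_partial_lsn == s.flush_lsn
}
```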

In `timeline.rs`, a new `StateSK` enum supports switching between these
states. When a timeline is offloaded, code can access only the control file
state and cannot use `SafeKeeper` to accept new WAL.
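
A minimal sketch of that shape, with placeholder types standing in for the real control-file and safekeeper structures:

```
/// Placeholder for the full in-memory safekeeper (control file + WAL storage).
struct SafeKeeper;
/// Placeholder for the persistent control-file state.
struct TimelinePersistentState;

/// Hypothetical sketch of the two states a timeline's safekeeper can be in.
enum StateSK {
    /// WAL is on disk; the full SafeKeeper can accept and flush new WAL.
    Loaded(SafeKeeper),
    /// WAL was evicted; only the control-file state is available, so any
    /// operation that needs WAL must first unevict the timeline.
    Offloaded(Box<TimelinePersistentState>),
}

impl StateSK {
    /// Accessing the SafeKeeper is only valid in the `Loaded` state.
    fn safekeeper(&mut self) -> &mut SafeKeeper {
        match self {
            StateSK::Loaded(sk) => sk,
            StateSK::Offloaded(_) => panic!("timeline is offloaded, no WAL access"),
        }
    }
}
```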

`FullAccessTimeline` is now renamed to `WalResidentTimeline`. This struct
contains a guard that notifies the manager about active tasks requiring
on-disk WAL access. All guards are issued by the manager, and requests are
sent over a channel using `ManagerCtl`. When the manager receives a request
to issue a guard, it unevicts the timeline if it is currently evicted.
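
The request/guard handshake could be sketched as follows, assuming tokio channels; the names and signatures here are illustrative, not the actual `ManagerCtl` API:

```
use tokio::sync::{mpsc, oneshot};

/// Hypothetical guard proving its holder may access on-disk WAL.
/// Dropping it notifies the manager that the task has finished.
struct ResidenceGuard {
    done_tx: mpsc::UnboundedSender<u64>,
    guard_id: u64,
}

impl Drop for ResidenceGuard {
    fn drop(&mut self) {
        // The manager may already be gone on shutdown; ignore send errors.
        let _ = self.done_tx.send(self.guard_id);
    }
}

/// Hypothetical handle for requesting guards from the timeline manager.
struct ManagerCtl {
    request_tx: mpsc::UnboundedSender<oneshot::Sender<ResidenceGuard>>,
}

impl ManagerCtl {
    /// Ask the manager for WAL residence. On the other end of the channel
    /// the manager unevicts the timeline if needed, then replies with a guard.
    async fn wal_residence_guard(&self) -> anyhow::Result<ResidenceGuard> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.request_tx
            .send(reply_tx)
            .map_err(|_| anyhow::anyhow!("timeline manager is shut down"))?;
        Ok(reply_rx.await?)
    }
}
```

Funneling all guard issuance through a single channel gives the manager one place to count active tasks and to trigger uneviction before handing out access.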

Fixed a bug in partial WAL backup: it previously used `term` where
`last_log_term` was intended.
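
The distinction matters because an acceptor can advance its `term` during an election without writing any WAL in that new term, so uploads must be labeled with the term of the last record actually present in the log. Illustratively (not the actual struct):

```
/// Illustrative only: the two term fields a consensus acceptor tracks.
struct AcceptorStateSketch {
    /// Highest term this safekeeper has acknowledged (bumped by elections).
    term: u64,
    /// Term of the last record actually present in the local WAL; this is
    /// what an uploaded partial segment must be labeled with.
    last_log_term: u64,
}
```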

After this commit is merged, the next step is to roll this change out, as
tracked in issue #6338.
petuhovskiy committed Jun 26, 2024
1 parent dd3adc3 commit 76fc3d4
Showing 25 changed files with 1,665 additions and 472 deletions.
19 changes: 16 additions & 3 deletions safekeeper/src/bin/safekeeper.rs
@@ -28,8 +28,8 @@ use utils::pid_file;
 
 use metrics::set_build_info_metric;
 use safekeeper::defaults::{
-    DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
-    DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR,
+    DEFAULT_CONTROL_FILE_SAVE_INTERVAL, DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR,
+    DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR,
 };
 use safekeeper::http;
 use safekeeper::wal_service;
@@ -172,6 +172,7 @@ struct Args {
     walsenders_keep_horizon: bool,
     /// Enable partial backup. If disabled, safekeeper will not upload partial
     /// segments to remote storage.
+    /// TODO: now partial backup is always enabled, remove this flag.
     #[arg(long)]
     partial_backup_enabled: bool,
     /// Controls how long backup will wait until uploading the partial segment.
@@ -181,6 +182,15 @@
     /// be used in tests.
     #[arg(long)]
     disable_periodic_broker_push: bool,
+    /// Enable automatic switching to offloaded state.
+    #[arg(long)]
+    enable_offload: bool,
+    /// Delete local WAL files after offloading. When disabled, they will be left on disk.
+    #[arg(long)]
+    delete_offloaded_wal: bool,
+    /// Pending updates to control file will be automatically saved after this interval.
+    #[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_CONTROL_FILE_SAVE_INTERVAL)]
+    control_file_save_interval: Duration,
 }
 
 // Like PathBufValueParser, but allows empty string.
@@ -328,9 +338,12 @@ async fn main() -> anyhow::Result<()> {
         sk_auth_token,
         current_thread_runtime: args.current_thread_runtime,
         walsenders_keep_horizon: args.walsenders_keep_horizon,
-        partial_backup_enabled: args.partial_backup_enabled,
+        partial_backup_enabled: true,
         partial_backup_timeout: args.partial_backup_timeout,
         disable_periodic_broker_push: args.disable_periodic_broker_push,
+        enable_offload: args.enable_offload,
+        delete_offloaded_wal: args.delete_offloaded_wal,
+        control_file_save_interval: args.control_file_save_interval,
     };
 
     // initialize sentry if SENTRY_DSN is provided
5 changes: 4 additions & 1 deletion safekeeper/src/control_file.rs
@@ -72,6 +72,9 @@ impl FileStorage {
         conf: &SafeKeeperConf,
         state: TimelinePersistentState,
     ) -> Result<FileStorage> {
+        // we don't support creating new timelines in offloaded state
+        assert!(matches!(state.eviction_state, EvictionState::Present));
+
         let store = FileStorage {
             timeline_dir,
             no_sync: conf.no_sync,
@@ -103,7 +106,7 @@ impl FileStorage {
     }
 
     /// Load control file from given directory.
-    pub fn load_control_file_from_dir(timeline_dir: &Utf8Path) -> Result<TimelinePersistentState> {
+    fn load_control_file_from_dir(timeline_dir: &Utf8Path) -> Result<TimelinePersistentState> {
         let path = timeline_dir.join(CONTROL_FILE_NAME);
         Self::load_control_file(path)
     }
8 changes: 4 additions & 4 deletions safekeeper/src/copy_timeline.rs
@@ -15,7 +15,7 @@ use crate::{
     control_file::{FileStorage, Storage},
     pull_timeline::{create_temp_timeline_dir, load_temp_timeline, validate_temp_timeline},
     state::TimelinePersistentState,
-    timeline::{FullAccessTimeline, Timeline, TimelineError},
+    timeline::{Timeline, TimelineError, WalResidentTimeline},
     wal_backup::copy_s3_segments,
     wal_storage::{wal_file_paths, WalReader},
     GlobalTimelines,
@@ -46,7 +46,7 @@ pub async fn handle_request(request: Request) -> Result<()> {
         }
     }
 
-    let source_tli = request.source.full_access_guard().await?;
+    let source_tli = request.source.wal_residence_guard().await?;
 
     let conf = &GlobalTimelines::get_global_config();
     let ttid = request.destination_ttid;
@@ -159,7 +159,7 @@ pub async fn handle_request(request: Request) -> Result<()> {
 }
 
 async fn copy_disk_segments(
-    tli: &FullAccessTimeline,
+    tli: &WalResidentTimeline,
     wal_seg_size: usize,
     start_lsn: Lsn,
     end_lsn: Lsn,
@@ -183,7 +183,7 @@ async fn copy_disk_segments(
         let copy_end = copy_end - segment_start;
 
         let wal_file_path = {
-            let (normal, partial) = wal_file_paths(tli_dir_path, segment, wal_seg_size)?;
+            let (normal, partial) = wal_file_paths(tli_dir_path, segment, wal_seg_size);
 
             if segment == last_segment {
                 partial
6 changes: 4 additions & 2 deletions safekeeper/src/debug_dump.rs
@@ -28,7 +28,8 @@ use crate::send_wal::WalSenderState;
 use crate::state::TimelineMemState;
 use crate::state::TimelinePersistentState;
 use crate::timeline::get_timeline_dir;
-use crate::timeline::FullAccessTimeline;
+use crate::timeline::WalResidentTimeline;
+use crate::timeline_manager;
 use crate::GlobalTimelines;
 use crate::SafeKeeperConf;
 
@@ -168,6 +169,7 @@ pub struct Memory {
     pub last_removed_segno: XLogSegNo,
     pub epoch_start_lsn: Lsn,
     pub mem_state: TimelineMemState,
+    pub mgr_status: timeline_manager::Status,
 
     // PhysicalStorage state.
     pub write_lsn: Lsn,
@@ -326,7 +328,7 @@ pub struct TimelineDigest {
 }
 
 pub async fn calculate_digest(
-    tli: &FullAccessTimeline,
+    tli: &WalResidentTimeline,
     request: TimelineDigestRequest,
 ) -> Result<TimelineDigest> {
     if request.from_lsn > request.until_lsn {
8 changes: 4 additions & 4 deletions safekeeper/src/http/routes.rs
@@ -214,10 +214,10 @@ async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Bo
     let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
     // Note: with evicted timelines it should work better then de-evict them and
     // stream; probably start_snapshot would copy partial s3 file to dest path
-    // and stream control file, or return FullAccessTimeline if timeline is not
+    // and stream control file, or return WalResidentTimeline if timeline is not
     // evicted.
     let tli = tli
-        .full_access_guard()
+        .wal_residence_guard()
         .await
         .map_err(ApiError::InternalServerError)?;
 
@@ -283,7 +283,7 @@ async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body
     let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
     let tli = tli
-        .full_access_guard()
+        .wal_residence_guard()
         .await
         .map_err(ApiError::InternalServerError)?;
 
@@ -306,7 +306,7 @@ async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<
     tli.write_shared_state()
         .await
         .sk
-        .state
+        .state_mut()
         .flush()
         .await
         .map_err(ApiError::InternalServerError)?;
10 changes: 5 additions & 5 deletions safekeeper/src/json_ctrl.rs
@@ -21,7 +21,7 @@ use crate::safekeeper::{
 };
 use crate::safekeeper::{Term, TermHistory, TermLsn};
 use crate::state::TimelinePersistentState;
-use crate::timeline::FullAccessTimeline;
+use crate::timeline::WalResidentTimeline;
 use crate::GlobalTimelines;
 use postgres_backend::PostgresBackend;
 use postgres_ffi::encode_logical_message;
@@ -102,7 +102,7 @@ pub async fn handle_json_ctrl<IO: AsyncRead + AsyncWrite + Unpin>(
 async fn prepare_safekeeper(
     ttid: TenantTimelineId,
     pg_version: u32,
-) -> anyhow::Result<FullAccessTimeline> {
+) -> anyhow::Result<WalResidentTimeline> {
     let tli = GlobalTimelines::create(
         ttid,
         ServerInfo {
@@ -115,11 +115,11 @@ async fn prepare_safekeeper(
     )
     .await?;
 
-    tli.full_access_guard().await
+    tli.wal_residence_guard().await
 }
 
 async fn send_proposer_elected(
-    tli: &FullAccessTimeline,
+    tli: &WalResidentTimeline,
     term: Term,
     lsn: Lsn,
 ) -> anyhow::Result<()> {
@@ -151,7 +151,7 @@ pub struct InsertedWAL {
 /// Extend local WAL with new LogicalMessage record. To do that,
 /// create AppendRequest with new WAL and pass it to safekeeper.
 pub async fn append_logical_message(
-    tli: &FullAccessTimeline,
+    tli: &WalResidentTimeline,
     msg: &AppendLogicalMessage,
 ) -> anyhow::Result<InsertedWAL> {
     let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message);
9 changes: 9 additions & 0 deletions safekeeper/src/lib.rs
@@ -28,6 +28,8 @@ pub mod safekeeper;
 pub mod send_wal;
 pub mod state;
 pub mod timeline;
+pub mod timeline_eviction;
+pub mod timeline_guard;
 pub mod timeline_manager;
 pub mod timelines_set;
 pub mod wal_backup;
@@ -49,6 +51,7 @@ pub mod defaults {
     pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms";
     pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
     pub const DEFAULT_PARTIAL_BACKUP_TIMEOUT: &str = "15m";
+    pub const DEFAULT_CONTROL_FILE_SAVE_INTERVAL: &str = "300s";
 }
 
 #[derive(Debug, Clone)]
@@ -85,6 +88,9 @@ pub struct SafeKeeperConf {
     pub partial_backup_enabled: bool,
     pub partial_backup_timeout: Duration,
     pub disable_periodic_broker_push: bool,
+    pub enable_offload: bool,
+    pub delete_offloaded_wal: bool,
+    pub control_file_save_interval: Duration,
 }
 
 impl SafeKeeperConf {
@@ -124,6 +130,9 @@ impl SafeKeeperConf {
             partial_backup_enabled: false,
             partial_backup_timeout: Duration::from_secs(0),
             disable_periodic_broker_push: false,
+            enable_offload: false,
+            delete_offloaded_wal: false,
+            control_file_save_interval: Duration::from_secs(1),
         }
     }
 }
35 changes: 20 additions & 15 deletions safekeeper/src/pull_timeline.rs
@@ -32,7 +32,7 @@ use crate::{
         routes::TimelineStatus,
     },
     safekeeper::Term,
-    timeline::{get_tenant_dir, get_timeline_dir, FullAccessTimeline, Timeline, TimelineError},
+    timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError, WalResidentTimeline},
     wal_storage::{self, open_wal_file, Storage},
    GlobalTimelines, SafeKeeperConf,
 };
@@ -46,7 +46,7 @@ use utils::{
 
 /// Stream tar archive of timeline to tx.
 #[instrument(name = "snapshot", skip_all, fields(ttid = %tli.ttid))]
-pub async fn stream_snapshot(tli: FullAccessTimeline, tx: mpsc::Sender<Result<Bytes>>) {
+pub async fn stream_snapshot(tli: WalResidentTimeline, tx: mpsc::Sender<Result<Bytes>>) {
     if let Err(e) = stream_snapshot_guts(tli, tx.clone()).await {
         // Error type/contents don't matter as they won't can't reach the client
         // (hyper likely doesn't do anything with it), but http stream will be
@@ -66,7 +66,7 @@ pub struct SnapshotContext {
     pub flush_lsn: Lsn,
     pub wal_seg_size: usize,
     // used to remove WAL hold off in Drop.
-    pub tli: FullAccessTimeline,
+    pub tli: WalResidentTimeline,
 }
 
 impl Drop for SnapshotContext {
@@ -80,7 +80,7 @@ impl Drop for SnapshotContext {
 }
 
 pub async fn stream_snapshot_guts(
-    tli: FullAccessTimeline,
+    tli: WalResidentTimeline,
     tx: mpsc::Sender<Result<Bytes>>,
 ) -> Result<()> {
     // tokio-tar wants Write implementor, but we have mpsc tx <Result<Bytes>>;
@@ -135,7 +135,7 @@ pub async fn stream_snapshot_guts(
     Ok(())
 }
 
-impl FullAccessTimeline {
+impl WalResidentTimeline {
     /// Start streaming tar archive with timeline:
     /// 1) stream control file under lock;
     /// 2) hold off WAL removal;
@@ -160,6 +160,7 @@
         ar: &mut tokio_tar::Builder<W>,
     ) -> Result<SnapshotContext> {
         let mut shared_state = self.write_shared_state().await;
+        let wal_seg_size = shared_state.get_wal_seg_size();
 
         let cf_path = self.get_timeline_dir().join(CONTROL_FILE_NAME);
         let mut cf = File::open(cf_path).await?;
@@ -173,19 +174,19 @@
         // lock and setting `wal_removal_on_hold` later, it guarantees that WAL
         // won't be removed until we're done.
         let from_lsn = min(
-            shared_state.sk.state.remote_consistent_lsn,
-            shared_state.sk.state.backup_lsn,
+            shared_state.sk.state().remote_consistent_lsn,
+            shared_state.sk.state().backup_lsn,
         );
         if from_lsn == Lsn::INVALID {
            // this is possible if snapshot is called before handling first
            // elected message
            bail!("snapshot is called on uninitialized timeline");
        }
-        let from_segno = from_lsn.segment_number(shared_state.get_wal_seg_size());
-        let term = shared_state.sk.get_term();
-        let last_log_term = shared_state.sk.get_last_log_term();
+        let from_segno = from_lsn.segment_number(wal_seg_size);
+        let term = shared_state.sk.state().acceptor_state.term;
+        let last_log_term = shared_state.sk.last_log_term();
         let flush_lsn = shared_state.sk.flush_lsn();
-        let upto_segno = flush_lsn.segment_number(shared_state.get_wal_seg_size());
+        let upto_segno = flush_lsn.segment_number(wal_seg_size);
         // have some limit on max number of segments as a sanity check
         const MAX_ALLOWED_SEGS: u64 = 1000;
         let num_segs = upto_segno - from_segno + 1;
@@ -206,14 +207,18 @@
         }
         shared_state.wal_removal_on_hold = true;
 
+        // Drop shared_state to release the lock, before calling wal_residence_guard().
+        drop(shared_state);
+
+        let tli_copy = self.wal_residence_guard().await?;
         let bctx = SnapshotContext {
             from_segno,
             upto_segno,
             term,
             last_log_term,
             flush_lsn,
-            wal_seg_size: shared_state.get_wal_seg_size(),
-            tli: self.clone(),
+            wal_seg_size,
+            tli: tli_copy,
         };
 
         Ok(bctx)
@@ -225,8 +230,8 @@
     /// forget this if snapshotting fails mid the way.
     pub async fn finish_snapshot(&self, bctx: &SnapshotContext) -> Result<()> {
         let shared_state = self.read_shared_state().await;
-        let term = shared_state.sk.get_term();
-        let last_log_term = shared_state.sk.get_last_log_term();
+        let term = shared_state.sk.state().acceptor_state.term;
+        let last_log_term = shared_state.sk.last_log_term();
         // There are some cases to relax this check (e.g. last_log_term might
         // change, but as long as older history is strictly part of new that's
         // fine), but there is no need to do it.
(diff for the remaining 17 changed files not shown)

1 comment on commit 76fc3d4

@github-actions

3022 tests run: 2895 passed, 1 failed, 126 skipped (full report)


Failures on Postgres 14

  • test_heavy_write_workload[neon_on-github-actions-selfhosted-10-5-5]: release
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_heavy_write_workload[neon_on-release-pg14-github-actions-selfhosted-10-5-5]"

Code coverage* (full report)

  • functions: 32.6% (6893 of 21124 functions)
  • lines: 50.0% (53927 of 107928 lines)

* collected from Rust tests only

