Skip to content

Commit

Permalink
Make eviction mostly work
Browse files Browse the repository at this point in the history
  • Loading branch information
petuhovskiy committed Jun 18, 2024
1 parent 926acf2 commit 4e6173c
Showing 1 changed file with 141 additions and 4 deletions.
145 changes: 141 additions & 4 deletions safekeeper/src/timeline_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ use std::{
time::{Duration, Instant},
};

use anyhow::Context;
use camino::Utf8Path;
use postgres_ffi::XLogSegNo;
use tokio::task::{JoinError, JoinHandle};
use remote_storage::RemotePath;
use tokio::{fs::File, io::{AsyncRead, AsyncWriteExt}, task::{JoinError, JoinHandle}};
use tracing::{info, info_span, instrument, warn, Instrument};
use utils::lsn::Lsn;

Expand Down Expand Up @@ -445,7 +448,14 @@ async fn offload_timeline(
let segno = flush_lsn.segment_number(wal_seg_size);
let (_, partial_segfile) = wal_file_paths(tli.timeline_dir(), segno, wal_seg_size).unwrap();

info!("TODO: delete WAL file here: {}", partial_segfile);
if true {
info!("deleting WAL file here: {}, it's being replaced by {:?}", partial_segfile, partial_backup_uploaded);
if let Err(e) = tokio::fs::remove_file(&partial_segfile).await {
warn!("failed to delete local WAL file: {:?}", e);
return false;
}
}

info!("offloading timeline at flush_lsn={}", flush_lsn);
if let Err(e) = tli.switch_to_offloaded(&flush_lsn).await {
warn!("failed to offload timeline: {:?}", e);
Expand All @@ -469,9 +479,39 @@ async fn unoffload_timeline(

let flush_lsn = partial_backup_uploaded.flush_lsn;
let segno = flush_lsn.segment_number(wal_seg_size);
let (_, partial_segfile) = wal_file_paths(tli.timeline_dir(), segno, wal_seg_size).unwrap();
let (_, local_partial_segfile) = wal_file_paths(tli.timeline_dir(), segno, wal_seg_size).unwrap();

let remote_timeline_path = wal_backup::remote_timeline_path(&tli.ttid).expect("TODO");
let remote_segment_path = remote_timeline_path.join(&partial_backup_uploaded.name);

info!("validating local WAL file: {}, compare it with {:?}", local_partial_segfile, partial_backup_uploaded);
match File::open(&local_partial_segfile).await {
Ok(mut local_file) => {
let res = validate_local_segment(&mut local_file, &remote_segment_path, wal_seg_size).await;
match res {
Ok(_) => {
info!("local WAL file is valid: {}", local_partial_segfile);
}
Err(e) => {
warn!("local WAL file is invalid: {}", e);
return false;
}
}
}
Err(_) => {
let res = redownload_partial_segment(&local_partial_segfile, &remote_segment_path, wal_seg_size).await;
match res {
Ok(_) => {
info!("successfully redownloaded partial segment: {}", local_partial_segfile);
}
Err(e) => {
warn!("failed to redownload partial segment: {:?}", e);
return false;
}
}
}
}

info!("TODO: validate local WAL file: {}", partial_segfile);
if let Err(e) = tli.switch_to_present().await {
warn!("failed to unoffload timeline: {:?}", e);
return false;
Expand All @@ -482,6 +522,103 @@ async fn unoffload_timeline(
true
}

async fn validate_local_segment(
file: &mut File,
remote_segfile: &RemotePath,
wal_seg_size: usize,
) -> anyhow::Result<()> {
let local_size = file.metadata().await?.len() as usize;
if local_size != wal_seg_size {
anyhow::bail!("local segment size mismatch: {} != {}", local_size, wal_seg_size);
}

let reader: std::pin::Pin<Box<dyn AsyncRead + Send + Sync>> = wal_backup::read_object(remote_segfile, 0).await?;
// we need to compare bytes from both local and remote readers
compare_async_read(reader, file).await?;

Ok(())
}

/// Compare two readers and return true if bytes from reader1 are a prefix of bytes from reader2.
/// Also checks that last bytes of reader2 are zeroed.
async fn compare_async_read<R1, R2>(mut reader1: R1, mut reader2: R2) -> anyhow::Result<()>
where
R1: AsyncRead + Unpin,
R2: AsyncRead + Unpin,
{
use tokio::io::AsyncReadExt;

const BUF_SIZE: usize = 32 * 1024;

let mut buffer1 = [0u8; BUF_SIZE];
let mut buffer2 = [0u8; BUF_SIZE];

let mut offset = 0;

loop {
let bytes_read1 = reader1.read(&mut buffer1).await.with_context(|| {
format!("failed to read from reader1 at offset {}", offset)
})?;
if bytes_read1 == 0 {
break;
}

let bytes_read2 = reader2.read_exact(&mut buffer2[..bytes_read1]).await.with_context(|| {
format!("failed to read {} bytes from reader2 at offset {}", bytes_read1, offset)
})?;
if bytes_read1 != bytes_read2 {
anyhow::bail!("unexpected EOF, unreachable");
}

if &buffer1[..bytes_read1] != &buffer2[..bytes_read2] {
let diff_offset = buffer1[..bytes_read1]
.iter()
.zip(buffer2[..bytes_read2].iter())
.position(|(a, b)| a != b)
.expect("mismatched buffers, but no difference found");
anyhow::bail!("mismatch at offset {}", offset+diff_offset);
}

offset += bytes_read1;
}

// check that the rest of reader2 is zeroed
loop {
let bytes_read = reader2.read(&mut buffer2).await.with_context(|| {
format!("failed to read from reader2 at offset {}", offset)
})?;
if bytes_read == 0 {
break;
}

if buffer2[..bytes_read].iter().any(|&b| b != 0) {
anyhow::bail!("unexpected non-zero byte, expected all bytse to be zero after offset {}", offset);
}
}

Ok(())
}

async fn redownload_partial_segment(
local_segfile: &Utf8Path,
remote_segfile: &RemotePath,
wal_seg_size: usize,
) -> anyhow::Result<()> {
info!("redownloading partial segment: {} -> {}", remote_segfile, local_segfile);

let mut reader = wal_backup::read_object(remote_segfile, 0).await?;
let mut file = File::create(local_segfile).await?;

let plen = tokio::io::copy(&mut reader, &mut file).await?;
info!("downloaded {} bytes, resizing the file to wal_seg_size = {}", plen, wal_seg_size);
assert!(plen <= wal_seg_size as u64);
file.set_len(wal_seg_size as u64).await?;
file.flush().await?;
file.sync_all().await?;

Ok(())
}

// WARN: can be used only if timeline is not evicted
fn create_guard(
next_guard_id: &mut u64,
Expand Down

0 comments on commit 4e6173c

Please sign in to comment.