Skip to content

Commit

Permalink
Rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
petuhovskiy committed Jun 18, 2024
1 parent e17925b commit c828cde
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 27 deletions.
7 changes: 4 additions & 3 deletions safekeeper/src/pull_timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ impl FullAccessTimeline {
// lock and setting `wal_removal_on_hold` later, it guarantees that WAL
// won't be removed until we're done.
let from_lsn = min(
shared_state.sk.state.remote_consistent_lsn,
shared_state.sk.state.backup_lsn,
shared_state.sk.state().remote_consistent_lsn,
shared_state.sk.state().backup_lsn,
);
if from_lsn == Lsn::INVALID {
// this is possible if snapshot is called before handling first
Expand Down Expand Up @@ -206,14 +206,15 @@ impl FullAccessTimeline {
}
shared_state.wal_removal_on_hold = true;

let tli_copy = self.full_access_guard().await?;
let bctx = SnapshotContext {
from_segno,
upto_segno,
term,
last_log_term,
flush_lsn,
wal_seg_size: shared_state.get_wal_seg_size(),
tli: self.clone(),
tli: tli_copy,
};

Ok(bctx)
Expand Down
90 changes: 66 additions & 24 deletions safekeeper/src/timeline_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ use anyhow::Context;
use camino::Utf8Path;
use postgres_ffi::XLogSegNo;
use remote_storage::RemotePath;
use tokio::{fs::File, io::{AsyncRead, AsyncWriteExt}, task::{JoinError, JoinHandle}};
use tokio::{
fs::File,
io::{AsyncRead, AsyncWriteExt},
task::{JoinError, JoinHandle},
};
use tracing::{info, info_span, instrument, warn, Instrument};
use utils::lsn::Lsn;

Expand Down Expand Up @@ -451,13 +455,16 @@ async fn offload_timeline(
let (_, partial_segfile) = wal_file_paths(tli.timeline_dir(), segno, wal_seg_size).unwrap();

if true {
info!("deleting WAL file here: {}, it's being replaced by {:?}", partial_segfile, partial_backup_uploaded);
info!(
"deleting WAL file here: {}, it's being replaced by {:?}",
partial_segfile, partial_backup_uploaded
);
if let Err(e) = tokio::fs::remove_file(&partial_segfile).await {
warn!("failed to delete local WAL file: {:?}", e);
return false;
}
}

info!("offloading timeline at flush_lsn={}", flush_lsn);
if let Err(e) = tli.switch_to_offloaded(&flush_lsn).await {
warn!("failed to offload timeline: {:?}", e);
Expand All @@ -481,15 +488,20 @@ async fn unoffload_timeline(

let flush_lsn = partial_backup_uploaded.flush_lsn;
let segno = flush_lsn.segment_number(wal_seg_size);
let (_, local_partial_segfile) = wal_file_paths(tli.timeline_dir(), segno, wal_seg_size).unwrap();
let (_, local_partial_segfile) =
wal_file_paths(tli.timeline_dir(), segno, wal_seg_size).unwrap();

let remote_timeline_path = wal_backup::remote_timeline_path(&tli.ttid).expect("TODO");
let remote_segment_path = remote_timeline_path.join(&partial_backup_uploaded.name);

info!("validating local WAL file: {}, compare it with {:?}", local_partial_segfile, partial_backup_uploaded);
info!(
"validating local WAL file: {}, compare it with {:?}",
local_partial_segfile, partial_backup_uploaded
);
match File::open(&local_partial_segfile).await {
Ok(mut local_file) => {
let res = validate_local_segment(&mut local_file, &remote_segment_path, wal_seg_size).await;
let res =
validate_local_segment(&mut local_file, &remote_segment_path, wal_seg_size).await;
match res {
Ok(_) => {
info!("local WAL file is valid: {}", local_partial_segfile);
Expand All @@ -501,10 +513,18 @@ async fn unoffload_timeline(
}
}
Err(_) => {
let res = redownload_partial_segment(&local_partial_segfile, &remote_segment_path, wal_seg_size).await;
let res = redownload_partial_segment(
&local_partial_segfile,
&remote_segment_path,
wal_seg_size,
)
.await;
match res {
Ok(_) => {
info!("successfully redownloaded partial segment: {}", local_partial_segfile);
info!(
"successfully redownloaded partial segment: {}",
local_partial_segfile
);
}
Err(e) => {
warn!("failed to redownload partial segment: {:?}", e);
Expand All @@ -531,10 +551,15 @@ async fn validate_local_segment(
) -> anyhow::Result<()> {
let local_size = file.metadata().await?.len() as usize;
if local_size != wal_seg_size {
anyhow::bail!("local segment size mismatch: {} != {}", local_size, wal_seg_size);
anyhow::bail!(
"local segment size mismatch: {} != {}",
local_size,
wal_seg_size
);
}

let reader: std::pin::Pin<Box<dyn AsyncRead + Send + Sync>> = wal_backup::read_object(remote_segfile, 0).await?;
let reader: std::pin::Pin<Box<dyn AsyncRead + Send + Sync>> =
wal_backup::read_object(remote_segfile, 0).await?;
// we need to compare bytes from both local and remote readers
compare_async_read(reader, file).await?;

Expand All @@ -558,43 +583,54 @@ where
let mut offset = 0;

loop {
let bytes_read1 = reader1.read(&mut buffer1).await.with_context(|| {
format!("failed to read from reader1 at offset {}", offset)
})?;
let bytes_read1 = reader1
.read(&mut buffer1)
.await
.with_context(|| format!("failed to read from reader1 at offset {}", offset))?;
if bytes_read1 == 0 {
break;
}

let bytes_read2 = reader2.read_exact(&mut buffer2[..bytes_read1]).await.with_context(|| {
format!("failed to read {} bytes from reader2 at offset {}", bytes_read1, offset)
})?;
let bytes_read2 = reader2
.read_exact(&mut buffer2[..bytes_read1])
.await
.with_context(|| {
format!(
"failed to read {} bytes from reader2 at offset {}",
bytes_read1, offset
)
})?;
if bytes_read1 != bytes_read2 {
anyhow::bail!("unexpected EOF, unreachable");
}

if &buffer1[..bytes_read1] != &buffer2[..bytes_read2] {
if buffer1[..bytes_read1] != buffer2[..bytes_read2] {
let diff_offset = buffer1[..bytes_read1]
.iter()
.zip(buffer2[..bytes_read2].iter())
.position(|(a, b)| a != b)
.expect("mismatched buffers, but no difference found");
anyhow::bail!("mismatch at offset {}", offset+diff_offset);
anyhow::bail!("mismatch at offset {}", offset + diff_offset);
}

offset += bytes_read1;
}

// check that the rest of reader2 is zeroed
loop {
let bytes_read = reader2.read(&mut buffer2).await.with_context(|| {
format!("failed to read from reader2 at offset {}", offset)
})?;
let bytes_read = reader2
.read(&mut buffer2)
.await
.with_context(|| format!("failed to read from reader2 at offset {}", offset))?;
if bytes_read == 0 {
break;
}

if buffer2[..bytes_read].iter().any(|&b| b != 0) {
anyhow::bail!("unexpected non-zero byte, expected all bytse to be zero after offset {}", offset);
anyhow::bail!(
"unexpected non-zero byte, expected all bytse to be zero after offset {}",
offset
);
}
}

Expand All @@ -606,13 +642,19 @@ async fn redownload_partial_segment(
remote_segfile: &RemotePath,
wal_seg_size: usize,
) -> anyhow::Result<()> {
info!("redownloading partial segment: {} -> {}", remote_segfile, local_segfile);
info!(
"redownloading partial segment: {} -> {}",
remote_segfile, local_segfile
);

let mut reader = wal_backup::read_object(remote_segfile, 0).await?;
let mut file = File::create(local_segfile).await?;

let plen = tokio::io::copy(&mut reader, &mut file).await?;
info!("downloaded {} bytes, resizing the file to wal_seg_size = {}", plen, wal_seg_size);
info!(
"downloaded {} bytes, resizing the file to wal_seg_size = {}",
plen, wal_seg_size
);
assert!(plen <= wal_seg_size as u64);
file.set_len(wal_seg_size as u64).await?;
file.flush().await?;
Expand Down

0 comments on commit c828cde

Please sign in to comment.