refactor: Timeline layer flushing (#7993)
Recent feature work has degraded layer flushing, most recently in #7927. Changes:

- inline `Timeline::freeze_inmem_layer` into its only caller
- carry the TimelineWriterState guard to the actual point of freezing the layer
  - this allows us to `#[cfg(feature = "testing")]` the assertion added in #7927
- remove the duplicate `flush_frozen_layers` in favor of splitting `flush_frozen_layers_and_wait` (see the sketch below)
  - this requires starting the flush loop earlier for `checkpoint_distance < initdb size` tests
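
To make the split concrete: after this change, `flush_frozen_layers(at_lsn)` only bumps a request counter on a watch channel (which also carries an LSN watermark) and returns that counter as a token, while `wait_flush_completion(token)` watches a second channel until the flush loop acknowledges the counter; `flush_frozen_layers_and_wait` becomes a thin wrapper over the two. Below is a minimal, self-contained sketch of that counter-plus-watch-channel pattern. `Flusher`, `request_flush`, and `wait_completion` are illustrative stand-ins rather than pageserver API, the LSN watermark is dropped to keep only the counter, and the tokio crate (`sync`, `rt`, `macros` features) is assumed:

use tokio::sync::watch;

struct Flusher {
    start_tx: watch::Sender<u64>,  // flush request counter, bumped per request
    done_rx: watch::Receiver<u64>, // highest request counter the loop has completed
}

impl Flusher {
    /// Request a flush; returns a token identifying this request.
    fn request_flush(&self) -> u64 {
        let mut token = 0;
        self.start_tx.send_modify(|counter| {
            *counter += 1;
            token = *counter;
        });
        token
    }

    /// Wait until the background loop has completed request `token`.
    async fn wait_completion(&self, token: u64) {
        let mut rx = self.done_rx.clone();
        loop {
            if *rx.borrow() >= token {
                return;
            }
            // Unseen sends make this return immediately, so no wakeup is lost.
            rx.changed().await.expect("flush loop exited");
        }
    }
}

#[tokio::main]
async fn main() {
    let (start_tx, mut start_rx) = watch::channel(0u64);
    let (done_tx, done_rx) = watch::channel(0u64);

    // Stand-in for the layer flush loop: acknowledge each requested counter.
    tokio::spawn(async move {
        while start_rx.changed().await.is_ok() {
            let requested = *start_rx.borrow();
            // ... write frozen layers to disk here ...
            let _ = done_tx.send_replace(requested);
        }
    });

    let flusher = Flusher { start_tx, done_rx };
    let token = flusher.request_flush(); // cheap, synchronous
    flusher.wait_completion(token).await; // the "and_wait" half
    println!("flush request {token} completed");
}

Splitting the request from the wait is what lets the background freeze path kick off a flush without awaiting its completion.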
koivunej authored Jun 10, 2024
1 parent a8ca7a1 commit e466927

Showing 3 changed files with 104 additions and 74 deletions.

12 changes: 6 additions & 6 deletions pageserver/src/tenant.rs
@@ -3395,6 +3395,12 @@ impl Tenant {
let tenant_shard_id = raw_timeline.owning_tenant.tenant_shard_id;
let unfinished_timeline = raw_timeline.raw_timeline()?;

// Flush the new layer files to disk, before we mark the timeline as available to
// the outside world.
//
// Flush loop needs to be spawned in order to be able to flush.
unfinished_timeline.maybe_spawn_flush_loop();

import_datadir::import_timeline_from_postgres_datadir(
unfinished_timeline,
&pgdata_path,
@@ -3406,12 +3412,6 @@
format!("Failed to import pgdatadir for timeline {tenant_shard_id}/{timeline_id}")
})?;

// Flush the new layer files to disk, before we mark the timeline as available to
// the outside world.
//
// Flush loop needs to be spawned in order to be able to flush.
unfinished_timeline.maybe_spawn_flush_loop();

fail::fail_point!("before-checkpoint-new-timeline", |_| {
anyhow::bail!("failpoint before-checkpoint-new-timeline");
});
136 changes: 74 additions & 62 deletions pageserver/src/tenant/timeline.rs
@@ -1571,7 +1571,15 @@ impl Timeline {
// This exists to provide a non-span-creating version of `freeze_and_flush` we can call without
// polluting the span hierarchy.
pub(crate) async fn freeze_and_flush0(&self) -> Result<(), FlushLayerError> {
let to_lsn = self.freeze_inmem_layer(false).await;
let to_lsn = {
// Freeze the current open in-memory layer. It will be written to disk on next
// iteration.
let mut g = self.write_lock.lock().await;

let to_lsn = self.get_last_record_lsn();
self.freeze_inmem_layer_at(to_lsn, &mut g).await;
to_lsn
};
self.flush_frozen_layers_and_wait(to_lsn).await
}

@@ -1657,25 +1665,35 @@ impl Timeline {
self.last_freeze_at.load(),
open_layer.get_opened_at(),
) {
match open_layer.info() {
let at_lsn = match open_layer.info() {
InMemoryLayerInfo::Frozen { lsn_start, lsn_end } => {
// We may reach this point if the layer was already frozen but not yet flushed: flushing
// happens asynchronously in the background.
tracing::debug!(
"Not freezing open layer, it's already frozen ({lsn_start}..{lsn_end})"
);
None
}
InMemoryLayerInfo::Open { .. } => {
// Upgrade to a write lock and freeze the layer
drop(layers_guard);
let mut layers_guard = self.layers.write().await;
layers_guard
.try_freeze_in_memory_layer(current_lsn, &self.last_freeze_at)
let froze = layers_guard
.try_freeze_in_memory_layer(
current_lsn,
&self.last_freeze_at,
&mut write_guard,
)
.await;
Some(current_lsn).filter(|_| froze)
}
};
if let Some(lsn) = at_lsn {
let res: Result<u64, _> = self.flush_frozen_layers(lsn);
if let Err(e) = res {
tracing::info!("failed to flush frozen layer after background freeze: {e:#}");
}
}
write_guard.take();
self.flush_frozen_layers();
}
}

@@ -2384,7 +2402,7 @@ impl Timeline {
let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error);
self_clone.flush_loop(layer_flush_start_rx, &background_ctx).await;
let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap();
assert!(matches!(*flush_loop_state, FlushLoopState::Running{ ..}));
assert!(matches!(*flush_loop_state, FlushLoopState::Running{..}));
*flush_loop_state = FlushLoopState::Exited;
Ok(())
}
@@ -3647,31 +3665,21 @@ impl Timeline {
self.last_record_lsn.advance(new_lsn);
}

/// Whether there was a layer to freeze or not, return the value of get_last_record_lsn
/// before we attempted the freeze: this guarantees that ingested data is frozen up to this lsn (inclusive).
async fn freeze_inmem_layer(&self, write_lock_held: bool) -> Lsn {
// Freeze the current open in-memory layer. It will be written to disk on next
// iteration.

let _write_guard = if write_lock_held {
None
} else {
let mut g = self.write_lock.lock().await;
// remove the reference to an open layer
g.take();
Some(g)
async fn freeze_inmem_layer_at(
&self,
at: Lsn,
write_lock: &mut tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
) {
let frozen = {
let mut guard = self.layers.write().await;
guard
.try_freeze_in_memory_layer(at, &self.last_freeze_at, write_lock)
.await
};

let to_lsn = self.get_last_record_lsn();
self.freeze_inmem_layer_at(to_lsn).await;
to_lsn
}

async fn freeze_inmem_layer_at(&self, at: Lsn) {
let mut guard = self.layers.write().await;
guard
.try_freeze_in_memory_layer(at, &self.last_freeze_at)
.await;
if frozen {
let now = Instant::now();
*(self.last_freeze_ts.write().unwrap()) = now;
}
}

/// Layer flusher task's main loop.
@@ -3765,18 +3773,14 @@ impl Timeline {
}
}

/// Request the flush loop to write out all frozen layers up to `to_lsn` as Delta L0 files to disk.
/// The caller is responsible for the freezing, e.g., [`Self::freeze_inmem_layer`].
/// Request the flush loop to write out all frozen layers up to `at_lsn` as Delta L0 files to disk.
/// The caller is responsible for the freezing, e.g., [`Self::freeze_inmem_layer_at`].
///
/// `last_record_lsn` may be higher than the highest LSN of a frozen layer: if this is the case,
/// it means no data will be written between the top of the highest frozen layer and to_lsn,
/// e.g. because this tenant shard has ingested up to to_lsn and not written any data locally for that part of the WAL.
async fn flush_frozen_layers_and_wait(
&self,
last_record_lsn: Lsn,
) -> Result<(), FlushLayerError> {
let mut rx = self.layer_flush_done_tx.subscribe();

/// `at_lsn` may be higher than the highest LSN of a frozen layer: if this is the
/// case, it means no data will be written between the top of the highest frozen layer and
/// `at_lsn`, e.g. because this tenant shard has ingested up to `at_lsn` and not written any data
/// locally for that part of the WAL.
fn flush_frozen_layers(&self, at_lsn: Lsn) -> Result<u64, FlushLayerError> {
// Increment the flush cycle counter and wake up the flush task.
// Remember the new value, so that when we listen for the flush
// to finish, we know when the flush that we initiated has
@@ -3791,13 +3795,18 @@
self.layer_flush_start_tx.send_modify(|(counter, lsn)| {
my_flush_request = *counter + 1;
*counter = my_flush_request;
*lsn = std::cmp::max(last_record_lsn, *lsn);
*lsn = std::cmp::max(at_lsn, *lsn);
});

Ok(my_flush_request)
}

async fn wait_flush_completion(&self, request: u64) -> Result<(), FlushLayerError> {
let mut rx = self.layer_flush_done_tx.subscribe();
loop {
{
let (last_result_counter, last_result) = &*rx.borrow();
if *last_result_counter >= my_flush_request {
if *last_result_counter >= request {
if let Err(err) = last_result {
// We already logged the original error in
// flush_loop. We cannot propagate it to the caller
@@ -3824,12 +3833,9 @@
}
}

fn flush_frozen_layers(&self) {
self.layer_flush_start_tx.send_modify(|(counter, lsn)| {
*counter += 1;

*lsn = std::cmp::max(*lsn, Lsn(self.last_freeze_at.load().0 - 1));
});
async fn flush_frozen_layers_and_wait(&self, at_lsn: Lsn) -> Result<(), FlushLayerError> {
let token = self.flush_frozen_layers(at_lsn)?;
self.wait_flush_completion(token).await
}

/// Flush one frozen in-memory layer to disk, as a new delta layer.
@@ -5672,16 +5678,15 @@ impl<'a> TimelineWriter<'a> {
}

async fn roll_layer(&mut self, freeze_at: Lsn) -> anyhow::Result<()> {
assert!(self.write_guard.is_some());

self.tl.freeze_inmem_layer_at(freeze_at).await;
let current_size = self.write_guard.as_ref().unwrap().current_size;

let now = Instant::now();
*(self.last_freeze_ts.write().unwrap()) = now;
// self.write_guard will be taken by the freeze below
self.tl
.freeze_inmem_layer_at(freeze_at, &mut self.write_guard)
.await;

self.tl.flush_frozen_layers();
self.tl.flush_frozen_layers(freeze_at)?;

let current_size = self.write_guard.as_ref().unwrap().current_size;
if current_size >= self.get_checkpoint_distance() * 2 {
warn!("Flushed oversized open layer with size {}", current_size)
}
@@ -5695,20 +5700,27 @@ impl<'a> TimelineWriter<'a> {
return OpenLayerAction::Open;
};

#[cfg(feature = "testing")]
if state.cached_last_freeze_at < self.tl.last_freeze_at.load() {
// TODO(#7993): this branch is needed until the many places of freezing are
// refactored, to cover the possibility of `state` holding a "dangling" reference
// to an already-frozen in-memory layer.
// This check and assertion are not strictly needed, because
// LayerManager::try_freeze_in_memory_layer will always clear out the
// TimelineWriterState if something is frozen. However, last_freeze_at can
// advance while there is no TimelineWriterState.
assert!(
state.open_layer.end_lsn.get().is_some(),
"our open_layer must be outdated"
);
return OpenLayerAction::Open;

// this would be a memory leak waiting to happen because the in-memory layer always has
// an index
panic!("BUG: TimelineWriterState held on to frozen in-memory layer.");
}

if state.prev_lsn == Some(lsn) {
// Rolling mid LSN is not supported by downstream code.
// Rolling mid LSN is not supported by [downstream code].
// Hence, only roll at LSN boundaries.
//
// [downstream code]: https://github.com/neondatabase/neon/pull/7993#discussion_r1633345422
return OpenLayerAction::None;
}

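An aside on the `#[cfg(feature = "testing")]` gate introduced in the final `TimelineWriter` hunk above: putting a defensive check behind a cargo feature keeps it active wherever the test suite runs while compiling it out of release builds. A generic illustration of the pattern follows; it assumes a crate that declares a `testing` feature in its Cargo.toml, and `check_writer_state` is a made-up name for the example:

fn check_writer_state(stale: bool) {
    #[cfg(feature = "testing")]
    {
        // Compiled only with `--features testing`: crash loudly under test.
        assert!(
            !stale,
            "BUG: TimelineWriterState held on to frozen in-memory layer"
        );
    }
    // In release builds the check disappears; keep the parameter "used".
    let _ = stale;
}

fn main() {
    check_writer_state(false); // passes in both build flavors
}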
30 changes: 24 additions & 6 deletions pageserver/src/tenant/timeline/layer_manager.rs
@@ -21,6 +21,8 @@ use crate::{
},
};

use super::TimelineWriterState;

/// Provides semantic APIs to manipulate the layer map.
#[derive(Default)]
pub(crate) struct LayerManager {
@@ -120,30 +122,46 @@ impl LayerManager {
Ok(layer)
}

/// Called from `freeze_inmem_layer`, returns true if successfully frozen.
pub(crate) async fn try_freeze_in_memory_layer(
/// Tries to freeze an open layer and also manages clearing the TimelineWriterState.
///
/// Returns true if anything was frozen.
pub(super) async fn try_freeze_in_memory_layer(
&mut self,
lsn: Lsn,
last_freeze_at: &AtomicLsn,
) {
write_lock: &mut tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
) -> bool {
let Lsn(last_record_lsn) = lsn;
let end_lsn = Lsn(last_record_lsn + 1);

if let Some(open_layer) = &self.layer_map.open_layer {
let froze = if let Some(open_layer) = &self.layer_map.open_layer {
let open_layer_rc = Arc::clone(open_layer);
// Does this layer need freezing?
open_layer.freeze(end_lsn).await;

// The layer is no longer open, update the layer map to reflect this.
// We will replace it with on-disk historics below.
self.layer_map.frozen_layers.push_back(open_layer_rc);
self.layer_map.open_layer = None;
self.layer_map.next_open_layer_at = Some(end_lsn);
}

true
} else {
false
};

// Even if there was no layer to freeze, advance last_freeze_at to last_record_lsn+1: this
// accounts for regions in the LSN range where we might have ingested no data due to sharding.
last_freeze_at.store(end_lsn);

// the writer state must no longer have a reference to the frozen layer
let taken = write_lock.take();
assert_eq!(
froze,
taken.is_some(),
"should only had frozen a layer when TimelineWriterState existed"
);

froze
}

/// Add image layers to the layer map, called from `create_image_layers`.
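The contract of the reworked `try_freeze_in_memory_layer` (the freeze and the clearing of the writer state happen together, under the same write lock) can be modeled in isolation. A toy sketch, again assuming tokio, where `WriterState` and `Layers` are simplified stand-ins for `TimelineWriterState` and the layer map:

use tokio::sync::{Mutex, MutexGuard};

struct WriterState; // stand-in for TimelineWriterState

struct Layers {
    open: Option<String>, // stand-in for the open in-memory layer
}

impl Layers {
    /// Freeze the open layer, if any, and clear the writer state in lockstep,
    /// so the state can never reference an already-frozen layer.
    fn try_freeze(
        &mut self,
        write_lock: &mut MutexGuard<'_, Option<WriterState>>,
    ) -> bool {
        let froze = self.open.take().is_some();
        let taken = write_lock.take();
        assert_eq!(froze, taken.is_some(), "writer state exists iff a layer froze");
        froze
    }
}

#[tokio::main]
async fn main() {
    let write_lock = Mutex::new(Some(WriterState));
    let mut layers = Layers {
        open: Some("open-layer".to_string()),
    };

    let mut guard = write_lock.lock().await;
    assert!(layers.try_freeze(&mut guard));
    assert!(guard.is_none()); // cleared together with the freeze
    println!("froze the open layer and cleared the writer state atomically");
}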

1 comment on commit e466927

@github-actions

3286 tests run: 3131 passed, 0 failed, 155 skipped (full report)


Code coverage* (full report)

  • functions: 31.5% (6607 of 20962 functions)
  • lines: 48.5% (51101 of 105428 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
e466927 at 2024-06-10T17:51:32.652Z :recycle:
