Skip to content

Commit

Permalink
resolve comments
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Chi Z <chi@neon.tech>
  • Loading branch information
skyzh committed Jul 3, 2024
1 parent 1714217 commit 6bf5532
Showing 1 changed file with 65 additions and 39 deletions.
104 changes: 65 additions & 39 deletions pageserver/src/tenant/storage_layer/merge_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::{
collections::{binary_heap, BinaryHeap},
};

use bytes::Bytes;
use pageserver_api::key::Key;
use utils::lsn::Lsn;

Expand Down Expand Up @@ -43,13 +42,18 @@ impl LayerIterRef<'_> {
}
}

struct IteratorWrapper<'a> {
enum IteratorWrapper<'a> {
/// The potential next key of the iterator. If the layer is not loaded yet, it will be the start key encoded in the layer file.
/// Otherwise, it is the next key of the real iterator.
peek_next_value: Option<(Key, Lsn, Value)>,
layer: LayerRef<'a>,
ctx: &'a RequestContext,
iter: Option<LayerIterRef<'a>>,
NotLoaded {
ctx: &'a RequestContext,
first_key_lower_bound: (Key, Lsn),
layer: LayerRef<'a>,
},
Loaded {
peeked: Option<(Key, Lsn, Value)>, // None == end
iter: LayerIterRef<'a>,
},
}

impl<'a> std::cmp::PartialEq for IteratorWrapper<'a> {
Expand All @@ -75,7 +79,8 @@ impl<'a> std::cmp::Ord for IteratorWrapper<'a> {
(Some((k1, l1)), Some((k2, l2))) => {
let loaded_1 = if self.is_loaded() { 1 } else { 0 };
let loaded_2 = if other.is_loaded() { 1 } else { 0 };
// when key_lsn are the same, the unloaded iter will always appear before the loaded one.
// When key_lsn are the same, the unloaded iter will always appear before the loaded one.
// And note that we do a reverse at the end of the comparison, so it works with the max heap.
(k1, l1, loaded_1).cmp(&(k2, l2, loaded_2))
}
(Some(_), None) => Ordering::Less,
Expand All @@ -91,60 +96,80 @@ impl<'a> IteratorWrapper<'a> {
image_layer: &'a ImageLayerInner,
ctx: &'a RequestContext,
) -> Self {
Self {
peek_next_value: Some((
image_layer.key_range().start,
image_layer.lsn(),
Value::Image(Bytes::new()),
)),
Self::NotLoaded {
layer: LayerRef::Image(image_layer),
first_key_lower_bound: (image_layer.key_range().start, image_layer.lsn()),
ctx,
iter: None,
}
}

pub fn create_from_delta_layer(
delta_layer: &'a DeltaLayerInner,
ctx: &'a RequestContext,
) -> Self {
Self {
peek_next_value: Some((
delta_layer.key_range().start,
delta_layer.lsn_range().start,
Value::Image(Bytes::new()),
)),
Self::NotLoaded {
layer: LayerRef::Delta(delta_layer),
first_key_lower_bound: (delta_layer.key_range().start, delta_layer.lsn_range().start),
ctx,
iter: None,
}
}

fn peek_next_key_lsn(&self) -> Option<(&Key, Lsn)> {
let Some((key, lsn, _)) = &self.peek_next_value else {
return None;
};
Some((key, *lsn))
match self {
Self::Loaded {
peeked: Some((key, lsn, _)),
..
} => Some((key, *lsn)),
Self::Loaded { peeked: None, .. } => None,
Self::NotLoaded {
first_key_lower_bound: (key, lsn),
..
} => Some((key, *lsn)),
}
}

// CORRECTNESS: this function must always take `&mut self`, never `&self`.
//
// The reason is that `impl Ord for Self` evaluates differently after this function
// returns. We're called through a `PeekMut::deref_mut`, which causes heap repair when
// the PeekMut gets returned. So, it's critical that we actually run through `PeekMut::deref_mut`
// and not just `PeekMut::deref`
// If we don't take `&mut self`
async fn load(&mut self) -> anyhow::Result<()> {
assert!(!self.is_loaded());
let mut iter = self.layer.iter(self.ctx);
self.peek_next_value = iter.next().await?;
self.iter = Some(iter);
let Self::NotLoaded {
ctx,
first_key_lower_bound,
layer,
} = self
else {
unreachable!()
};
let mut iter = layer.iter(ctx);
let peeked = iter.next().await?;
if let Some((k1, l1, _)) = &peeked {
let (k2, l2) = first_key_lower_bound;
debug_assert!((k1, l1) >= (k2, l2));
}
*self = Self::Loaded { peeked, iter };
Ok(())
}

fn is_loaded(&self) -> bool {
self.iter.is_some()
matches!(self, Self::Loaded { .. })
}

/// Correctness: must load the iterator before using.
///
/// Given this iterator wrapper is private to the merge iterator, users won't be able to mis-use it.
/// The public interfaces to use are [`crate::tenant::storage_layer::delta_layer::DeltaLayerIterator`] and
/// [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
if !self.is_loaded() {
self.load().await?;
}
let result = self.peek_next_value.take();
let iter = self.iter.as_mut().unwrap();
self.peek_next_value = iter.next().await?;
let Self::Loaded { peeked, iter } = self else {
panic!("must load the iterator before using")
};
let result = peeked.take();
*peeked = iter.next().await?;
Ok(result)
}
}
Expand All @@ -159,14 +184,16 @@ impl<'a> MergeIterator<'a> {
images: &[&'a ImageLayerInner],
ctx: &'a RequestContext,
) -> Self {
let mut heap = BinaryHeap::with_capacity(images.len() + deltas.len());
let mut heap = Vec::with_capacity(images.len() + deltas.len());
for image in images {
heap.push(IteratorWrapper::create_from_image_layer(image, ctx));
}
for delta in deltas {
heap.push(IteratorWrapper::create_from_delta_layer(delta, ctx));
}
Self { heap }
Self {
heap: BinaryHeap::from(heap),
}
}

pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
Expand Down Expand Up @@ -234,8 +261,6 @@ mod tests {
.await
.unwrap();

// TODO: is it possible to have duplicated delta at same LSN now? we might need to test that

fn get_key(id: u32) -> Key {
let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
key.field6 = id;
Expand Down Expand Up @@ -298,4 +323,5 @@ mod tests {
}

// TODO: image layer merge, delta+image mixed merge
// TODO: is it possible to have duplicated delta at same LSN now? we might need to test that
}

0 comments on commit 6bf5532

Please sign in to comment.