Skip to content

Commit

Permalink
resolve comments
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Chi Z <chi@neon.tech>
  • Loading branch information
skyzh committed Jul 5, 2024
1 parent c27ce4f commit 837065a
Showing 1 changed file with 74 additions and 6 deletions.
80 changes: 74 additions & 6 deletions pageserver/src/tenant/storage_layer/merge_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,14 +216,19 @@ impl<'a> MergeIterator<'a> {
pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
while let Some(mut iter) = self.heap.peek_mut() {
if !iter.is_loaded() {
// Once we load the iterator, we can know the real first key-value pair in the iterator.
// We put it back into the heap so that a potentially unloaded layer may have a key between
// [potential_first_key, loaded_first_key).
iter.load().await?;
} else {
let res = iter.next().await?;
if res.is_none() {
binary_heap::PeekMut::pop(iter);
}
return Ok(res);
continue;
}
let Some(item) = iter.next().await? else {
// If the iterator returns None, we pop this iterator. Actually, in the current implementation,
// we order None > Some, and all the rest of the iterators should return None.
binary_heap::PeekMut::pop(iter);
continue;
};
return Ok(Some(item));
}
Ok(None)
}
Expand Down Expand Up @@ -265,6 +270,69 @@ mod tests {
}
}

#[tokio::test]
async fn merge_in_between() {
use crate::repository::Value;
use bytes::Bytes;

let harness = TenantHarness::create("merge_iterator_delta_merge").unwrap();
let (tenant, ctx) = harness.load().await;

let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();

fn get_key(id: u32) -> Key {
let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
key.field6 = id;
key
}
let test_deltas1 = vec![
(
get_key(0),
Lsn(0x10),
Value::Image(Bytes::copy_from_slice(b"test")),
),
(
get_key(5),
Lsn(0x10),
Value::Image(Bytes::copy_from_slice(b"test")),
),
];
let resident_layer_1 = produce_delta_layer(&tenant, &tline, test_deltas1.clone(), &ctx)
.await
.unwrap();
let test_deltas2 = vec![
(
get_key(3),
Lsn(0x10),
Value::Image(Bytes::copy_from_slice(b"test")),
),
(
get_key(4),
Lsn(0x10),
Value::Image(Bytes::copy_from_slice(b"test")),
),
];
let resident_layer_2 = produce_delta_layer(&tenant, &tline, test_deltas2.clone(), &ctx)
.await
.unwrap();
let mut merge_iter = MergeIterator::create(
&[
resident_layer_2.get_as_delta(&ctx).await.unwrap(),
resident_layer_1.get_as_delta(&ctx).await.unwrap(),
],
&[],
&ctx,
);
let mut expect = Vec::new();
expect.extend(test_deltas1);
expect.extend(test_deltas2);
expect.sort_by(sort_delta);
assert_merge_iter_equal(&mut merge_iter, &expect).await;
}

#[tokio::test]
async fn delta_merge() {
use crate::repository::Value;
Expand Down

0 comments on commit 837065a

Please sign in to comment.