diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index bf5d9249ebb5..c2d4a2776b1d 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -1492,6 +1492,24 @@ impl DeltaLayerInner { ); offset } + + #[cfg(test)] + pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> { + let block_reader = FileBlockReader::new(&self.file, self.file_id); + let tree_reader = + DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader); + DeltaLayerIterator { + delta_layer: self, + ctx, + index_iter: tree_reader.iter(&[0; DELTA_KEY_SIZE], ctx), + key_values_batch: std::collections::VecDeque::new(), + is_end: false, + planner: crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner::new( + 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer. + 1024, // The default value. Unit tests might use a different value + ), + } + } } /// A set of data associated with a delta layer key and its value @@ -1551,6 +1569,70 @@ impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for Del } } +#[cfg(test)] +pub struct DeltaLayerIterator<'a> { + delta_layer: &'a DeltaLayerInner, + ctx: &'a RequestContext, + planner: crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner, + index_iter: crate::tenant::disk_btree::DiskBtreeIterator<'a>, + key_values_batch: std::collections::VecDeque<(Key, Lsn, Value)>, + is_end: bool, +} + +#[cfg(test)] +impl<'a> DeltaLayerIterator<'a> { + /// Retrieve a batch of key-value pairs into the iterator buffer. + async fn next_batch(&mut self) -> anyhow::Result<()> { + assert!(self.key_values_batch.is_empty()); + assert!(!self.is_end); + + let plan = loop { + if let Some(res) = self.index_iter.next().await { + let (raw_key, value) = res?; + let key = Key::from_slice(&raw_key[..KEY_SIZE]); + let lsn = DeltaKey::extract_lsn_from_buf(&raw_key); + let blob_ref = BlobRef(value); + let offset = blob_ref.pos(); + if let Some(batch_plan) = self.planner.handle(key, lsn, offset, BlobFlag::None) { + break batch_plan; + } + } else { + self.is_end = true; + let data_end_offset = self.delta_layer.index_start_offset(); + break self.planner.handle_range_end(data_end_offset); + } + }; + let vectored_blob_reader = VectoredBlobReader::new(&self.delta_layer.file); + let mut next_batch = std::collections::VecDeque::new(); + let buf_size = plan.size(); + let buf = BytesMut::with_capacity(buf_size); + let blobs_buf = vectored_blob_reader + .read_blobs(&plan, buf, self.ctx) + .await?; + let frozen_buf = blobs_buf.buf.freeze(); + for meta in blobs_buf.blobs.iter() { + let value = Value::des(&frozen_buf[meta.start..meta.end])?; + next_batch.push_back((meta.meta.key, meta.meta.lsn, value)); + } + self.key_values_batch = next_batch; + Ok(()) + } + + pub async fn next(&mut self) -> anyhow::Result> { + if self.key_values_batch.is_empty() { + if self.is_end { + return Ok(None); + } + self.next_batch().await?; + } + Ok(Some( + self.key_values_batch + .pop_front() + .expect("should not be empty"), + )) + } +} + #[cfg(test)] mod test { use std::collections::BTreeMap; @@ -1560,6 +1642,9 @@ mod test { use rand::RngCore; use super::*; + use crate::tenant::harness::TIMELINE_ID; + use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner; + use crate::tenant::Tenant; use crate::{ context::DownloadBehavior, task_mgr::TaskKind, @@ -2126,4 +2211,116 @@ mod test { assert_eq!(utils::Hex(&scratch_left), utils::Hex(&scratch_right)); } } + + async fn produce_delta_layer( + tenant: &Tenant, + tline: &Arc, + mut deltas: Vec<(Key, Lsn, Value)>, + ctx: &RequestContext, + ) -> anyhow::Result { + deltas.sort_by(|(k1, l1, _), (k2, l2, _)| (k1, l1).cmp(&(k2, l2))); + let (key_start, _, _) = deltas.first().unwrap(); + let (key_max, _, _) = deltas.first().unwrap(); + let lsn_min = deltas.iter().map(|(_, lsn, _)| lsn).min().unwrap(); + let lsn_max = deltas.iter().map(|(_, lsn, _)| lsn).max().unwrap(); + let lsn_end = Lsn(lsn_max.0 + 1); + let mut writer = DeltaLayerWriter::new( + tenant.conf, + tline.timeline_id, + tenant.tenant_shard_id, + *key_start, + (*lsn_min)..lsn_end, + ctx, + ) + .await?; + let key_end = key_max.next(); + + for (key, lsn, value) in deltas { + writer.put_value(key, lsn, value, ctx).await?; + } + let delta_layer = writer.finish(key_end, tline, ctx).await?; + + Ok::<_, anyhow::Error>(delta_layer) + } + + async fn assert_delta_iter_equal( + delta_iter: &mut DeltaLayerIterator<'_>, + expect: &[(Key, Lsn, Value)], + ) { + let mut expect_iter = expect.iter(); + loop { + let o1 = delta_iter.next().await.unwrap(); + let o2 = expect_iter.next(); + assert_eq!(o1.is_some(), o2.is_some()); + if o1.is_none() && o2.is_none() { + break; + } + let (k1, l1, v1) = o1.unwrap(); + let (k2, l2, v2) = o2.unwrap(); + assert_eq!(&k1, k2); + assert_eq!(l1, *l2); + assert_eq!(&v1, v2); + } + } + + #[tokio::test] + async fn delta_layer_iterator() { + use crate::repository::Value; + use bytes::Bytes; + + let harness = TenantHarness::create("delta_layer_iterator").unwrap(); + let (tenant, ctx) = harness.load().await; + + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) + .await + .unwrap(); + + fn get_key(id: u32) -> Key { + let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap(); + key.field6 = id; + key + } + const N: usize = 1000; + let test_deltas = (0..N) + .map(|idx| { + ( + get_key(idx as u32 / 10), + Lsn(0x10 * ((idx as u64) % 10 + 1)), + Value::Image(Bytes::from(format!("img{idx:05}"))), + ) + }) + .collect_vec(); + let resident_layer = produce_delta_layer(&tenant, &tline, test_deltas.clone(), &ctx) + .await + .unwrap(); + let delta_layer = resident_layer.get_as_delta(&ctx).await.unwrap(); + for max_read_size in [1, 1024] { + for batch_size in [1, 2, 4, 8, 3, 7, 13] { + println!("running with batch_size={batch_size} max_read_size={max_read_size}"); + // Test if the batch size is correctly determined + let mut iter = delta_layer.iter(&ctx); + iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size); + let mut num_items = 0; + for _ in 0..3 { + iter.next_batch().await.unwrap(); + num_items += iter.key_values_batch.len(); + if max_read_size == 1 { + // every key should be a batch b/c the value is larger than max_read_size + assert_eq!(iter.key_values_batch.len(), 1); + } else { + assert_eq!(iter.key_values_batch.len(), batch_size); + } + if num_items >= N { + break; + } + iter.key_values_batch.clear(); + } + // Test if the result is correct + let mut iter = delta_layer.iter(&ctx); + iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size); + assert_delta_iter_equal(&mut iter, &test_deltas).await; + } + } + } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 1175b750179d..8dd0a23f4637 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -5481,12 +5481,12 @@ impl Timeline { } images.sort_unstable_by(|(ka, _), (kb, _)| ka.cmp(kb)); let min_key = *images.first().map(|(k, _)| k).unwrap(); - let max_key = images.last().map(|(k, _)| k).unwrap().next(); + let end_key = images.last().map(|(k, _)| k).unwrap().next(); let mut image_layer_writer = ImageLayerWriter::new( self.conf, self.timeline_id, self.tenant_shard_id, - &(min_key..max_key), + &(min_key..end_key), lsn, ctx, ) @@ -5518,7 +5518,7 @@ impl Timeline { let last_record_lsn = self.get_last_record_lsn(); deltas.sort_unstable_by(|(ka, la, _), (kb, lb, _)| (ka, la).cmp(&(kb, lb))); let min_key = *deltas.first().map(|(k, _, _)| k).unwrap(); - let max_key = deltas.last().map(|(k, _, _)| k).unwrap().next(); + let end_key = deltas.last().map(|(k, _, _)| k).unwrap().next(); let min_lsn = *deltas.iter().map(|(_, lsn, _)| lsn).min().unwrap(); let max_lsn = *deltas.iter().map(|(_, lsn, _)| lsn).max().unwrap(); assert!( @@ -5541,7 +5541,7 @@ impl Timeline { for (key, lsn, val) in deltas { delta_layer_writer.put_value(key, lsn, val, ctx).await?; } - let delta_layer = delta_layer_writer.finish(max_key, self, ctx).await?; + let delta_layer = delta_layer_writer.finish(end_key, self, ctx).await?; { let mut guard = self.layers.write().await;