diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs index 3b3ddd4ee34f..a797937321c0 100644 --- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs +++ b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs @@ -52,18 +52,6 @@ pub(crate) struct SharedBufferVersionedEntryRef<'a> { pub(crate) new_values: &'a [VersionedSharedBufferValue], } -fn values<'a>( - i: usize, - entries: &'a [SharedBufferKeyEntry], - values: &'a [VersionedSharedBufferValue], -) -> &'a [VersionedSharedBufferValue] { - &values[entries[i].value_offset - ..entries - .get(i + 1) - .map(|entry| entry.value_offset) - .unwrap_or(values.len())] -} - #[derive(PartialEq, Debug)] pub(crate) struct SharedBufferKeyEntry { pub(crate) key: TableKey, @@ -74,6 +62,28 @@ pub(crate) struct SharedBufferKeyEntry { pub(crate) value_offset: usize, } +impl SharedBufferKeyEntry { + /// Return an exclusive offset of the values of key of index `i` + fn value_end_offset<'a>( + i: usize, + entries: &'a [SharedBufferKeyEntry], + values: &'a [VersionedSharedBufferValue], + ) -> usize { + entries + .get(i + 1) + .map(|entry| entry.value_offset) + .unwrap_or(values.len()) + } + + fn values<'a>( + i: usize, + entries: &'a [SharedBufferKeyEntry], + values: &'a [VersionedSharedBufferValue], + ) -> &'a [VersionedSharedBufferValue] { + &values[entries[i].value_offset..Self::value_end_offset(i, entries, values)] + } +} + #[derive(Debug)] pub(crate) struct SharedBufferBatchInner { entries: Vec, @@ -122,7 +132,7 @@ impl SharedBufferBatchInner { } pub fn values(&self, i: usize) -> &[VersionedSharedBufferValue] { - values(i, &self.entries, &self.new_values) + SharedBufferKeyEntry::values(i, &self.entries, &self.new_values) } #[allow(clippy::too_many_arguments)] @@ -138,10 +148,14 @@ impl SharedBufferBatchInner { assert!(!entries.is_empty()); debug_assert!(entries.iter().is_sorted_by_key(|entry| &entry.key)); debug_assert!(entries.iter().is_sorted_by_key(|entry| &entry.value_offset)); - debug_assert!((0..entries.len()).all(|i| values(i, &entries, &new_values) - .iter() - .rev() - .is_sorted_by_key(|(epoch_with_gap, _)| epoch_with_gap))); + debug_assert!((0..entries.len()).all(|i| SharedBufferKeyEntry::values( + i, + &entries, + &new_values + ) + .iter() + .rev() + .is_sorted_by_key(|(epoch_with_gap, _)| epoch_with_gap))); debug_assert!(!epochs.is_empty()); debug_assert!(epochs.is_sorted()); @@ -456,9 +470,12 @@ impl SharedBufferBatch { /// If there are multiple versions of a key, the iterator will return all versions pub struct SharedBufferBatchIterator { inner: Arc, - current_value_idx: i32, - // The index of the current entry in the payload - current_idx: usize, + /// The index of the current entry in the payload + current_entry_idx: usize, + /// The index of current value + current_value_idx: usize, + /// The exclusive end offset of the value index of current key. + value_end_offset: usize, table_id: TableId, _phantom: PhantomData, } @@ -467,59 +484,89 @@ impl SharedBufferBatchIterator { pub(crate) fn new(inner: Arc, table_id: TableId) -> Self { Self { inner, - current_idx: 0, + current_entry_idx: 0, current_value_idx: 0, + value_end_offset: 0, table_id, _phantom: Default::default(), } } - /// Return all values of the current key - pub(crate) fn current_values(&self) -> &[VersionedSharedBufferValue] { - debug_assert!(self.current_idx < self.inner.entries.len()); - let idx = match D::direction() { - DirectionEnum::Forward => self.current_idx, - DirectionEnum::Backward => self.inner.entries.len() - self.current_idx - 1, - }; - self.inner.values(idx) + fn is_valid_entry_idx(&self) -> bool { + self.current_entry_idx < self.inner.entries.len() } - fn current_values_len(&self) -> i32 { - if self.current_idx < self.inner.entries.len() { - self.current_values().len() as i32 - } else { - 0 + fn advance_to_next_entry(&mut self) { + debug_assert!(self.is_valid_entry_idx()); + match D::direction() { + DirectionEnum::Forward => { + self.current_entry_idx += 1; + } + DirectionEnum::Backward => { + if self.current_entry_idx == 0 { + self.current_entry_idx = self.inner.entries.len(); + } else { + self.current_entry_idx -= 1; + } + } } } - pub(crate) fn current_item(&self) -> (&TableKey, &(EpochWithGap, HummockValue)) { - let (idx, value_idx) = match D::direction() { - DirectionEnum::Forward => (self.current_idx, self.current_value_idx), - DirectionEnum::Backward => ( - self.inner.entries.len() - self.current_idx - 1, - self.current_value_idx, - ), - }; - let cur_entry = &self.inner.entries[idx]; - ( - &cur_entry.key, - &self.inner.new_values[cur_entry.value_offset + value_idx as usize], + fn reset_value_idx(&mut self) { + debug_assert!(self.is_valid_entry_idx()); + self.current_value_idx = self.inner.entries[self.current_entry_idx].value_offset; + self.value_end_offset = self.get_value_end_offset(); + } + + fn get_value_end_offset(&self) -> usize { + debug_assert!(self.is_valid_entry_idx()); + SharedBufferKeyEntry::value_end_offset( + self.current_entry_idx, + &self.inner.entries, + &self.inner.new_values, ) } + + fn assert_valid_idx(&self) { + debug_assert!(self.is_valid_entry_idx()); + debug_assert!( + self.current_value_idx >= self.inner.entries[self.current_entry_idx].value_offset + ); + debug_assert_eq!(self.value_end_offset, self.get_value_end_offset()); + debug_assert!(self.current_value_idx < self.value_end_offset); + } + + fn advance_to_next_value(&mut self) { + self.assert_valid_idx(); + + if self.current_value_idx + 1 < self.value_end_offset { + self.current_value_idx += 1; + } else { + self.advance_to_next_entry(); + if self.is_valid_entry_idx() { + self.reset_value_idx(); + } + } + } } impl SharedBufferBatchIterator { pub(crate) fn advance_to_next_key(&mut self) { - assert_eq!(self.current_value_idx, 0); - self.current_idx += 1; + self.advance_to_next_entry(); + if self.is_valid_entry_idx() { + self.reset_value_idx(); + } } pub(crate) fn current_key_entry(&self) -> SharedBufferVersionedEntryRef<'_> { - assert!(self.is_valid(), "iterator is not valid"); - assert_eq!(self.current_value_idx, 0); + self.assert_valid_idx(); + debug_assert_eq!( + self.current_value_idx, + self.inner.entries[self.current_entry_idx].value_offset + ); SharedBufferVersionedEntryRef { - key: &self.inner.entries[self.current_idx].key, - new_values: self.inner.values(self.current_idx), + key: &self.inner.entries[self.current_entry_idx].key, + new_values: &self.inner.new_values[self.current_value_idx..self.value_end_offset], } } } @@ -528,57 +575,36 @@ impl HummockIterator for SharedBufferBatchIterator< type Direction = D; async fn next(&mut self) -> HummockResult<()> { - assert!(self.is_valid()); - match D::direction() { - DirectionEnum::Forward => { - // If the current key has more versions, we need to advance the value index - if self.current_value_idx + 1 < self.current_values_len() { - self.current_value_idx += 1; - } else { - self.current_idx += 1; - self.current_value_idx = 0; - } - } - DirectionEnum::Backward => { - if self.current_value_idx > 0 { - self.current_value_idx -= 1; - } else { - self.current_idx += 1; - self.current_value_idx = self.current_values_len() - 1; - } - } - } + self.advance_to_next_value(); Ok(()) } fn key(&self) -> FullKey<&[u8]> { - let (key, (epoch_with_gap, _)) = self.current_item(); - FullKey::new_with_gap_epoch(self.table_id, TableKey(key), *epoch_with_gap) + self.assert_valid_idx(); + let key = self.inner.entries[self.current_entry_idx].key.as_ref(); + let epoch_with_gap = self.inner.new_values[self.current_value_idx].0; + FullKey::new_with_gap_epoch(self.table_id, TableKey(key), epoch_with_gap) } fn value(&self) -> HummockValue<&[u8]> { - let (_, (_, value)) = self.current_item(); - value.as_slice() + self.assert_valid_idx(); + self.inner.new_values[self.current_value_idx].1.as_slice() } fn is_valid(&self) -> bool { - if self.current_idx >= self.inner.entries.len() { - return false; - } - self.current_value_idx >= 0 && self.current_value_idx < self.current_values().len() as i32 + self.is_valid_entry_idx() } async fn rewind(&mut self) -> HummockResult<()> { - self.current_idx = 0; - match D::direction() { DirectionEnum::Forward => { - self.current_value_idx = 0; + self.current_entry_idx = 0; } DirectionEnum::Backward => { - self.current_value_idx = self.current_values_len() - 1; + self.current_entry_idx = self.inner.entries.len() - 1; } - } + }; + self.reset_value_idx(); Ok(()) } @@ -591,64 +617,41 @@ impl HummockIterator for SharedBufferBatchIterator< .entries .binary_search_by(|probe| probe.key.as_ref().cmp(*key.user_key.table_key)); let seek_key_epoch = key.epoch_with_gap; - match D::direction() { - DirectionEnum::Forward => match partition_point { - Ok(i) => { - self.current_idx = i; - // seek to the first version that is <= the seek key epoch - let mut idx: i32 = 0; - for (epoch_with_gap, _) in self.current_values() { - if epoch_with_gap <= &seek_key_epoch { - break; - } - idx += 1; - } - - // Move onto the next key for forward iteration if seek key epoch is smaller - // than all versions - if idx >= self.current_values().len() as i32 { - self.current_idx += 1; - self.current_value_idx = 0; - } else { - self.current_value_idx = idx; + match partition_point { + Ok(i) => { + self.current_entry_idx = i; + self.reset_value_idx(); + while self.current_value_idx < self.value_end_offset { + let epoch_with_gap = self.inner.new_values[self.current_value_idx].0; + if epoch_with_gap <= seek_key_epoch { + break; } + self.current_value_idx += 1; } - Err(i) => { - self.current_idx = i; - self.current_value_idx = 0; + if self.current_value_idx == self.value_end_offset { + self.advance_to_next_entry(); + if self.is_valid_entry_idx() { + self.reset_value_idx(); + } } - }, - DirectionEnum::Backward => { - match partition_point { - Ok(i) => { - self.current_idx = self.inner.entries.len() - i - 1; - // seek from back to the first version that is >= seek_key_epoch - let values = self.current_values(); - let mut idx: i32 = (values.len() - 1) as i32; - for (epoch_with_gap, _) in values.iter().rev() { - if epoch_with_gap >= &seek_key_epoch { - break; - } - idx -= 1; - } - - if idx < 0 { - self.current_idx += 1; - self.current_value_idx = self.current_values_len() - 1; - } else { - self.current_value_idx = idx; - } + } + Err(i) => match D::direction() { + DirectionEnum::Forward => { + self.current_entry_idx = i; + if self.is_valid_entry_idx() { + self.reset_value_idx(); } - // Seek to one item before the seek partition_point: - // If i == 0, the iterator will be invalidated with self.current_idx == - // self.inner.len(). - Err(i) => { - self.current_idx = self.inner.entries.len() - i; - self.current_value_idx = self.current_values_len() - 1; + } + DirectionEnum::Backward => { + if i == 0 { + self.current_entry_idx = self.inner.entries.len(); + } else { + self.current_entry_idx = i - 1; + self.reset_value_idx(); } } - } - } + }, + }; Ok(()) } @@ -984,9 +987,9 @@ mod tests { } assert!(!iter.is_valid()); - // BACKWARD: Seek to 2nd key with future epoch, expect first item to return + // BACKWARD: Seek to 2nd key with old epoch, expect first item to return let mut iter = shared_buffer_batch.clone().into_backward_iter(); - iter.seek(iterator_test_key_of_epoch(2, epoch + 1).to_ref()) + iter.seek(iterator_test_key_of_epoch(2, epoch - 1).to_ref()) .await .unwrap(); assert!(iter.is_valid()); @@ -996,9 +999,9 @@ mod tests { iter.next().await.unwrap(); assert!(!iter.is_valid()); - // BACKWARD: Seek to 2nd key with old epoch, expect first two item to return + // BACKWARD: Seek to 2nd key with future epoch, expect first two item to return let mut iter = shared_buffer_batch.clone().into_backward_iter(); - iter.seek(iterator_test_key_of_epoch(2, epoch - 1).to_ref()) + iter.seek(iterator_test_key_of_epoch(2, epoch + 1).to_ref()) .await .unwrap(); for item in shared_buffer_items[0..=1].iter().rev() { @@ -1226,7 +1229,13 @@ mod tests { )); backward_iter.next().await.unwrap(); } - output.reverse(); + let mut expected = vec![]; + for key_idx in (0..=2).rev() { + for epoch in (1..=3).rev() { + let item = batch_items[epoch - 1][key_idx].clone(); + expected.push(item); + } + } assert_eq!(expected, output); } }