diff --git a/pageserver/compaction/src/compact_tiered.rs b/pageserver/compaction/src/compact_tiered.rs index 137b93055a6d..12882c9d59cf 100644 --- a/pageserver/compaction/src/compact_tiered.rs +++ b/pageserver/compaction/src/compact_tiered.rs @@ -24,7 +24,9 @@ use tracing::{debug, info}; use std::collections::{HashSet, VecDeque}; use std::ops::Range; -use crate::helpers::{accum_key_values, keyspace_total_size, merge_delta_keys, overlaps_with}; +use crate::helpers::{ + accum_key_values, keyspace_total_size, merge_delta_keys_buffered, overlaps_with, +}; use crate::interface::*; use utils::lsn::Lsn; @@ -535,7 +537,10 @@ where } } // Open stream - let key_value_stream = std::pin::pin!(merge_delta_keys::(deltas.as_slice(), ctx)); + let key_value_stream = + std::pin::pin!(merge_delta_keys_buffered::(deltas.as_slice(), ctx) + .await? + .map(Result::<_, anyhow::Error>::Ok)); let mut new_jobs = Vec::new(); // Slide a window through the keyspace diff --git a/pageserver/compaction/src/helpers.rs b/pageserver/compaction/src/helpers.rs index eb0e5ee82ae5..06454ee1d050 100644 --- a/pageserver/compaction/src/helpers.rs +++ b/pageserver/compaction/src/helpers.rs @@ -14,6 +14,7 @@ use std::future::Future; use std::ops::{DerefMut, Range}; use std::pin::Pin; use std::task::{ready, Poll}; +use utils::lsn::Lsn; pub fn keyspace_total_size( keyspace: &CompactionKeySpace, @@ -109,17 +110,40 @@ pub fn merge_delta_keys<'a, E: CompactionJobExecutor>( } } +pub async fn merge_delta_keys_buffered<'a, E: CompactionJobExecutor + 'a>( + layers: &'a [E::DeltaLayer], + ctx: &'a E::RequestContext, +) -> anyhow::Result>::DeltaEntry<'a>>> +{ + let mut keys = Vec::new(); + for l in layers { + // Boxing and casting to LoadFuture is required to obtain the right Sync bound. + // If we do l.load_keys(ctx).await? directly, there is a compilation error. + let load_future: LoadFuture<'a, _> = Box::pin(l.load_keys(ctx)); + keys.extend(load_future.await?.into_iter()); + } + keys.sort_by_key(|k| (k.key(), k.lsn())); + let stream = futures::stream::iter(keys.into_iter()); + Ok(stream) +} + enum LazyLoadLayer<'a, E: CompactionJobExecutor> { Loaded(VecDeque<>::DeltaEntry<'a>>), Unloaded(&'a E::DeltaLayer), } impl<'a, E: CompactionJobExecutor> LazyLoadLayer<'a, E> { - fn key(&self) -> E::Key { + fn min_key(&self) -> E::Key { match self { Self::Loaded(entries) => entries.front().unwrap().key(), Self::Unloaded(dl) => dl.key_range().start, } } + fn min_lsn(&self) -> Lsn { + match self { + Self::Loaded(entries) => entries.front().unwrap().lsn(), + Self::Unloaded(dl) => dl.lsn_range().start, + } + } } impl<'a, E: CompactionJobExecutor> PartialOrd for LazyLoadLayer<'a, E> { fn partial_cmp(&self, other: &Self) -> Option { @@ -129,12 +153,12 @@ impl<'a, E: CompactionJobExecutor> PartialOrd for LazyLoadLayer<'a, E> { impl<'a, E: CompactionJobExecutor> Ord for LazyLoadLayer<'a, E> { fn cmp(&self, other: &Self) -> std::cmp::Ordering { // reverse order so that we get a min-heap - other.key().cmp(&self.key()) + (other.min_key(), other.min_lsn()).cmp(&(self.min_key(), self.min_lsn())) } } impl<'a, E: CompactionJobExecutor> PartialEq for LazyLoadLayer<'a, E> { fn eq(&self, other: &Self) -> bool { - self.key().eq(&other.key()) + self.cmp(other) == std::cmp::Ordering::Equal } } impl<'a, E: CompactionJobExecutor> Eq for LazyLoadLayer<'a, E> {}