From 3400c8a01fb85025c792211bbc5ccf4591f48870 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Wed, 8 May 2024 19:04:17 +0200 Subject: [PATCH 1/6] Also take lsn into account --- pageserver/compaction/src/helpers.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pageserver/compaction/src/helpers.rs b/pageserver/compaction/src/helpers.rs index eb0e5ee82ae5..a969780e340c 100644 --- a/pageserver/compaction/src/helpers.rs +++ b/pageserver/compaction/src/helpers.rs @@ -14,6 +14,7 @@ use std::future::Future; use std::ops::{DerefMut, Range}; use std::pin::Pin; use std::task::{ready, Poll}; +use utils::lsn::Lsn; pub fn keyspace_total_size( keyspace: &CompactionKeySpace, @@ -120,6 +121,12 @@ impl<'a, E: CompactionJobExecutor> LazyLoadLayer<'a, E> { Self::Unloaded(dl) => dl.key_range().start, } } + fn lsn(&self) -> Lsn { + match self { + Self::Loaded(entries) => entries.front().unwrap().lsn(), + Self::Unloaded(dl) => dl.lsn_range().start, + } + } } impl<'a, E: CompactionJobExecutor> PartialOrd for LazyLoadLayer<'a, E> { fn partial_cmp(&self, other: &Self) -> Option { @@ -129,12 +136,12 @@ impl<'a, E: CompactionJobExecutor> PartialOrd for LazyLoadLayer<'a, E> { impl<'a, E: CompactionJobExecutor> Ord for LazyLoadLayer<'a, E> { fn cmp(&self, other: &Self) -> std::cmp::Ordering { // reverse order so that we get a min-heap - other.key().cmp(&self.key()) + (other.key(), other.lsn()).cmp(&(self.key(), self.lsn())) } } impl<'a, E: CompactionJobExecutor> PartialEq for LazyLoadLayer<'a, E> { fn eq(&self, other: &Self) -> bool { - self.key().eq(&other.key()) + self.cmp(other) == std::cmp::Ordering::Equal } } impl<'a, E: CompactionJobExecutor> Eq for LazyLoadLayer<'a, E> {} From ae755931fc9ced4c3abdb9d8ab66135ca04d27d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Wed, 8 May 2024 19:07:41 +0200 Subject: [PATCH 2/6] Don't mutate LazyLoadLayer inside the heap --- pageserver/compaction/src/helpers.rs | 38 +++++++++++++++------------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/pageserver/compaction/src/helpers.rs b/pageserver/compaction/src/helpers.rs index a969780e340c..e8447e12fa31 100644 --- a/pageserver/compaction/src/helpers.rs +++ b/pageserver/compaction/src/helpers.rs @@ -11,7 +11,7 @@ use std::collections::BinaryHeap; use std::collections::VecDeque; use std::fmt::Display; use std::future::Future; -use std::ops::{DerefMut, Range}; +use std::ops::Range; use std::pin::Pin; use std::task::{ready, Poll}; use utils::lsn::Lsn; @@ -178,8 +178,8 @@ where match ready!(load_future.as_mut().poll(cx)) { Ok(entries) => { this.load_future.set(None); - *this.heap.peek_mut().unwrap() = - LazyLoadLayer::Loaded(VecDeque::from(entries)); + this.heap + .push(LazyLoadLayer::Loaded(VecDeque::from(entries))); } Err(e) => { return Poll::Ready(Some(Err(e))); @@ -191,23 +191,25 @@ where // loading it. Otherwise return the next entry from it and update // the layer's position in the heap (this decreaseKey operation is // performed implicitly when `top` is dropped). - if let Some(mut top) = this.heap.peek_mut() { - match top.deref_mut() { - LazyLoadLayer::Unloaded(ref mut l) => { - let fut = l.load_keys(this.ctx); - this.load_future.set(Some(Box::pin(fut))); - continue; - } - LazyLoadLayer::Loaded(ref mut entries) => { - let result = entries.pop_front().unwrap(); - if entries.is_empty() { - std::collections::binary_heap::PeekMut::pop(top); - } - return Poll::Ready(Some(Ok(result))); + // We have to remove the layer from the heap and then re-add it, + // because loading it (or just removing a key from it) can change + // its ordering relative to the other layers in the heap. + let Some(mut top) = this.heap.pop() else { + return Poll::Ready(None); + }; + match top { + LazyLoadLayer::Unloaded(ref mut l) => { + let fut = l.load_keys(this.ctx); + this.load_future.set(Some(Box::pin(fut))); + continue; + } + LazyLoadLayer::Loaded(ref mut entries) => { + let result = entries.pop_front().unwrap(); + if !entries.is_empty() { + this.heap.push(top); } + return Poll::Ready(Some(Ok(result))); } - } else { - return Poll::Ready(None); } } } From 57b69346b54be735a1a74115fd4833cae6291169 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Wed, 8 May 2024 19:02:53 +0200 Subject: [PATCH 3/6] Create and use merge_delta_keys_buffered which does in-memory key sorting --- pageserver/compaction/src/compact_tiered.rs | 9 +++++++-- pageserver/compaction/src/helpers.rs | 17 +++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/pageserver/compaction/src/compact_tiered.rs b/pageserver/compaction/src/compact_tiered.rs index 137b93055a6d..d98c1466b3c5 100644 --- a/pageserver/compaction/src/compact_tiered.rs +++ b/pageserver/compaction/src/compact_tiered.rs @@ -24,7 +24,9 @@ use tracing::{debug, info}; use std::collections::{HashSet, VecDeque}; use std::ops::Range; -use crate::helpers::{accum_key_values, keyspace_total_size, merge_delta_keys, overlaps_with}; +use crate::helpers::{ + accum_key_values, keyspace_total_size, merge_delta_keys_buffered, overlaps_with, +}; use crate::interface::*; use utils::lsn::Lsn; @@ -535,7 +537,10 @@ where } } // Open stream - let key_value_stream = std::pin::pin!(merge_delta_keys::(deltas.as_slice(), ctx)); + let key_value_stream = + std::pin::pin!(merge_delta_keys_buffered::(deltas.as_slice(), ctx) + .await? + .map(|k| Result::<_, anyhow::Error>::Ok(k))); let mut new_jobs = Vec::new(); // Slide a window through the keyspace diff --git a/pageserver/compaction/src/helpers.rs b/pageserver/compaction/src/helpers.rs index e8447e12fa31..38912a59e07c 100644 --- a/pageserver/compaction/src/helpers.rs +++ b/pageserver/compaction/src/helpers.rs @@ -110,6 +110,23 @@ pub fn merge_delta_keys<'a, E: CompactionJobExecutor>( } } +pub async fn merge_delta_keys_buffered<'a, E: CompactionJobExecutor + 'a>( + layers: &'a [E::DeltaLayer], + ctx: &'a E::RequestContext, +) -> anyhow::Result>::DeltaEntry<'a>>> +{ + let mut keys = Vec::new(); + for l in layers { + // Boxing and casting to LoadFuture is required to obtain the right Sync bound. + // If we do l.load_keys(ctx).await? directly, there is a compilation error. + let load_future: LoadFuture<'a, _> = Box::pin(l.load_keys(ctx)); + keys.extend(load_future.await?.into_iter()); + } + keys.sort_by_key(|k| (k.key(), k.lsn())); + let stream = futures::stream::iter(keys.into_iter()); + Ok(stream) +} + enum LazyLoadLayer<'a, E: CompactionJobExecutor> { Loaded(VecDeque<>::DeltaEntry<'a>>), Unloaded(&'a E::DeltaLayer), From 18f73beae391828cdb5cf9cc62e5466013e52cc2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 9 May 2024 03:58:20 +0200 Subject: [PATCH 4/6] Revert "Don't mutate LazyLoadLayer inside the heap" This reverts commit ae755931fc9ced4c3abdb9d8ab66135ca04d27d5. --- pageserver/compaction/src/helpers.rs | 38 +++++++++++++--------------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/pageserver/compaction/src/helpers.rs b/pageserver/compaction/src/helpers.rs index 38912a59e07c..15bb2b0ba882 100644 --- a/pageserver/compaction/src/helpers.rs +++ b/pageserver/compaction/src/helpers.rs @@ -11,7 +11,7 @@ use std::collections::BinaryHeap; use std::collections::VecDeque; use std::fmt::Display; use std::future::Future; -use std::ops::Range; +use std::ops::{DerefMut, Range}; use std::pin::Pin; use std::task::{ready, Poll}; use utils::lsn::Lsn; @@ -195,8 +195,8 @@ where match ready!(load_future.as_mut().poll(cx)) { Ok(entries) => { this.load_future.set(None); - this.heap - .push(LazyLoadLayer::Loaded(VecDeque::from(entries))); + *this.heap.peek_mut().unwrap() = + LazyLoadLayer::Loaded(VecDeque::from(entries)); } Err(e) => { return Poll::Ready(Some(Err(e))); @@ -208,25 +208,23 @@ where // loading it. Otherwise return the next entry from it and update // the layer's position in the heap (this decreaseKey operation is // performed implicitly when `top` is dropped). - // We have to remove the layer from the heap and then re-add it, - // because loading it (or just removing a key from it) can change - // its ordering relative to the other layers in the heap. - let Some(mut top) = this.heap.pop() else { - return Poll::Ready(None); - }; - match top { - LazyLoadLayer::Unloaded(ref mut l) => { - let fut = l.load_keys(this.ctx); - this.load_future.set(Some(Box::pin(fut))); - continue; - } - LazyLoadLayer::Loaded(ref mut entries) => { - let result = entries.pop_front().unwrap(); - if !entries.is_empty() { - this.heap.push(top); + if let Some(mut top) = this.heap.peek_mut() { + match top.deref_mut() { + LazyLoadLayer::Unloaded(ref mut l) => { + let fut = l.load_keys(this.ctx); + this.load_future.set(Some(Box::pin(fut))); + continue; + } + LazyLoadLayer::Loaded(ref mut entries) => { + let result = entries.pop_front().unwrap(); + if entries.is_empty() { + std::collections::binary_heap::PeekMut::pop(top); + } + return Poll::Ready(Some(Ok(result))); } - return Poll::Ready(Some(Ok(result))); } + } else { + return Poll::Ready(None); } } } From 5b2537c15e85320ea2c16b48f1d62463077f8f61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 9 May 2024 04:00:55 +0200 Subject: [PATCH 5/6] Rename to min_key and min_lsn for clarity --- pageserver/compaction/src/helpers.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pageserver/compaction/src/helpers.rs b/pageserver/compaction/src/helpers.rs index 15bb2b0ba882..06454ee1d050 100644 --- a/pageserver/compaction/src/helpers.rs +++ b/pageserver/compaction/src/helpers.rs @@ -132,13 +132,13 @@ enum LazyLoadLayer<'a, E: CompactionJobExecutor> { Unloaded(&'a E::DeltaLayer), } impl<'a, E: CompactionJobExecutor> LazyLoadLayer<'a, E> { - fn key(&self) -> E::Key { + fn min_key(&self) -> E::Key { match self { Self::Loaded(entries) => entries.front().unwrap().key(), Self::Unloaded(dl) => dl.key_range().start, } } - fn lsn(&self) -> Lsn { + fn min_lsn(&self) -> Lsn { match self { Self::Loaded(entries) => entries.front().unwrap().lsn(), Self::Unloaded(dl) => dl.lsn_range().start, @@ -153,7 +153,7 @@ impl<'a, E: CompactionJobExecutor> PartialOrd for LazyLoadLayer<'a, E> { impl<'a, E: CompactionJobExecutor> Ord for LazyLoadLayer<'a, E> { fn cmp(&self, other: &Self) -> std::cmp::Ordering { // reverse order so that we get a min-heap - (other.key(), other.lsn()).cmp(&(self.key(), self.lsn())) + (other.min_key(), other.min_lsn()).cmp(&(self.min_key(), self.min_lsn())) } } impl<'a, E: CompactionJobExecutor> PartialEq for LazyLoadLayer<'a, E> { From 496879b2f92662f3a44fe8bdc6059be2bbe6e1ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 9 May 2024 04:09:55 +0200 Subject: [PATCH 6/6] Clippy --- pageserver/compaction/src/compact_tiered.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pageserver/compaction/src/compact_tiered.rs b/pageserver/compaction/src/compact_tiered.rs index d98c1466b3c5..12882c9d59cf 100644 --- a/pageserver/compaction/src/compact_tiered.rs +++ b/pageserver/compaction/src/compact_tiered.rs @@ -540,7 +540,7 @@ where let key_value_stream = std::pin::pin!(merge_delta_keys_buffered::(deltas.as_slice(), ctx) .await? - .map(|k| Result::<_, anyhow::Error>::Ok(k))); + .map(Result::<_, anyhow::Error>::Ok)); let mut new_jobs = Vec::new(); // Slide a window through the keyspace