Skip to content

Commit

Permalink
feat(pageserver): initial code sketch & test case for combined gc+com…
Browse files Browse the repository at this point in the history
…paction at gc_horizon (#7948)

A demo for a building block for compaction. The GC-compaction operation
iterates all layers below/intersect with the GC horizon, and do a full
layer rewrite of all of them. The end result will be image layer
covering the full keyspace at GC-horizon, and a bunch of delta layers
above the GC-horizon. This helps us collect the garbages of the
test_gc_feedback test case to reduce space amplification.

This operation can be manually triggered using an HTTP API or be
triggered based on some metrics. Actual method TBD.

The test is very basic and it's very likely that most part of the
algorithm will be rewritten. I would like to get this merged so that I
can have a basic skeleton for the algorithm and then make incremental
changes.

<img width="924" alt="image"
src="https://github.com/neondatabase/neon/assets/4198311/f3d49f4e-634f-4f56-986d-bfefc6ae6ee2">

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
  • Loading branch information
skyzh committed Jun 11, 2024
1 parent d3b892e commit 4c21007
Show file tree
Hide file tree
Showing 7 changed files with 455 additions and 0 deletions.
160 changes: 160 additions & 0 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4044,10 +4044,12 @@ mod tests {
use crate::DEFAULT_PG_VERSION;
use bytes::{Bytes, BytesMut};
use hex_literal::hex;
use itertools::Itertools;
use pageserver_api::key::{AUX_FILES_KEY, AUX_KEY_PREFIX, NON_INHERITED_RANGE};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings};
use rand::{thread_rng, Rng};
use storage_layer::PersistentLayerKey;
use tests::storage_layer::ValuesReconstructState;
use tests::timeline::{GetVectoredError, ShutdownMode};
use utils::bin_ser::BeSer;
Expand Down Expand Up @@ -6697,4 +6699,162 @@ mod tests {
.collect::<Vec<_>>();
assert_eq!(images.len(), 0); // the image layer should not contain tombstones, or it is not created
}

#[tokio::test]
async fn test_simple_bottom_most_compaction() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_simple_bottom_most_compaction")?;
let (tenant, ctx) = harness.load().await;

fn get_key(id: u32) -> Key {
// using aux key here b/c they are guaranteed to be inside `collect_keyspace`.
let mut key = Key::from_hex("620000000033333333444444445500000000").unwrap();
key.field6 = id;
key
}

// We create one bottom-most image layer, a delta layer D1 crossing the GC horizon, D2 below the horizon, and D3 above the horizon.
//
// | D1 | | D3 |
// -| |-- gc horizon -----------------
// | | | D2 |
// --------- img layer ------------------
//
// What we should expact from this compaction is:
// | Part of D1 | | D3 |
// --------- img layer with D1+D2 at GC horizon------------------

// img layer at 0x10
let img_layer = (0..10)
.map(|id| (get_key(id), test_img(&format!("value {id}@0x10"))))
.collect_vec();

let delta1 = vec![
// TODO: we should test a real delta record here, which requires us to add a variant of NeonWalRecord for testing purpose.
(
get_key(1),
Lsn(0x20),
Value::Image(test_img("value 1@0x20")),
),
(
get_key(2),
Lsn(0x30),
Value::Image(test_img("value 2@0x30")),
),
(
get_key(3),
Lsn(0x40),
Value::Image(test_img("value 3@0x40")),
),
];
let delta2 = vec![
(
get_key(5),
Lsn(0x20),
Value::Image(test_img("value 5@0x20")),
),
(
get_key(6),
Lsn(0x20),
Value::Image(test_img("value 6@0x20")),
),
];
let delta3 = vec![
(
get_key(8),
Lsn(0x40),
Value::Image(test_img("value 8@0x40")),
),
(
get_key(9),
Lsn(0x40),
Value::Image(test_img("value 9@0x40")),
),
];

let tline = tenant
.create_test_timeline_with_layers(
TIMELINE_ID,
Lsn(0x10),
DEFAULT_PG_VERSION,
&ctx,
vec![delta1, delta2, delta3], // delta layers
vec![(Lsn(0x10), img_layer)], // image layers
Lsn(0x50),
)
.await?;
{
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
guard.cutoffs.pitr = Lsn(0x30);
guard.cutoffs.horizon = Lsn(0x30);
}

let cancel = CancellationToken::new();
tline.compact_with_gc(&cancel, &ctx).await.unwrap();

// Check if the image layer at the GC horizon contains exactly what we want
let image_at_gc_horizon = tline
.inspect_image_layers(Lsn(0x30), &ctx)
.await
.unwrap()
.into_iter()
.filter(|(k, _)| k.is_metadata_key())
.collect::<Vec<_>>();

assert_eq!(image_at_gc_horizon.len(), 10);
let expected_lsn = [0x10, 0x20, 0x30, 0x10, 0x10, 0x20, 0x20, 0x10, 0x10, 0x10];
for idx in 0..10 {
assert_eq!(
image_at_gc_horizon[idx],
(
get_key(idx as u32),
test_img(&format!("value {idx}@{:#x}", expected_lsn[idx]))
)
);
}

// Check if old layers are removed / new layers have the expected LSN
let mut all_layers = tline.inspect_historic_layers().await.unwrap();
all_layers.sort_by(|k1, k2| {
(
k1.is_delta,
k1.key_range.start,
k1.key_range.end,
k1.lsn_range.start,
k1.lsn_range.end,
)
.cmp(&(
k2.is_delta,
k2.key_range.start,
k2.key_range.end,
k2.lsn_range.start,
k2.lsn_range.end,
))
});
assert_eq!(
all_layers,
vec![
// Image layer at GC horizon
PersistentLayerKey {
key_range: Key::MIN..get_key(10),
lsn_range: Lsn(0x30)..Lsn(0x31),
is_delta: false
},
// The delta layer that is cut in the middle
PersistentLayerKey {
key_range: Key::MIN..get_key(9),
lsn_range: Lsn(0x30)..Lsn(0x41),
is_delta: true
},
// The delta layer we created and should not be picked for the compaction
PersistentLayerKey {
key_range: get_key(8)..get_key(10),
lsn_range: Lsn(0x40)..Lsn(0x41),
is_delta: true
}
]
);

Ok(())
}
}
39 changes: 39 additions & 0 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,45 @@ impl DeltaLayerInner {
Ok(())
}

/// Load all key-values in the delta layer, should be replaced by an iterator-based interface in the future.
#[cfg(test)]
pub(super) async fn load_key_values(
&self,
ctx: &RequestContext,
) -> anyhow::Result<Vec<(Key, Lsn, Value)>> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
self.index_start_blk,
self.index_root_blk,
block_reader,
);
let mut result = Vec::new();
let mut stream =
Box::pin(self.stream_index_forwards(&index_reader, &[0; DELTA_KEY_SIZE], ctx));
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let cursor = block_reader.block_cursor();
let mut buf = Vec::new();
while let Some(item) = stream.next().await {
let (key, lsn, pos) = item?;
// TODO: dedup code with get_reconstruct_value
// TODO: ctx handling and sharding
cursor
.read_blob_into_buf(pos.pos(), &mut buf, ctx)
.await
.with_context(|| {
format!("Failed to read blob from virtual file {}", self.file.path)
})?;
let val = Value::des(&buf).with_context(|| {
format!(
"Failed to deserialize file blob from virtual file {}",
self.file.path
)
})?;
result.push((key, lsn, val));
}
Ok(result)
}

async fn plan_reads<Reader>(
keyspace: &KeySpace,
lsn_range: Range<Lsn>,
Expand Down
28 changes: 28 additions & 0 deletions pageserver/src/tenant/storage_layer/image_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,34 @@ impl ImageLayerInner {
Ok(())
}

/// Load all key-values in the delta layer, should be replaced by an iterator-based interface in the future.
#[cfg(test)]
pub(super) async fn load_key_values(
&self,
ctx: &RequestContext,
) -> anyhow::Result<Vec<(Key, Lsn, Value)>> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
let mut result = Vec::new();
let mut stream = Box::pin(tree_reader.get_stream_from(&[0; KEY_SIZE], ctx));
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let cursor = block_reader.block_cursor();
while let Some(item) = stream.next().await {
// TODO: dedup code with get_reconstruct_value
let (raw_key, offset) = item?;
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
// TODO: ctx handling and sharding
let blob = cursor
.read_blob(offset, ctx)
.await
.with_context(|| format!("failed to read value from offset {}", offset))?;
let value = Bytes::from(blob);
result.push((key, self.lsn, Value::Image(value)));
}
Ok(result)
}

/// Traverse the layer's index to build read operations on the overlap of the input keyspace
/// and the keys in this layer.
///
Expand Down
31 changes: 31 additions & 0 deletions pageserver/src/tenant/storage_layer/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,23 @@ impl Layer {
})
}

/// Get all key/values in the layer. Should be replaced with an iterator-based API in the future.
#[cfg(test)]
pub(crate) async fn load_key_values(
&self,
ctx: &RequestContext,
) -> anyhow::Result<Vec<(Key, Lsn, crate::repository::Value)>> {
let layer = self
.0
.get_or_maybe_download(true, Some(ctx))
.await
.map_err(|err| match err {
DownloadError::DownloadCancelled => GetVectoredError::Cancelled,
other => GetVectoredError::Other(anyhow::anyhow!(other)),
})?;
layer.load_key_values(&self.0, ctx).await
}

/// Download the layer if evicted.
///
/// Will not error when the layer is already downloaded.
Expand Down Expand Up @@ -1757,6 +1774,20 @@ impl DownloadedLayer {
}
}

#[cfg(test)]
async fn load_key_values(
&self,
owner: &Arc<LayerInner>,
ctx: &RequestContext,
) -> anyhow::Result<Vec<(Key, Lsn, crate::repository::Value)>> {
use LayerKind::*;

match self.get(owner, ctx).await? {
Delta(d) => d.load_key_values(ctx).await,
Image(i) => i.load_key_values(ctx).await,
}
}

async fn dump(&self, owner: &Arc<LayerInner>, ctx: &RequestContext) -> anyhow::Result<()> {
use LayerKind::*;
match self.get(owner, ctx).await? {
Expand Down
13 changes: 13 additions & 0 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5549,6 +5549,19 @@ impl Timeline {
all_data.sort();
Ok(all_data)
}

/// Get all historic layer descriptors in the layer map
#[cfg(test)]
pub(crate) async fn inspect_historic_layers(
self: &Arc<Timeline>,
) -> anyhow::Result<Vec<super::storage_layer::PersistentLayerKey>> {
let mut layers = Vec::new();
let guard = self.layers.read().await;
for layer in guard.layer_map().iter_historic_layers() {
layers.push(layer.key());
}
Ok(layers)
}
}

type TraversalPathItem = (ValueReconstructResult, Lsn, TraversalId);
Expand Down
Loading

1 comment on commit 4c21007

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3286 tests run: 3131 passed, 0 failed, 155 skipped (full report)


Code coverage* (full report)

  • functions: 31.6% (6624 of 20979 functions)
  • lines: 48.6% (51438 of 105803 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
4c21007 at 2024-06-11T15:36:40.615Z :recycle:

Please sign in to comment.