Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bottom-most-compaction: use in test_gc_feedback + fix bugs #8103

Merged
merged 8 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1652,6 +1652,14 @@ async fn timeline_compact_handler(
if Some(true) == parse_query_param::<_, bool>(&request, "force_image_layer_creation")? {
flags |= CompactFlags::ForceImageLayerCreation;
}
if Some(true) == parse_query_param::<_, bool>(&request, "enhanced_gc_bottom_most_compaction")? {
if !cfg!(feature = "testing") {
return Err(ApiError::InternalServerError(anyhow!(
"enhanced_gc_bottom_most_compaction is only available in testing mode"
)));
}
flags |= CompactFlags::EnhancedGcBottomMostCompaction;
}
let wait_until_uploaded =
parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false);

Expand Down
1 change: 0 additions & 1 deletion pageserver/src/tenant/storage_layer/delta_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,6 @@ impl DeltaLayerInner {
}

/// Load all key-values in the delta layer, should be replaced by an iterator-based interface in the future.
#[cfg(test)]
pub(super) async fn load_key_values(
&self,
ctx: &RequestContext,
Expand Down
1 change: 0 additions & 1 deletion pageserver/src/tenant/storage_layer/image_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,6 @@ impl ImageLayerInner {
}

/// Load all key-values in the delta layer, should be replaced by an iterator-based interface in the future.
#[cfg(test)]
pub(super) async fn load_key_values(
&self,
ctx: &RequestContext,
Expand Down
2 changes: 0 additions & 2 deletions pageserver/src/tenant/storage_layer/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,6 @@ impl Layer {
}

/// Get all key/values in the layer. Should be replaced with an iterator-based API in the future.
#[cfg(test)]
pub(crate) async fn load_key_values(
&self,
ctx: &RequestContext,
Expand Down Expand Up @@ -1774,7 +1773,6 @@ impl DownloadedLayer {
}
}

#[cfg(test)]
async fn load_key_values(
&self,
owner: &Arc<LayerInner>,
Expand Down
2 changes: 1 addition & 1 deletion pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,7 @@ pub enum GetLogicalSizePriority {
pub(crate) enum CompactFlags {
ForceRepartition,
ForceImageLayerCreation,
EnhancedGcBottomMostCompaction,
}

impl std::fmt::Debug for Timeline {
Expand Down Expand Up @@ -1096,7 +1097,6 @@ impl Timeline {
/// scan iterator interface. We could optimize this interface later to avoid some checks in the vectored
/// get path to maintain and split the probing and to-be-probe keyspace. We also need to ensure that
/// the scan operation will not cause OOM in the future.
#[allow(dead_code)]
pub(crate) async fn scan(
&self,
keyspace: KeySpace,
Expand Down
41 changes: 33 additions & 8 deletions pageserver/src/tenant/timeline/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,14 @@ impl Timeline {
/// TODO: cancellation
pub(crate) async fn compact_legacy(
self: &Arc<Self>,
_cancel: &CancellationToken,
cancel: &CancellationToken,
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> Result<(), CompactionError> {
if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) {
return self.compact_with_gc(cancel, ctx).await;
}
problame marked this conversation as resolved.
Show resolved Hide resolved

// High level strategy for compaction / image creation:
//
// 1. First, calculate the desired "partitioning" of the
Expand Down Expand Up @@ -959,15 +963,20 @@ impl Timeline {
/// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
/// layers and image layers, which generates image layers on the gc horizon, drop deltas below gc horizon,
/// and create delta layers with all deltas >= gc horizon.
#[cfg(test)]
pub(crate) async fn compact_with_gc(
self: &Arc<Self>,
_cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<(), CompactionError> {
use crate::tenant::storage_layer::ValueReconstructState;
use std::collections::BTreeSet;

use crate::tenant::storage_layer::ValueReconstructState;
info!("running enhanced gc bottom-most compaction");
problame marked this conversation as resolved.
Show resolved Hide resolved

scopeguard::defer! {
info!("done enhanced gc bottom-most compaction");
};

// Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
// The layer selection has the following properties:
// 1. If a layer is in the selection, all layers below it are in the selection.
Expand All @@ -976,6 +985,11 @@ impl Timeline {
let guard = self.layers.read().await;
let layers = guard.layer_map();
let gc_info = self.gc_info.read().unwrap();
if !gc_info.retain_lsns.is_empty() || !gc_info.leases.is_empty() {
return Err(CompactionError::Other(anyhow!(
"enhanced legacy compaction currently does not support retain_lsns (branches)"
)));
}
let gc_cutoff = Lsn::min(gc_info.cutoffs.horizon, gc_info.cutoffs.pitr);
let mut selected_layers = Vec::new();
// TODO: consider retain_lsns
Expand All @@ -987,6 +1001,11 @@ impl Timeline {
}
(selected_layers, gc_cutoff)
};
info!(
"picked {} layers for compaction with gc_cutoff={}",
layer_selection.len(),
gc_cutoff
);
// Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs.
// Also, collect the layer information to decide when to split the new delta layers.
let mut all_key_values = Vec::new();
Expand Down Expand Up @@ -1064,18 +1083,16 @@ impl Timeline {
} else if *lsn <= horizon {
match val {
crate::repository::Value::Image(image) => {
if lsn <= &horizon {
base_image = Some((*lsn, image.clone()));
break;
}
base_image = Some((*lsn, image.clone()));
break;
}
crate::repository::Value::WalRecord(wal) => {
delta_above_base_image.push((*lsn, wal.clone()));
}
}
}
}
delta_above_base_image.reverse();
problame marked this conversation as resolved.
Show resolved Hide resolved
// do not reverse delta_above_base_image, reconstruct state expects reversely-ordered records
keys_above_horizon.reverse();
let state = ValueReconstructState {
img: base_image,
Expand Down Expand Up @@ -1200,6 +1217,11 @@ impl Timeline {
);

let image_layer = image_layer_writer.finish(self, ctx).await?;
info!(
"produced {} delta layers and {} image layers",
delta_layers.len(),
1
);
let mut compact_to = Vec::new();
compact_to.extend(delta_layers);
compact_to.push(image_layer);
Expand All @@ -1208,6 +1230,9 @@ impl Timeline {
let mut guard = self.layers.write().await;
guard.finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
};

self.remote_client
.schedule_compaction_update(&layer_selection, &compact_to)?;
problame marked this conversation as resolved.
Show resolved Hide resolved
Ok(())
}
}
Expand Down
1 change: 0 additions & 1 deletion pageserver/src/tenant/timeline/layer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ impl LayerManager {
}

/// Called when a GC-compaction is completed.
#[cfg(test)]
pub(crate) fn finish_gc_compaction(
&mut self,
compact_from: &[Layer],
Expand Down
3 changes: 3 additions & 0 deletions test_runner/fixtures/pageserver/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,7 @@ def timeline_compact(
force_repartition=False,
force_image_layer_creation=False,
wait_until_uploaded=False,
enhanced_gc_bottom_most_compaction=False,
):
self.is_testing_enabled_or_skip()
query = {}
Expand All @@ -582,6 +583,8 @@ def timeline_compact(
query["force_image_layer_creation"] = "true"
if wait_until_uploaded:
query["wait_until_uploaded"] = "true"
if enhanced_gc_bottom_most_compaction:
query["enhanced_gc_bottom_most_compaction"] = "true"

log.info(f"Requesting compact: tenant {tenant_id}, timeline {timeline_id}")
res = self.put(
Expand Down
48 changes: 47 additions & 1 deletion test_runner/performance/test_gc_feedback.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchma
"checkpoint_distance": f"{1024 ** 2}",
"compaction_target_size": f"{1024 ** 2}",
# set PITR interval to be small, so we can do GC
"pitr_interval": "10 s",
"pitr_interval": "60 s",
skyzh marked this conversation as resolved.
Show resolved Hide resolved
# "compaction_threshold": "3",
# "image_creation_threshold": "2",
}
Expand Down Expand Up @@ -99,6 +99,52 @@ def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchma
MetricReport.LOWER_IS_BETTER,
)

client.timeline_compact(tenant_id, timeline_id, enhanced_gc_bottom_most_compaction=True)
tline_detail = client.timeline_detail(tenant_id, timeline_id)
logical_size = tline_detail["current_logical_size"]
physical_size = tline_detail["current_physical_size"]

max_num_of_deltas_above_image = 0
max_total_num_of_deltas = 0
for key_range in client.perf_info(tenant_id, timeline_id):
max_total_num_of_deltas = max(max_total_num_of_deltas, key_range["total_num_of_deltas"])
max_num_of_deltas_above_image = max(
max_num_of_deltas_above_image, key_range["num_of_deltas_above_image"]
)
zenbenchmark.record(
"logical_size_after_bottom_most_compaction",
logical_size // MB,
"Mb",
MetricReport.LOWER_IS_BETTER,
)
zenbenchmark.record(
"physical_size_after_bottom_most_compaction",
physical_size // MB,
"Mb",
MetricReport.LOWER_IS_BETTER,
)
zenbenchmark.record(
"physical/logical ratio after bottom_most_compaction",
physical_size / logical_size,
"",
MetricReport.LOWER_IS_BETTER,
)
zenbenchmark.record(
"max_total_num_of_deltas_after_bottom_most_compaction",
max_total_num_of_deltas,
"",
MetricReport.LOWER_IS_BETTER,
)
zenbenchmark.record(
"max_num_of_deltas_above_image_after_bottom_most_compaction",
max_num_of_deltas_above_image,
"",
MetricReport.LOWER_IS_BETTER,
)

with endpoint.cursor() as cur:
cur.execute("SELECT * FROM t") # ensure data is not corrupted

layer_map_path = env.repo_dir / "layer-map.json"
log.info(f"Writing layer map to {layer_map_path}")
with layer_map_path.open("w") as f:
Expand Down
Loading