Skip to content

Commit

Permalink
Move CompactLevel0Phase1Result and metrics structs to compaction.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
arpad-m committed Mar 5, 2024
1 parent d8d39ae commit 636809f
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 96 deletions.
93 changes: 0 additions & 93 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3612,12 +3612,6 @@ impl Timeline {
}
}

#[derive(Default)]
struct CompactLevel0Phase1Result {
new_layers: Vec<ResidentLayer>,
deltas_to_compact: Vec<Layer>,
}

/// Top-level failure to compact.
#[derive(Debug, thiserror::Error)]
pub(crate) enum CompactionError {
Expand Down Expand Up @@ -3671,93 +3665,6 @@ impl DurationRecorder {
}
}

#[derive(Default)]
struct CompactLevel0Phase1StatsBuilder {
version: Option<u64>,
tenant_id: Option<TenantShardId>,
timeline_id: Option<TimelineId>,
read_lock_acquisition_micros: DurationRecorder,
read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
read_lock_held_key_sort_micros: DurationRecorder,
read_lock_held_prerequisites_micros: DurationRecorder,
read_lock_held_compute_holes_micros: DurationRecorder,
read_lock_drop_micros: DurationRecorder,
write_layer_files_micros: DurationRecorder,
level0_deltas_count: Option<usize>,
new_deltas_count: Option<usize>,
new_deltas_size: Option<u64>,
}

#[derive(serde::Serialize)]
struct CompactLevel0Phase1Stats {
version: u64,
tenant_id: TenantShardId,
timeline_id: TimelineId,
read_lock_acquisition_micros: RecordedDuration,
read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
read_lock_held_key_sort_micros: RecordedDuration,
read_lock_held_prerequisites_micros: RecordedDuration,
read_lock_held_compute_holes_micros: RecordedDuration,
read_lock_drop_micros: RecordedDuration,
write_layer_files_micros: RecordedDuration,
level0_deltas_count: usize,
new_deltas_count: usize,
new_deltas_size: u64,
}

impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
type Error = anyhow::Error;

fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
Ok(Self {
version: value.version.ok_or_else(|| anyhow!("version not set"))?,
tenant_id: value
.tenant_id
.ok_or_else(|| anyhow!("tenant_id not set"))?,
timeline_id: value
.timeline_id
.ok_or_else(|| anyhow!("timeline_id not set"))?,
read_lock_acquisition_micros: value
.read_lock_acquisition_micros
.into_recorded()
.ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
read_lock_held_spawn_blocking_startup_micros: value
.read_lock_held_spawn_blocking_startup_micros
.into_recorded()
.ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
read_lock_held_key_sort_micros: value
.read_lock_held_key_sort_micros
.into_recorded()
.ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
read_lock_held_prerequisites_micros: value
.read_lock_held_prerequisites_micros
.into_recorded()
.ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
read_lock_held_compute_holes_micros: value
.read_lock_held_compute_holes_micros
.into_recorded()
.ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
read_lock_drop_micros: value
.read_lock_drop_micros
.into_recorded()
.ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
write_layer_files_micros: value
.write_layer_files_micros
.into_recorded()
.ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
level0_deltas_count: value
.level0_deltas_count
.ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
new_deltas_count: value
.new_deltas_count
.ok_or_else(|| anyhow!("new_deltas_count not set"))?,
new_deltas_size: value
.new_deltas_size
.ok_or_else(|| anyhow!("new_deltas_size not set"))?,
})
}
}

impl Timeline {
async fn finish_compact_batch(
self: &Arc<Self>,
Expand Down
105 changes: 102 additions & 3 deletions pageserver/src/tenant/timeline/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,21 @@ use std::ops::{Deref, Range};
use std::sync::Arc;

use super::layer_manager::LayerManager;
use super::{CompactFlags, CompactLevel0Phase1Result, CompactLevel0Phase1StatsBuilder, DurationRecorder, RecordedDuration, Timeline};
use super::{CompactFlags, DurationRecorder, RecordedDuration, Timeline};

use anyhow::Context;
use anyhow::{anyhow, Context};
use async_trait::async_trait;
use enumset::EnumSet;
use fail::fail_point;
use itertools::Itertools;
use pageserver_api::shard::TenantShardId;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, info_span, trace, warn, Instrument};
use utils::id::TimelineId;

use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc};
use crate::tenant::timeline::{drop_rlock, is_rel_fsm_block_key, is_rel_vm_block_key, CompactLevel0Phase1Stats, Hole};
use crate::tenant::timeline::{drop_rlock, is_rel_fsm_block_key, is_rel_vm_block_key, Hole};
use crate::tenant::timeline::{DeltaLayerWriter, ImageLayerWriter};
use crate::tenant::timeline::{Layer, ResidentLayer};
use crate::tenant::DeltaLayer;
Expand Down Expand Up @@ -632,6 +634,103 @@ impl Timeline {
.collect::<Vec<_>>(),
})
}
}

#[derive(Default)]
struct CompactLevel0Phase1Result {
new_layers: Vec<ResidentLayer>,
deltas_to_compact: Vec<Layer>,
}


#[derive(Default)]
struct CompactLevel0Phase1StatsBuilder {
version: Option<u64>,
tenant_id: Option<TenantShardId>,
timeline_id: Option<TimelineId>,
read_lock_acquisition_micros: DurationRecorder,
read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
read_lock_held_key_sort_micros: DurationRecorder,
read_lock_held_prerequisites_micros: DurationRecorder,
read_lock_held_compute_holes_micros: DurationRecorder,
read_lock_drop_micros: DurationRecorder,
write_layer_files_micros: DurationRecorder,
level0_deltas_count: Option<usize>,
new_deltas_count: Option<usize>,
new_deltas_size: Option<u64>,
}

#[derive(serde::Serialize)]
struct CompactLevel0Phase1Stats {
version: u64,
tenant_id: TenantShardId,
timeline_id: TimelineId,
read_lock_acquisition_micros: RecordedDuration,
read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
read_lock_held_key_sort_micros: RecordedDuration,
read_lock_held_prerequisites_micros: RecordedDuration,
read_lock_held_compute_holes_micros: RecordedDuration,
read_lock_drop_micros: RecordedDuration,
write_layer_files_micros: RecordedDuration,
level0_deltas_count: usize,
new_deltas_count: usize,
new_deltas_size: u64,
}

impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
type Error = anyhow::Error;

fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
Ok(Self {
version: value.version.ok_or_else(|| anyhow!("version not set"))?,
tenant_id: value
.tenant_id
.ok_or_else(|| anyhow!("tenant_id not set"))?,
timeline_id: value
.timeline_id
.ok_or_else(|| anyhow!("timeline_id not set"))?,
read_lock_acquisition_micros: value
.read_lock_acquisition_micros
.into_recorded()
.ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
read_lock_held_spawn_blocking_startup_micros: value
.read_lock_held_spawn_blocking_startup_micros
.into_recorded()
.ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
read_lock_held_key_sort_micros: value
.read_lock_held_key_sort_micros
.into_recorded()
.ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
read_lock_held_prerequisites_micros: value
.read_lock_held_prerequisites_micros
.into_recorded()
.ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
read_lock_held_compute_holes_micros: value
.read_lock_held_compute_holes_micros
.into_recorded()
.ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
read_lock_drop_micros: value
.read_lock_drop_micros
.into_recorded()
.ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
write_layer_files_micros: value
.write_layer_files_micros
.into_recorded()
.ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
level0_deltas_count: value
.level0_deltas_count
.ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
new_deltas_count: value
.new_deltas_count
.ok_or_else(|| anyhow!("new_deltas_count not set"))?,
new_deltas_size: value
.new_deltas_size
.ok_or_else(|| anyhow!("new_deltas_size not set"))?,
})
}
}

impl Timeline {

/// Entry point for new tiered compaction algorithm.
///
Expand Down

0 comments on commit 636809f

Please sign in to comment.