Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pageserver): use leases to temporarily block gc #8084

Merged
merged 8 commits into from
Jun 18, 2024
22 changes: 22 additions & 0 deletions libs/pageserver_api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,28 @@ serde_with::serde_conv!(
|value: String| -> Result<_, humantime::TimestampError> { humantime::parse_rfc3339(&value) }
);

impl LsnLease {
/// The default length for an explicit LSN lease request (10 minutes).
#[cfg(not(feature = "testing"))]
yliang412 marked this conversation as resolved.
Show resolved Hide resolved
pub const DEFAULT_LENGTH: Duration = Duration::from_secs(10 * 60);

#[cfg(feature = "testing")]
pub const DEFAULT_LENGTH: Duration = Duration::from_secs(2);

/// The default length for an implicit LSN lease granted during
/// `get_lsn_by_timestamp` request (1 minutes).
#[cfg(not(feature = "testing"))]
pub const DEFAULT_LENGTH_FOR_TS: Duration = Duration::from_secs(60);

#[cfg(feature = "testing")]
pub const DEFAULT_LENGTH_FOR_TS: Duration = Duration::from_secs(2);

/// Checks whether the lease is expired.
pub fn is_expired(&self, now: &SystemTime) -> bool {
now > &self.valid_until
}
}

/// The only [`TenantState`] variants we could be `TenantState::Activating` from.
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum ActivatingFrom {
Expand Down
3 changes: 2 additions & 1 deletion pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use pageserver_api::models::IngestAuxFilesRequest;
use pageserver_api::models::ListAuxFilesRequest;
use pageserver_api::models::LocationConfig;
use pageserver_api::models::LocationConfigListResponse;
use pageserver_api::models::LsnLease;
use pageserver_api::models::ShardParameters;
use pageserver_api::models::TenantDetails;
use pageserver_api::models::TenantLocationConfigResponse;
Expand Down Expand Up @@ -1730,7 +1731,7 @@ async fn lsn_lease_handler(
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
let result = timeline
.make_lsn_lease(lsn, &ctx)
.make_lsn_lease(lsn, LsnLease::DEFAULT_LENGTH, &ctx)
.map_err(|e| ApiError::InternalServerError(e.context("lsn lease http handler")))?;

json_response(StatusCode::OK, result)
Expand Down
3 changes: 2 additions & 1 deletion pageserver/src/page_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use futures::stream::FuturesUnordered;
use futures::Stream;
use futures::StreamExt;
use pageserver_api::key::Key;
use pageserver_api::models::LsnLease;
use pageserver_api::models::TenantState;
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
Expand Down Expand Up @@ -935,7 +936,7 @@ impl PageServerHandler {
let timeline = self
.get_active_tenant_timeline(tenant_shard_id.tenant_id, timeline_id, shard_selector)
.await?;
let lease = timeline.make_lsn_lease(lsn, ctx)?;
let lease = timeline.make_lsn_lease(lsn, LsnLease::DEFAULT_LENGTH, ctx)?;
let valid_until = lease
.valid_until
.duration_since(SystemTime::UNIX_EPOCH)
Expand Down
2 changes: 2 additions & 0 deletions pageserver/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ pub struct GcResult {
pub layers_needed_by_cutoff: u64,
pub layers_needed_by_pitr: u64,
pub layers_needed_by_branches: u64,
pub layers_needed_by_leases: u64,
pub layers_not_updated: u64,
pub layers_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files.

Expand Down Expand Up @@ -269,6 +270,7 @@ impl AddAssign for GcResult {
self.layers_needed_by_pitr += other.layers_needed_by_pitr;
self.layers_needed_by_cutoff += other.layers_needed_by_cutoff;
self.layers_needed_by_branches += other.layers_needed_by_branches;
self.layers_needed_by_leases += other.layers_needed_by_leases;
self.layers_not_updated += other.layers_not_updated;
self.layers_removed += other.layers_removed;

Expand Down
99 changes: 93 additions & 6 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
use remote_storage::TimeoutOrCancel;
use std::fmt;
use std::time::SystemTime;
use storage_broker::BrokerClientChannel;
use tokio::io::BufReader;
use tokio::sync::watch;
Expand Down Expand Up @@ -65,9 +66,9 @@ use self::timeline::uninit::TimelineCreateGuard;
use self::timeline::uninit::TimelineExclusionError;
use self::timeline::uninit::UninitializedTimeline;
use self::timeline::EvictionTaskTenantState;
use self::timeline::GcCutoffs;
use self::timeline::TimelineResources;
use self::timeline::WaitLsnError;
use self::timeline::{GcCutoffs, GcInfo};
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::DeletionQueueClient;
Expand Down Expand Up @@ -3010,12 +3011,13 @@ impl Tenant {
{
let mut target = timeline.gc_info.write().unwrap();

let now = SystemTime::now();
target.leases.retain(|_, lease| !lease.is_expired(&now));

match gc_cutoffs.remove(&timeline.timeline_id) {
Some(cutoffs) => {
*target = GcInfo {
retain_lsns: branchpoints,
cutoffs,
};
target.retain_lsns = branchpoints;
target.cutoffs = cutoffs;
}
None => {
// reasons for this being unavailable:
Expand Down Expand Up @@ -4050,7 +4052,7 @@ mod tests {
use itertools::Itertools;
use pageserver_api::key::{AUX_FILES_KEY, AUX_KEY_PREFIX, NON_INHERITED_RANGE};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings};
use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings, LsnLease};
use rand::{thread_rng, Rng};
use storage_layer::PersistentLayerKey;
use tests::storage_layer::ValuesReconstructState;
Expand Down Expand Up @@ -6939,4 +6941,89 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_lsn_lease() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_lsn_lease")?.load().await;
let key = Key::from_hex("010000000033333333444444445500000000").unwrap();

let end_lsn = Lsn(0x100);
let image_layers = (0x20..=0x90)
.step_by(0x10)
.map(|n| {
(
Lsn(n),
vec![(key, test_img(&format!("data key at {:x}", n)))],
)
})
.collect();

let timeline = tenant
.create_test_timeline_with_layers(
TIMELINE_ID,
Lsn(0x10),
DEFAULT_PG_VERSION,
&ctx,
Vec::new(),
image_layers,
end_lsn,
)
.await?;

let leased_lsns = [0x30, 0x50, 0x70];
let mut leases = Vec::new();
let _: anyhow::Result<_> = leased_lsns.iter().try_for_each(|n| {
leases.push(timeline.make_lsn_lease(Lsn(*n), LsnLease::DEFAULT_LENGTH, &ctx)?);
Ok(())
});

// Renewing with shorter lease should not change the lease.
let updated_lease_0 =
timeline.make_lsn_lease(Lsn(leased_lsns[0]), Duration::from_secs(0), &ctx)?;
assert_eq!(updated_lease_0.valid_until, leases[0].valid_until);

// Renewing with a long lease should renew lease with later expiration time.
let updated_lease_1 =
timeline.make_lsn_lease(Lsn(leased_lsns[1]), LsnLease::DEFAULT_LENGTH * 2, &ctx)?;

assert!(updated_lease_1.valid_until > leases[1].valid_until);

// Force set disk consistent lsn so we can get the cutoff at `end_lsn`.
info!(
"latest_gc_cutoff_lsn: {}",
*timeline.get_latest_gc_cutoff_lsn()
);
timeline.force_set_disk_consistent_lsn(end_lsn);

let res = tenant
.gc_iteration(
Some(TIMELINE_ID),
0,
Duration::ZERO,
&CancellationToken::new(),
&ctx,
)
.await?;

// Keeping everything <= Lsn(0x80) b/c leases:
// 0/10: initdb layer
// (0/20..=0/70).step_by(0x10): image layers added when creating the timeline.
assert_eq!(res.layers_needed_by_leases, 7);
// Keeping 0/90 b/c it is the latest layer.
assert_eq!(res.layers_not_updated, 1);
// Removed 0/80.
assert_eq!(res.layers_removed, 1);

// Make lease on a already GC-ed LSN.
// 0/80 does not have a valid lease + is below latest_gc_cutoff
assert!(Lsn(0x80) < *timeline.get_latest_gc_cutoff_lsn());
let res = timeline.make_lsn_lease(Lsn(0x80), LsnLease::DEFAULT_LENGTH, &ctx);
assert!(res.is_err());

// Should still be able to renew a currently valid lease
// Assumption: original lease to is still valid for 0/50.
let _ = timeline.make_lsn_lease(Lsn(leased_lsns[1]), LsnLease::DEFAULT_LENGTH, &ctx)?;

Ok(())
}
}
21 changes: 21 additions & 0 deletions pageserver/src/tenant/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::tenant::config::defaults::DEFAULT_COMPACTION_PERIOD;
use crate::tenant::throttle::Stats;
use crate::tenant::timeline::CompactionError;
use crate::tenant::{Tenant, TenantState};
use pageserver_api::models::LsnLease;
use rand::Rng;
use tokio_util::sync::CancellationToken;
use tracing::*;
Expand Down Expand Up @@ -346,6 +347,7 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
// cutoff specified as time.
let ctx =
RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);

let mut first = true;
loop {
tokio::select! {
Expand All @@ -362,6 +364,11 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {

if first {
first = false;

if delay_by_default_lease_length(&cancel).await.is_err() {
yliang412 marked this conversation as resolved.
Show resolved Hide resolved
break;
}

if random_init_delay(period, &cancel).await.is_err() {
break;
}
Expand Down Expand Up @@ -531,6 +538,20 @@ pub(crate) async fn random_init_delay(
}
}

/// Delays GC by defaul lease length at restart.
///
/// We do this as the leases mapping are not persisted to disk. By delaying GC by default
/// length, we gurantees that all the leases we granted before the restart will expire
/// when we run GC for the first time after the restart.
pub(crate) async fn delay_by_default_lease_length(
cancel: &CancellationToken,
) -> Result<(), Cancelled> {
match tokio::time::timeout(LsnLease::DEFAULT_LENGTH, cancel.cancelled()).await {
Ok(_) => Err(Cancelled),
Err(_) => Ok(()),
}
}

/// Attention: the `task` and `period` beocme labels of a pageserver-wide prometheus metric.
pub(crate) fn warn_when_period_overrun(
elapsed: Duration,
Expand Down
Loading
Loading