pageserver: implement new style attachment location config API
jcsp committed Sep 26, 2023
1 parent e8b585b commit 50bdc2e
Showing 5 changed files with 282 additions and 2 deletions.
46 changes: 46 additions & 0 deletions libs/pageserver_api/src/models.rs
@@ -10,6 +10,7 @@ use serde_with::{serde_as, DisplayFromStr};
use strum_macros;
use utils::{
completion,
generation::Generation,
history_buffer::HistoryBufferWithDropCounter,
id::{NodeId, TenantId, TimelineId},
lsn::Lsn,
@@ -218,6 +219,8 @@ impl std::ops::Deref for TenantCreateRequest {
}
}

/// An alternative representation of `pageserver::tenant::TenantConf` with
/// simpler types.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct TenantConfig {
pub checkpoint_distance: Option<u64>,
@@ -243,6 +246,39 @@ pub struct TenantConfig {
pub gc_feedback: Option<bool>,
}

/// A flattened analog of a `pageserver::tenant::LocationMode`, which
/// lists out all possible states (and the virtual "Detached" state)
/// in a flat form rather than using Rust-style enums.
#[derive(Serialize, Deserialize, Debug)]
pub enum LocationConfigMode {
AttachedSingle,
AttachedMulti,
AttachedStale,
Secondary,
Detached,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct LocationConfigSecondary {
pub warm: bool,
}

/// An alternative representation of `pageserver::tenant::LocationConf`,
/// for use in external-facing APIs.
#[derive(Serialize, Deserialize, Debug)]
pub struct LocationConfig {
pub mode: LocationConfigMode,
/// If attaching, in what generation?
#[serde(default)]
pub generation: Option<Generation>,
// If requesting mode `Secondary`, configuration for that.
#[serde(default)]
pub secondary_conf: Option<LocationConfigSecondary>,

// Custom storage configuration for the tenant, if any
pub tenant_conf: TenantConfig,
}

#[serde_as]
#[derive(Serialize, Deserialize)]
#[serde(transparent)]
@@ -253,6 +289,16 @@ pub struct StatusResponse {
pub id: NodeId,
}

#[serde_as]
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantLocationConfigRequest {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
#[serde(flatten)]
pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
}

#[serde_as]
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
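
For illustration, here is a minimal, self-contained sketch (not part of the diff) of how a location configuration serializes on the wire. The structs are simplified stand-ins for the models above, serde and serde_json are assumed as dependencies, and representing the generation as a bare integer is an assumption about how `utils::generation::Generation` appears in JSON.

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
enum LocationConfigMode {
    AttachedSingle,
    AttachedMulti,
    AttachedStale,
    Secondary,
    Detached,
}

#[derive(Serialize, Deserialize, Debug)]
struct LocationConfigSecondary {
    warm: bool,
}

#[derive(Serialize, Deserialize, Debug)]
struct LocationConfig {
    mode: LocationConfigMode,
    // Bare u32 is a stand-in for utils::generation::Generation.
    #[serde(default)]
    generation: Option<u32>,
    #[serde(default)]
    secondary_conf: Option<LocationConfigSecondary>,
}

fn main() -> Result<(), serde_json::Error> {
    let attached = LocationConfig {
        mode: LocationConfigMode::AttachedSingle,
        generation: Some(1),
        secondary_conf: None,
    };
    // Prints: {"mode":"AttachedSingle","generation":1,"secondary_conf":null}
    println!("{}", serde_json::to_string(&attached)?);
    Ok(())
}

The real `TenantLocationConfigRequest` flattens a full `LocationConfig` (including `tenant_conf`) alongside a `tenant_id` string field serialized via `DisplayFromStr`.
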
49 changes: 47 additions & 2 deletions pageserver/src/http/routes.rs
@@ -9,7 +9,8 @@ use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::{
DownloadRemoteLayersTaskSpawnRequest, TenantAttachRequest, TenantLoadRequest,
DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
TenantLoadRequest, TenantLocationConfigRequest,
};
use remote_storage::GenericRemoteStorage;
use tenant_size_model::{SizeResult, StorageModel};
@@ -27,7 +28,7 @@ use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::config::{LocationConf, TenantConfOpt};
use crate::tenant::mgr::{
GetTenantError, SetNewTenantConfigError, TenantMapInsertError, TenantStateError,
};
@@ -986,6 +987,47 @@ async fn update_tenant_config_handler(
json_response(StatusCode::OK, ())
}

async fn put_tenant_location_config_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let request_data: TenantLocationConfigRequest = json_request(&mut request).await?;
let tenant_id = request_data.tenant_id;
check_permission(&request, Some(tenant_id))?;

let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let state = get_state(&request);
let conf = state.conf;

// The `Detached` state is special: rather than upserting a tenant, it removes
// the tenant's local disk content and drops it from memory.
if let LocationConfigMode::Detached = request_data.config.mode {
mgr::detach_tenant(conf, tenant_id, true)
.instrument(info_span!("tenant_detach", %tenant_id))
.await?;
return json_response(StatusCode::OK, ());
}

let location_conf =
LocationConf::try_from(&request_data.config).map_err(ApiError::BadRequest)?;

mgr::upsert_location(
state.conf,
tenant_id,
location_conf,
state.broker_client.clone(),
state.remote_storage.clone(),
&ctx,
)
.await
// TODO: badrequest assumes the caller was asking for something unreasonable, but in
// principle we might have hit something like concurrent API calls to the same tenant,
// which is not a 400 but a 409.
.map_err(ApiError::BadRequest)?;

json_response(StatusCode::OK, ())
}

/// Testing helper to transition a tenant to [`crate::tenant::TenantState::Broken`].
async fn handle_tenant_break(
r: Request<Body>,
@@ -1406,6 +1448,9 @@ pub fn make_router(
.get("/v1/tenant/:tenant_id/config", |r| {
api_handler(r, get_tenant_config_handler)
})
.put("/v1/tenant/:tenant_id/location_config", |r| {
api_handler(r, put_tenant_location_config_handler)
})
.get("/v1/tenant/:tenant_id/timeline", |r| {
api_handler(r, timeline_list_handler)
})
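
As a usage illustration for the new route, the hedged sketch below drives the endpoint from a client; it is not part of the diff. It assumes reqwest (with the json feature), tokio, and serde_json as dependencies, and the listen address, tenant id, and bare-integer generation encoding are placeholders rather than values taken from the commit.

use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // Placeholder tenant id (hex string, as TenantId serializes via DisplayFromStr).
    let tenant_id = "3aa8fcc61f6d357410b7de754b1d9001";
    let body = json!({
        "tenant_id": tenant_id,
        "mode": "AttachedSingle",
        "generation": 1,
        "tenant_conf": {},
    });

    let status = reqwest::Client::new()
        .put(format!(
            "http://127.0.0.1:9898/v1/tenant/{tenant_id}/location_config"
        ))
        .json(&body)
        .send()
        .await?
        .status();

    // 200 OK on success; a body with "mode": "Detached" makes the handler
    // detach the tenant instead of upserting a location.
    println!("{status}");
    Ok(())
}
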
11 changes: 11 additions & 0 deletions pageserver/src/tenant.rs
@@ -2197,6 +2197,17 @@ impl Tenant {
}
}

pub(crate) fn set_new_location_config(&self, new_conf: LocationConf) {
*self.tenant_conf.write().unwrap() = new_conf;
// Don't hold self.timelines.lock() during the notifies.
// There's no risk of deadlock right now, but there could be if we consolidate
// mutexes in struct Timeline in the future.
let timelines = self.list_timelines();
for timeline in timelines {
timeline.tenant_conf_updated();
}
}

/// Helper function to create a new Timeline struct.
///
/// The returned Timeline is in Loading state. The caller is responsible for
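
The comment in `set_new_location_config` about not holding `self.timelines.lock()` while notifying refers to a common pattern, sketched below with illustrative stand-in types rather than the pageserver's own `Tenant`/`Timeline`: clone the `Arc`s out under the lock, drop the guard, then call into each timeline.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

struct Timeline;

impl Timeline {
    fn tenant_conf_updated(&self) {
        // In the real code this may take the timeline's own locks,
        // which is why the tenant-level lock must not be held here.
    }
}

struct Tenant {
    timelines: Mutex<HashMap<u64, Arc<Timeline>>>,
}

impl Tenant {
    fn notify_timelines(&self) {
        // Clone the Arcs while the lock is held...
        let timelines: Vec<Arc<Timeline>> =
            self.timelines.lock().unwrap().values().cloned().collect();
        // ...the guard is dropped at the end of the statement above, so the
        // notifications below run without holding the map lock.
        for timeline in timelines {
            timeline.tenant_conf_updated();
        }
    }
}

fn main() {
    let tenant = Tenant {
        timelines: Mutex::new(HashMap::from([(1u64, Arc::new(Timeline))])),
    };
    tenant.notify_timelines();
}
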
49 changes: 49 additions & 0 deletions pageserver/src/tenant/config.rs
@@ -196,6 +196,55 @@ impl LocationConf {
}
}

impl TryFrom<&'_ models::LocationConfig> for LocationConf {
type Error = anyhow::Error;

fn try_from(conf: &'_ models::LocationConfig) -> Result<Self, Self::Error> {
let tenant_conf = TenantConfOpt::try_from(&conf.tenant_conf)?;

fn get_generation(conf: &'_ models::LocationConfig) -> Result<Generation, anyhow::Error> {
conf.generation
.ok_or_else(|| anyhow::anyhow!("Generation must be set when attaching"))
}

let mode = match &conf.mode {
models::LocationConfigMode::AttachedMulti => {
LocationMode::Attached(AttachedLocationConfig {
generation: get_generation(conf)?,
attach_mode: AttachmentMode::Multi,
})
}
models::LocationConfigMode::AttachedSingle => {
LocationMode::Attached(AttachedLocationConfig {
generation: get_generation(conf)?,
attach_mode: AttachmentMode::Single,
})
}
models::LocationConfigMode::AttachedStale => {
LocationMode::Attached(AttachedLocationConfig {
generation: get_generation(conf)?,
attach_mode: AttachmentMode::Stale,
})
}
models::LocationConfigMode::Secondary => {
let warm = conf
.secondary_conf
.as_ref()
.map(|c| c.warm)
.unwrap_or(false);
LocationMode::Secondary(SecondaryLocationConfig { warm })
}
models::LocationConfigMode::Detached => {
// Should not have been called: API code should translate this mode
// into a detach rather than trying to decode it as a LocationConf
return Err(anyhow::anyhow!("Cannot decode a Detached configuration"));
}
};

Ok(Self { mode, tenant_conf })
}
}

impl Default for LocationConf {
// TODO: this should be removed once tenant loading can guarantee that we are never
// loading from a directory without a configuration.
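
One rule in the conversion above is worth spelling out: every attached mode requires a generation, `Secondary` defaults `warm` to false when no `secondary_conf` is given, and `Detached` is never converted because the HTTP layer turns it into a detach. A reduced sketch of that validation, with simplified stand-in types rather than the crate's own:

#[derive(Debug)]
enum ApiMode {
    AttachedSingle,
    AttachedMulti,
    AttachedStale,
    Secondary,
    Detached,
}

#[derive(Debug)]
enum InternalMode {
    // The single/multi/stale distinction is elided here for brevity.
    Attached { generation: u32 },
    Secondary { warm: bool },
}

fn convert(
    mode: ApiMode,
    generation: Option<u32>,
    warm: Option<bool>,
) -> Result<InternalMode, String> {
    match mode {
        // All attached flavours require a generation.
        ApiMode::AttachedSingle | ApiMode::AttachedMulti | ApiMode::AttachedStale => {
            let generation = generation.ok_or("Generation must be set when attaching")?;
            Ok(InternalMode::Attached { generation })
        }
        // Secondary ignores the generation and defaults warm to false.
        ApiMode::Secondary => Ok(InternalMode::Secondary {
            warm: warm.unwrap_or(false),
        }),
        // Detached is handled by the HTTP layer as a detach, never decoded here.
        ApiMode::Detached => Err("Cannot decode a Detached configuration".to_string()),
    }
}

fn main() {
    println!("{:?}", convert(ApiMode::AttachedSingle, Some(1), None));
    println!("{:?}", convert(ApiMode::Secondary, None, None));
    println!("{:?}", convert(ApiMode::Detached, None, None));
}
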
129 changes: 129 additions & 0 deletions pageserver/src/tenant/mgr.rs
@@ -584,6 +584,111 @@ pub async fn set_new_tenant_config(
Ok(())
}

#[instrument(skip_all, fields(tenant_id, new_location_config))]
pub(crate) async fn upsert_location(
conf: &'static PageServerConf,
tenant_id: TenantId,
new_location_config: LocationConf,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
ctx: &RequestContext,
) -> Result<(), anyhow::Error> {
info!("configuring tenant location {tenant_id} to state {new_location_config:?}");

let mut existing_tenant = match get_tenant(tenant_id, false).await {
Ok(t) => Some(t),
Err(GetTenantError::NotFound(_)) => None,
Err(e) => anyhow::bail!(e),
};

// If we need to shut down a Tenant, do that first
let shutdown_tenant = match (&new_location_config.mode, &existing_tenant) {
(LocationMode::Secondary(_), Some(t)) => Some(t),
(LocationMode::Attached(attach_conf), Some(t)) => {
if attach_conf.generation != t.generation {
Some(t)
} else {
None
}
}
_ => None,
};

// TODO: currently we risk concurrent operations interfering with the tenant
// while we await shutdown, but we also should not hold the TenantsMap lock
// across the whole operation. Before we start using this function in production,
// a follow-on change will revise how concurrency is handled in TenantsMap.

if let Some(tenant) = shutdown_tenant {
let (_guard, progress) = utils::completion::channel();
info!("Shutting down attached tenant");
match tenant.shutdown(progress, false).await {
Ok(()) => {}
Err(barrier) => {
info!("Shutdown already in progress, waiting for it to complete");
barrier.wait().await;
}
}
existing_tenant = None;
}

if let Some(tenant) = existing_tenant {
// Update the existing tenant
Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config)
.await
.map_err(SetNewTenantConfigError::Persist)?;
tenant.set_new_location_config(new_location_config);
} else {
// Upsert a fresh TenantSlot into TenantsMap. Do it within the map write lock,
// and re-check that the state of anything we are replacing is as expected.
tenant_map_upsert_slot(tenant_id, |old_value| async move {
if let Some(TenantSlot::Attached(t)) = old_value {
if !matches!(t.current_state(), TenantState::Stopping { .. }) {
anyhow::bail!("Tenant state changed during location configuration update");
}
}

let new_slot = match &new_location_config.mode {
LocationMode::Secondary(_) => TenantSlot::Secondary,
LocationMode::Attached(_attach_config) => {
// Do a schedule_local_tenant_processing
// FIXME: should avoid doing this disk I/O inside the TenantsMap lock,
// we have the same problem in load_tenant/attach_tenant. Probably
// need a lock in TenantSlot to fix this.
Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config)
.await
.map_err(SetNewTenantConfigError::Persist)?;
let tenant_path = conf.tenant_path(&tenant_id);
let resources = TenantSharedResources {
broker_client,
remote_storage,
};
let new_tenant = schedule_local_tenant_processing(
conf,
tenant_id,
&tenant_path,
new_location_config,
resources,
None,
&TENANTS,
ctx,
)
.with_context(|| {
format!("Failed to schedule tenant processing in path {tenant_path:?}")
})?;

TenantSlot::Attached(new_tenant)
}
};

Ok(new_slot)
})
.await?;
}

Ok(())
}

#[derive(Debug, thiserror::Error)]
pub enum GetTenantError {
#[error("Tenant {0} not found")]
@@ -889,6 +994,30 @@
}
}

async fn tenant_map_upsert_slot<'a, F, R>(
tenant_id: TenantId,
upsert_fn: F,
) -> Result<(), TenantMapInsertError>
where
F: FnOnce(Option<TenantSlot>) -> R,
R: std::future::Future<Output = anyhow::Result<TenantSlot>>,
{
let mut guard = TENANTS.write().await;
let m = match &mut *guard {
TenantsMap::Initializing => return Err(TenantMapInsertError::StillInitializing),
TenantsMap::ShuttingDown(_) => return Err(TenantMapInsertError::ShuttingDown),
TenantsMap::Open(m) => m,
};

match upsert_fn(m.remove(&tenant_id)).await {
Ok(upsert_val) => {
m.insert(tenant_id, upsert_val);
Ok(())
}
Err(e) => Err(TenantMapInsertError::Closure(e)),
}
}

/// Stops and removes the tenant from memory, if it's not [`TenantState::Stopping`] already, bails otherwise.
/// Allows to remove other tenant resources manually, via `tenant_cleanup`.
/// If the cleanup fails, tenant will stay in memory in [`TenantState::Broken`] state, and another removal
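
`tenant_map_upsert_slot` follows a small generic pattern: the caller passes an async closure that receives the existing slot (removed from the map, if any) and returns the slot to store back. The self-contained sketch below shows that shape with stand-in names, assuming tokio and anyhow as dependencies; it is not the pageserver's own code.

use std::collections::HashMap;

#[derive(Debug)]
enum Slot {
    Attached(String),
    Secondary,
}

async fn upsert_slot<F, R>(
    map: &mut HashMap<u64, Slot>,
    key: u64,
    upsert_fn: F,
) -> anyhow::Result<()>
where
    F: FnOnce(Option<Slot>) -> R,
    R: std::future::Future<Output = anyhow::Result<Slot>>,
{
    // Remove the old slot, let the closure build the new one, then store it.
    let old = map.remove(&key);
    let new = upsert_fn(old).await?;
    map.insert(key, new);
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut map = HashMap::new();
    map.insert(1u64, Slot::Secondary);
    upsert_slot(&mut map, 1, |old| async move {
        // old is Some(Slot::Secondary) here; replace it with an attached slot.
        println!("replacing {old:?}");
        Ok::<_, anyhow::Error>(Slot::Attached("example".to_string()))
    })
    .await?;
    println!("{map:?}");
    Ok(())
}
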
