Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

control_plane/attachment_service: implement PlacementPolicy::Secondary, configuration updates #6521

Merged
merged 15 commits into from
Mar 1, 2024
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE tenant_shards ALTER generation SET NOT NULL;
ALTER TABLE tenant_shards ALTER generation_pageserver SET NOT NULL;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@


ALTER TABLE tenant_shards ALTER generation DROP NOT NULL;
ALTER TABLE tenant_shards ALTER generation_pageserver DROP NOT NULL;
34 changes: 27 additions & 7 deletions control_plane/attachment_service/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,15 @@ impl Persistence {
shard_number: ShardNumber(tsp.shard_number as u8),
shard_count: ShardCount::new(tsp.shard_count as u8),
};
result.insert(tenant_shard_id, Generation::new(tsp.generation as u32));

let Some(g) = tsp.generation else {
// If the generation_pageserver column was non-NULL, then the generation column should also be non-NULL:
// we only set generation_pageserver when setting generation.
return Err(DatabaseError::Logical(
"Generation should always be set after incrementing".to_string(),
));
};
result.insert(tenant_shard_id, Generation::new(g as u32));
}

Ok(result)
Expand Down Expand Up @@ -364,7 +372,16 @@ impl Persistence {
})
.await?;

Ok(Generation::new(updated.generation as u32))
let Some(g) = updated.generation else {
// If the generation column had been NULL, then we should have experienced an SQL Confilict error
// while executing a query that tries to increment it.
jcsp marked this conversation as resolved.
Show resolved Hide resolved
return Err(DatabaseError::Logical(
"Generation should always be set after incrementing".to_string(),
)
.into());
};

Ok(Generation::new(g as u32))
}

/// For use when updating a persistent property of a tenant, such as its config or placement_policy.
Expand All @@ -389,7 +406,7 @@ impl Persistence {
// Update includes generation column
query
.set((
generation.eq(input_generation.into().unwrap() as i32),
generation.eq(Some(input_generation.into().unwrap() as i32)),
placement_policy
.eq(serde_json::to_string(&input_placement_policy).unwrap()),
config.eq(serde_json::to_string(&input_config).unwrap()),
Expand Down Expand Up @@ -421,7 +438,7 @@ impl Persistence {
.filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
.filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
.set((
generation_pageserver.eq(i64::MAX),
generation_pageserver.eq(Option::<i64>::None),
placement_policy.eq(serde_json::to_string(&PlacementPolicy::Detached).unwrap()),
))
.execute(conn)?;
Expand Down Expand Up @@ -547,12 +564,15 @@ pub(crate) struct TenantShardPersistence {
pub(crate) shard_stripe_size: i32,

// Latest generation number: next time we attach, increment this
// and use the incremented number when attaching
pub(crate) generation: i32,
// and use the incremented number when attaching.
//
// Generation is only None when first onboarding a tenant, where it may
// be in PlacementPolicy::Secondary and therefore have no valid generation state.
pub(crate) generation: Option<i32>,

// Currently attached pageserver
#[serde(rename = "pageserver")]
pub(crate) generation_pageserver: i64,
pub(crate) generation_pageserver: Option<i64>,

#[serde(default)]
pub(crate) placement_policy: String,
Expand Down
33 changes: 21 additions & 12 deletions control_plane/attachment_service/src/reconciler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub(super) struct Reconciler {
/// of a tenant's state from when we spawned a reconcile task.
pub(super) tenant_shard_id: TenantShardId,
pub(crate) shard: ShardIdentity,
pub(crate) generation: Generation,
pub(crate) generation: Option<Generation>,
pub(crate) intent: TargetState,
pub(crate) config: TenantConfig,
pub(crate) observed: ObservedState,
Expand Down Expand Up @@ -312,7 +312,7 @@ impl Reconciler {
&self.shard,
&self.config,
LocationConfigMode::AttachedStale,
Some(self.generation),
self.generation,
None,
);
self.location_config(origin_ps_id, stale_conf, Some(Duration::from_secs(10)))
Expand All @@ -335,16 +335,17 @@ impl Reconciler {
}

// Increment generation before attaching to new pageserver
self.generation = self
.persistence
.increment_generation(self.tenant_shard_id, dest_ps_id)
.await?;
self.generation = Some(
self.persistence
.increment_generation(self.tenant_shard_id, dest_ps_id)
.await?,
);

let dest_conf = build_location_config(
&self.shard,
&self.config,
LocationConfigMode::AttachedMulti,
Some(self.generation),
self.generation,
None,
);

Expand Down Expand Up @@ -401,7 +402,7 @@ impl Reconciler {
&self.shard,
&self.config,
LocationConfigMode::AttachedSingle,
Some(self.generation),
self.generation,
None,
);
self.location_config(dest_ps_id, dest_final_conf.clone(), None)
Expand Down Expand Up @@ -433,8 +434,15 @@ impl Reconciler {

// If the attached pageserver is not attached, do so now.
if let Some(node_id) = self.intent.attached {
let mut wanted_conf =
attached_location_conf(self.generation, &self.shard, &self.config);
let Some(generation) = self.generation else {
// This should never happen: we do not enter an attached PlacementPolicy without
// initializing the generation for a tenant.
jcsp marked this conversation as resolved.
Show resolved Hide resolved
return Err(ReconcileError::Other(anyhow::anyhow!(
"Attempted to attach with NULL generation"
)));
};

let mut wanted_conf = attached_location_conf(generation, &self.shard, &self.config);
match self.observed.locations.get(&node_id) {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
// Nothing to do
Expand Down Expand Up @@ -474,11 +482,12 @@ impl Reconciler {
};

if increment_generation {
self.generation = self
let generation = self
.persistence
.increment_generation(self.tenant_shard_id, node_id)
.await?;
wanted_conf.generation = self.generation.into();
self.generation = Some(generation);
wanted_conf.generation = generation.into();
}
tracing::info!(%node_id, "Observed configuration requires update.");
self.location_config(node_id, wanted_conf, None).await?;
Expand Down
4 changes: 2 additions & 2 deletions control_plane/attachment_service/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ diesel::table! {
shard_number -> Int4,
shard_count -> Int4,
shard_stripe_size -> Int4,
generation -> Int4,
generation_pageserver -> Int8,
generation -> Nullable<Int4>,
generation_pageserver -> Nullable<Int8>,
placement_policy -> Varchar,
splitting -> Int2,
config -> Text,
Expand Down
Loading