storage controller: misc fixes (#7036)
## Problem

Collection of small changes, batched together to reduce CI overhead.

## Summary of changes

- Layer download messages include size -- this is useful when watching a
pageserver hydrate its on-disk cache in the logs.
- Controller migrate API could put an invalid NodeId into TenantState:
validate that the requested node exists (and warn if it is unavailable)
before migrating.
- Scheduling errors during tenant create could result in creating some
shards and not others: now all shards are created before any scheduling
error is surfaced (a standalone sketch of this pattern follows this list).
- Consistency check could give hard-to-understand failures in tests if a
reconcile was in progress: explicitly fail the check if reconciles are in
progress instead.
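
The scheduling-error change is visible in the `service.rs` diff below; as a standalone illustration, here is a minimal sketch of the same pattern, with `Scheduler`, `Shard`, and `ScheduleError` as hypothetical stand-ins rather than the controller's real types: record the first scheduling failure, keep creating the remaining shards, and only surface the error once every shard exists.

```rust
// Minimal sketch only: these types are simplified stand-ins, not the
// storage controller's real `Scheduler`/`TenantState` types.
struct Scheduler;

#[derive(Debug)]
struct Shard(u32);

#[derive(Debug)]
struct ScheduleError(String);

impl Scheduler {
    fn schedule(&mut self, shard: &Shard) -> Result<(), ScheduleError> {
        // Pretend shards above an arbitrary threshold cannot be placed.
        if shard.0 > 2 {
            Err(ScheduleError(format!("no node available for shard {}", shard.0)))
        } else {
            Ok(())
        }
    }
}

/// Create every shard before reporting any scheduling failure, so an error
/// never leaves only a subset of the tenant's shards created.
fn create_all(
    scheduler: &mut Scheduler,
    tenants: &mut Vec<Shard>,
    shard_ids: &[u32],
) -> Result<(), ScheduleError> {
    let mut schedule_error = None;

    for &id in shard_ids {
        let shard = Shard(id);
        if let Err(e) = scheduler.schedule(&shard) {
            // Remember the failure instead of returning early with `?`.
            schedule_error = Some(e);
        }
        tenants.push(shard);
    }

    // All shards now exist; surface any scheduling error to the caller.
    match schedule_error {
        Some(e) => Err(e),
        None => Ok(()),
    }
}

fn main() {
    let mut scheduler = Scheduler;
    let mut tenants = Vec::new();
    match create_all(&mut scheduler, &mut tenants, &[0, 1, 2, 3]) {
        Ok(()) => println!("created and scheduled {} shards", tenants.len()),
        Err(e) => println!("created {} shards, but scheduling failed: {e:?}", tenants.len()),
    }
}
```
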
jcsp committed Mar 6, 2024
1 parent 5dc2088 commit a9a4a76
Showing 2 changed files with 47 additions and 19 deletions.
64 changes: 46 additions & 18 deletions control_plane/attachment_service/src/service.rs
@@ -1159,9 +1159,12 @@ impl Service {
 
         let (waiters, response_shards) = {
             let mut locked = self.inner.write().unwrap();
-            let (_nodes, tenants, scheduler) = locked.parts_mut();
+            let result_tx = locked.result_tx.clone();
+            let compute_hook = locked.compute_hook.clone();
+            let (nodes, tenants, scheduler) = locked.parts_mut();
 
             let mut response_shards = Vec::new();
+            let mut schcedule_error = None;
 
             for tenant_shard_id in create_ids {
                 tracing::info!("Creating shard {tenant_shard_id}...");
@@ -1198,23 +1201,20 @@ impl Service {
                         continue;
                     }
                     Entry::Vacant(entry) => {
-                        let mut state = TenantState::new(
+                        let state = entry.insert(TenantState::new(
                             tenant_shard_id,
                             ShardIdentity::from_params(
                                 tenant_shard_id.shard_number,
                                 &create_req.shard_parameters,
                             ),
                             placement_policy.clone(),
-                        );
+                        ));
 
                         state.generation = initial_generation;
                         state.config = create_req.config.clone();
-
-                        state.schedule(scheduler).map_err(|e| {
-                            ApiError::Conflict(format!(
-                                "Failed to schedule shard {tenant_shard_id}: {e}"
-                            ))
-                        })?;
+                        if let Err(e) = state.schedule(scheduler) {
+                            schcedule_error = Some(e);
+                        }
 
                         // Only include shards in result if we are attaching: the purpose
                         // of the response is to tell the caller where the shards are attached.
@@ -1228,24 +1228,27 @@ impl Service {
                                 generation: generation.into().unwrap(),
                             });
                         }
-                        entry.insert(state)
                     }
                 };
             }
 
-            // Take a snapshot of pageservers
-            let pageservers = locked.nodes.clone();
-
-            let result_tx = locked.result_tx.clone();
-            let compute_hook = locked.compute_hook.clone();
+            // If we failed to schedule shards, then they are still created in the controller,
+            // but we return an error to the requester to avoid a silent failure when someone
+            // tries to e.g. create a tenant whose placement policy requires more nodes than
+            // are present in the system. We do this here rather than in the above loop, to
+            // avoid situations where we only create a subset of shards in the tenant.
+            if let Some(e) = schcedule_error {
+                return Err(ApiError::Conflict(format!(
+                    "Failed to schedule shard(s): {e}"
+                )));
+            }
 
-            let waiters = locked
-                .tenants
+            let waiters = tenants
                 .range_mut(TenantShardId::tenant_range(tenant_id))
                 .filter_map(|(_shard_id, shard)| {
                     shard.maybe_reconcile(
                         result_tx.clone(),
-                        &pageservers,
+                        nodes,
                         &compute_hook,
                         &self.config,
                         &self.persistence,
@@ -2516,6 +2519,19 @@ impl Service {
         let compute_hook = locked.compute_hook.clone();
         let (nodes, tenants, scheduler) = locked.parts_mut();
 
+        let Some(node) = nodes.get(&migrate_req.node_id) else {
+            return Err(ApiError::BadRequest(anyhow::anyhow!(
+                "Node {} not found",
+                migrate_req.node_id
+            )));
+        };
+
+        if node.availability != NodeAvailability::Active {
+            // Warn but proceed: the caller may intend to manually adjust the placement of
+            // a shard even if the node is down, e.g. if intervening during an incident.
+            tracing::warn!("Migrating to an unavailable node ({})", node.id);
+        }
+
         let Some(shard) = tenants.get_mut(&tenant_shard_id) else {
             return Err(ApiError::NotFound(
                 anyhow::anyhow!("Tenant shard not found").into(),
@@ -2645,6 +2661,18 @@ impl Service {
                 .map(|t| t.to_persistent())
                 .collect::<Vec<_>>();
 
+            // This method can only validate the state of an idle system: if a reconcile is in
+            // progress, fail out early to avoid giving false errors on state that won't match
+            // between database and memory until a ReconcileResult is processed.
+            for t in locked.tenants.values() {
+                if t.reconciler.is_some() {
+                    return Err(ApiError::InternalServerError(anyhow::anyhow!(
+                        "Shard {} reconciliation in progress",
+                        t.tenant_shard_id
+                    )));
+                }
+            }
+
             (expect_nodes, expect_shards)
         };
 
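For illustration only, here is a sketch of the validate-then-warn shape the migrate handler above now has. The `Node` type, its fields, and `validate_migration_target` are simplified assumptions for this example, not the controller's real API: an unknown node id is rejected outright, while a known-but-unavailable node only produces a warning.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum NodeAvailability {
    Active,
    Offline,
}

// Simplified stand-in for a pageserver node record.
struct Node {
    id: u64,
    availability: NodeAvailability,
}

/// Reject an unknown node id; merely warn (and proceed) if the node is known
/// but not currently available.
fn validate_migration_target(nodes: &HashMap<u64, Node>, node_id: u64) -> Result<&Node, String> {
    // Unknown node: refuse, since storing this id would leave the shard
    // pointing at a node the controller does not know about.
    let Some(node) = nodes.get(&node_id) else {
        return Err(format!("Node {node_id} not found"));
    };

    if node.availability != NodeAvailability::Active {
        // Warn but proceed: an operator may deliberately move a shard while
        // the target node is down, e.g. during an incident.
        eprintln!("warning: migrating to an unavailable node ({})", node.id);
    }

    Ok(node)
}

fn main() {
    let mut nodes = HashMap::new();
    nodes.insert(1, Node { id: 1, availability: NodeAvailability::Offline });

    assert!(validate_migration_target(&nodes, 2).is_err()); // unknown node: hard error
    assert!(validate_migration_target(&nodes, 1).is_ok()); // known but offline: warn only
}
```
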
2 changes: 1 addition & 1 deletion pageserver/src/tenant/storage_layer/layer.rs
@@ -976,7 +976,7 @@ impl LayerInner {
         }
 
         self.consecutive_failures.store(0, Ordering::Relaxed);
-        tracing::info!("on-demand download successful");
+        tracing::info!(size=%self.desc.file_size, "on-demand download successful");
 
         Ok(permit)
     }
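
On the logging change above: `size=%self.desc.file_size` attaches the file size as a structured field on the event, where the `%` sigil means "record with the `Display` implementation" (`?` would use `Debug`). A tiny standalone sketch of the same `tracing` field syntax, assuming the `tracing` and `tracing-subscriber` crates and a made-up size value:

```rust
fn main() {
    // Illustrative subscriber setup; any tracing subscriber would do.
    tracing_subscriber::fmt::init();

    let file_size: u64 = 23_456_789; // made-up value for the example

    // The field is emitted alongside the message, e.g.:
    //   INFO on-demand download successful size=23456789
    tracing::info!(size = %file_size, "on-demand download successful");
}
```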

1 comment on commit a9a4a76

@github-actions

2567 tests run: 2432 passed, 0 failed, 135 skipped (full report)


Code coverage* (full report)

  • functions: 28.8% (6992 of 24258 functions)
  • lines: 47.4% (43007 of 90711 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
a9a4a76 at 2024-03-06T17:33:34.787Z :recycle:
