storcon: track number of attached shards for each node (#8011)
## Problem
The storage controller does not track the number of shards attached to a
given pageserver. This is a requirement for various scheduling
operations (e.g. draining and filling will use this to figure out if the
cluster is balanced).

## Summary of Changes
Track the number of shards attached to each node.

Related #7387
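
As a rough illustration of how these per-node attached shard counts could feed a drain/fill balance check, here is a minimal, self-contained sketch. The `is_balanced` helper, its tolerance parameter, and the standalone `NodeId` type are assumptions for illustration only and are not part of this change.

```rust
use std::collections::HashMap;

// Hypothetical NodeId stand-in; the real type lives in the pageserver API crates.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct NodeId(u64);

/// Returns true if the spread between the most and least loaded nodes
/// (by attached shard count) is within `tolerance`.
fn is_balanced(attached_counts: &HashMap<NodeId, usize>, tolerance: usize) -> bool {
    let max = attached_counts.values().copied().max().unwrap_or(0);
    let min = attached_counts.values().copied().min().unwrap_or(0);
    max - min <= tolerance
}

fn main() {
    let mut counts = HashMap::new();
    counts.insert(NodeId(1), 4);
    counts.insert(NodeId(2), 0);
    // A draining/filling pass would keep moving attachments until this holds.
    assert!(!is_balanced(&counts, 1));

    counts.insert(NodeId(1), 2);
    counts.insert(NodeId(2), 2);
    assert!(is_balanced(&counts, 1));
}
```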
VladLazar committed Jun 11, 2024
1 parent 4c21007 commit 126bcc3
Showing 3 changed files with 114 additions and 45 deletions.
101 changes: 75 additions & 26 deletions storage_controller/src/scheduler.rs
@@ -29,6 +29,8 @@ pub enum MaySchedule {
struct SchedulerNode {
/// How many shards are currently scheduled on this node, via their [`crate::tenant_shard::IntentState`].
shard_count: usize,
/// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`].
attached_shard_count: usize,

/// Whether this node is currently eligible to have new shards scheduled (this is derived
/// from a node's availability state and scheduling policy).
@@ -42,7 +44,9 @@ impl PartialEq for SchedulerNode {
(MaySchedule::Yes(_), MaySchedule::Yes(_)) | (MaySchedule::No, MaySchedule::No)
);

may_schedule_matches && self.shard_count == other.shard_count
may_schedule_matches
&& self.shard_count == other.shard_count
&& self.attached_shard_count == other.attached_shard_count
}
}

@@ -138,6 +142,15 @@ impl ScheduleContext {
}
}

pub(crate) enum RefCountUpdate {
PromoteSecondary,
Attach,
Detach,
DemoteAttached,
AddSecondary,
RemoveSecondary,
}

impl Scheduler {
pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
let mut scheduler_nodes = HashMap::new();
@@ -146,6 +159,7 @@ impl Scheduler {
node.get_id(),
SchedulerNode {
shard_count: 0,
attached_shard_count: 0,
may_schedule: node.may_schedule(),
},
);
@@ -171,6 +185,7 @@ impl Scheduler {
node.get_id(),
SchedulerNode {
shard_count: 0,
attached_shard_count: 0,
may_schedule: node.may_schedule(),
},
);
@@ -179,7 +194,10 @@ impl Scheduler {
for shard in shards {
if let Some(node_id) = shard.intent.get_attached() {
match expect_nodes.get_mut(node_id) {
Some(node) => node.shard_count += 1,
Some(node) => {
node.shard_count += 1;
node.attached_shard_count += 1;
}
None => anyhow::bail!(
"Tenant {} references nonexistent node {}",
shard.tenant_shard_id,
@@ -227,31 +245,42 @@ impl Scheduler {
Ok(())
}

/// Increment the reference count of a node. This reference count is used to guide scheduling
/// decisions, not for memory management: it represents one tenant shard whose IntentState targets
/// this node.
/// Update the reference counts of a node. These reference counts are used to guide scheduling
/// decisions, not for memory management: they represent the number of tenant shards whose IntentState
/// targets this node and the number of tenant shards whose IntentState is attached to this
/// node.
///
/// It is an error to call this for a node that is not known to the scheduler (i.e. passed into
/// [`Self::new`] or [`Self::node_upsert`])
pub(crate) fn node_inc_ref(&mut self, node_id: NodeId) {
let Some(node) = self.nodes.get_mut(&node_id) else {
tracing::error!("Scheduler missing node {node_id}");
debug_assert!(false);
return;
};

node.shard_count += 1;
}

/// Decrement a node's reference count. Inverse of [`Self::node_inc_ref`].
pub(crate) fn node_dec_ref(&mut self, node_id: NodeId) {
pub(crate) fn update_node_ref_counts(&mut self, node_id: NodeId, update: RefCountUpdate) {
let Some(node) = self.nodes.get_mut(&node_id) else {
debug_assert!(false);
tracing::error!("Scheduler missing node {node_id}");
return;
};

node.shard_count -= 1;
match update {
RefCountUpdate::PromoteSecondary => {
node.attached_shard_count += 1;
}
RefCountUpdate::Attach => {
node.shard_count += 1;
node.attached_shard_count += 1;
}
RefCountUpdate::Detach => {
node.shard_count -= 1;
node.attached_shard_count -= 1;
}
RefCountUpdate::DemoteAttached => {
node.attached_shard_count -= 1;
}
RefCountUpdate::AddSecondary => {
node.shard_count += 1;
}
RefCountUpdate::RemoveSecondary => {
node.shard_count -= 1;
}
}
}

pub(crate) fn node_upsert(&mut self, node: &Node) {
@@ -263,6 +292,7 @@ impl Scheduler {
Vacant(entry) => {
entry.insert(SchedulerNode {
shard_count: 0,
attached_shard_count: 0,
may_schedule: node.may_schedule(),
});
}
@@ -385,6 +415,11 @@ impl Scheduler {
pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
self.nodes.get(&node_id).unwrap().shard_count
}

#[cfg(test)]
pub(crate) fn get_node_attached_shard_count(&self, node_id: NodeId) -> usize {
self.nodes.get(&node_id).unwrap().attached_shard_count
}
}

#[cfg(test)]
@@ -437,18 +472,28 @@ mod tests {
let scheduled = scheduler.schedule_shard(&[], &context)?;
t2_intent.set_attached(&mut scheduler, Some(scheduled));

assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 1);
assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 1);
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);

assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);

let scheduled = scheduler.schedule_shard(&t1_intent.all_pageservers(), &context)?;
t1_intent.push_secondary(&mut scheduler, scheduled);

assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 1);
assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 2);
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);

assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);

t1_intent.clear(&mut scheduler);
assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 0);
assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 1);
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);

let total_attached = scheduler.get_node_attached_shard_count(NodeId(1))
+ scheduler.get_node_attached_shard_count(NodeId(2));
assert_eq!(total_attached, 1);

if cfg!(debug_assertions) {
// Dropping an IntentState without clearing it causes a panic in debug mode,
@@ -459,8 +504,12 @@ mod tests {
assert!(result.is_err());
} else {
t2_intent.clear(&mut scheduler);
assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 0);
assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 0);

assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 0);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 0);

assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 0);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 0);
}

Ok(())
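For reference, the counter semantics introduced above can be summarised in a small self-contained model: promotion and demotion move only `attached_shard_count`, attach and detach move both counters, and adding or removing a secondary moves only `shard_count`. This is a simplified sketch mirroring the `RefCountUpdate` match in the diff, not the controller code itself.

```rust
// Simplified per-node counters, modelled after SchedulerNode in the diff above.
#[derive(Default, Debug, PartialEq)]
struct Counts {
    shard_count: usize,          // attached + secondary locations on the node
    attached_shard_count: usize, // attached locations only
}

enum RefCountUpdate {
    PromoteSecondary,
    Attach,
    Detach,
    DemoteAttached,
    AddSecondary,
    RemoveSecondary,
}

fn apply(c: &mut Counts, update: RefCountUpdate) {
    use RefCountUpdate::*;
    match update {
        // Promotion/demotion only flips the attached flag of a location that is
        // already counted on the node, so the total shard_count is unchanged.
        PromoteSecondary => c.attached_shard_count += 1,
        DemoteAttached => c.attached_shard_count -= 1,
        // Attach/Detach add or remove an attached location: both counters move.
        Attach => {
            c.shard_count += 1;
            c.attached_shard_count += 1;
        }
        Detach => {
            c.shard_count -= 1;
            c.attached_shard_count -= 1;
        }
        // Secondary locations only affect the total.
        AddSecondary => c.shard_count += 1,
        RemoveSecondary => c.shard_count -= 1,
    }
}

fn main() {
    let mut node = Counts::default();
    apply(&mut node, RefCountUpdate::AddSecondary);     // shard_count 1, attached 0
    apply(&mut node, RefCountUpdate::PromoteSecondary); // shard_count 1, attached 1
    assert_eq!(node, Counts { shard_count: 1, attached_shard_count: 1 });
    apply(&mut node, RefCountUpdate::DemoteAttached);   // back to a plain secondary
    assert_eq!(node, Counts { shard_count: 1, attached_shard_count: 0 });
}
```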
2 changes: 1 addition & 1 deletion storage_controller/src/service.rs
@@ -4312,7 +4312,7 @@ impl Service {
continue;
}

if tenant_shard.intent.demote_attached(node_id) {
if tenant_shard.intent.demote_attached(scheduler, node_id) {
tenant_shard.sequence = tenant_shard.sequence.next();

// TODO: populate a ScheduleContext including all shards in the same tenant_id (only matters
56 changes: 38 additions & 18 deletions storage_controller/src/tenant_shard.rs
@@ -8,7 +8,7 @@ use crate::{
metrics::{self, ReconcileCompleteLabelGroup, ReconcileOutcome},
persistence::TenantShardPersistence,
reconciler::ReconcileUnits,
scheduler::{AffinityScore, MaySchedule, ScheduleContext},
scheduler::{AffinityScore, MaySchedule, RefCountUpdate, ScheduleContext},
};
use pageserver_api::controller_api::{PlacementPolicy, ShardSchedulingPolicy};
use pageserver_api::{
@@ -153,7 +153,7 @@ impl IntentState {
}
pub(crate) fn single(scheduler: &mut Scheduler, node_id: Option<NodeId>) -> Self {
if let Some(node_id) = node_id {
scheduler.node_inc_ref(node_id);
scheduler.update_node_ref_counts(node_id, RefCountUpdate::Attach);
}
Self {
attached: node_id,
Expand All @@ -164,10 +164,10 @@ impl IntentState {
pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
if self.attached != new_attached {
if let Some(old_attached) = self.attached.take() {
scheduler.node_dec_ref(old_attached);
scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
}
if let Some(new_attached) = &new_attached {
scheduler.node_inc_ref(*new_attached);
scheduler.update_node_ref_counts(*new_attached, RefCountUpdate::Attach);
}
self.attached = new_attached;
}
@@ -177,50 +177,55 @@ impl IntentState {
/// secondary to attached while maintaining the scheduler's reference counts.
pub(crate) fn promote_attached(
&mut self,
_scheduler: &mut Scheduler,
scheduler: &mut Scheduler,
promote_secondary: NodeId,
) {
// If we call this with a node that isn't in secondary, it would cause incorrect
// scheduler reference counting, since we assume the node is already referenced as a secondary.
debug_assert!(self.secondary.contains(&promote_secondary));

// TODO: when scheduler starts tracking attached + secondary counts separately, we will
// need to call into it here.
self.secondary.retain(|n| n != &promote_secondary);

let demoted = self.attached;
self.attached = Some(promote_secondary);

scheduler.update_node_ref_counts(promote_secondary, RefCountUpdate::PromoteSecondary);
if let Some(demoted) = demoted {
scheduler.update_node_ref_counts(demoted, RefCountUpdate::DemoteAttached);
}
}

pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
debug_assert!(!self.secondary.contains(&new_secondary));
scheduler.node_inc_ref(new_secondary);
scheduler.update_node_ref_counts(new_secondary, RefCountUpdate::AddSecondary);
self.secondary.push(new_secondary);
}

/// It is legal to call this with a node that is not currently a secondary: that is a no-op
pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
let index = self.secondary.iter().position(|n| *n == node_id);
if let Some(index) = index {
scheduler.node_dec_ref(node_id);
scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
self.secondary.remove(index);
}
}

pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
for secondary in self.secondary.drain(..) {
scheduler.node_dec_ref(secondary);
scheduler.update_node_ref_counts(secondary, RefCountUpdate::RemoveSecondary);
}
}

/// Remove the last secondary node from the list of secondaries
pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
if let Some(node_id) = self.secondary.pop() {
scheduler.node_dec_ref(node_id);
scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
}
}

pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
if let Some(old_attached) = self.attached.take() {
scheduler.node_dec_ref(old_attached);
scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
}

self.clear_secondary(scheduler);
@@ -251,12 +256,11 @@ impl IntentState {
/// forget the location on the offline node.
///
/// Returns true if a change was made
pub(crate) fn demote_attached(&mut self, node_id: NodeId) -> bool {
pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool {
if self.attached == Some(node_id) {
// TODO: when scheduler starts tracking attached + secondary counts separately, we will
// need to call into it here.
self.attached = None;
self.secondary.push(node_id);
scheduler.update_node_ref_counts(node_id, RefCountUpdate::DemoteAttached);
true
} else {
false
@@ -593,7 +597,7 @@ impl TenantShard {
Secondary => {
if let Some(node_id) = self.intent.get_attached() {
// Populate secondary by demoting the attached node
self.intent.demote_attached(*node_id);
self.intent.demote_attached(scheduler, *node_id);
modified = true;
} else if self.intent.secondary.is_empty() {
// Populate secondary by scheduling a fresh node
@@ -783,7 +787,7 @@ impl TenantShard {
old_attached_node_id,
new_attached_node_id,
}) => {
self.intent.demote_attached(old_attached_node_id);
self.intent.demote_attached(scheduler, old_attached_node_id);
self.intent
.promote_attached(scheduler, new_attached_node_id);
}
@@ -1321,7 +1325,9 @@ pub(crate) mod tests {
assert_ne!(attached_node_id, secondary_node_id);

// Notifying the attached node is offline should demote it to a secondary
let changed = tenant_shard.intent.demote_attached(attached_node_id);
let changed = tenant_shard
.intent
.demote_attached(&mut scheduler, attached_node_id);
assert!(changed);
assert!(tenant_shard.intent.attached.is_none());
assert_eq!(tenant_shard.intent.secondary.len(), 2);
@@ -1604,7 +1610,14 @@ pub(crate) mod tests {

// We should see equal number of locations on the two nodes.
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 4);
// Scheduling does not consider the number of attachments when picking the
// initial pageserver to attach to (hence the assertion that all primaries are
// on the same node).
// TODO: Tweak the scheduling to evenly distribute attachments for new shards.
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 4);

assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 4);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 0);

// Add another two nodes: we should see the shards spread out when their optimize
// methods are called
@@ -1613,9 +1626,16 @@ pub(crate) mod tests {
optimize_til_idle(&nodes, &mut scheduler, &mut shards);

assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);

assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);

assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 2);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 1);

assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 2);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 1);

for shard in shards.iter_mut() {
shard.intent.clear(&mut scheduler);
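Putting the pieces together: migrating an attachment between two nodes is expressed as `demote_attached` on the old node followed by `promote_attached` on the new one (see the `ReplaceAttached` handling above). In counter terms that leaves every node's `shard_count` unchanged and moves exactly one unit of `attached_shard_count`. Below is a minimal sketch of that invariant, with a hypothetical `migrate_attachment` helper standing in for the IntentState methods.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for illustration only.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct NodeId(u64);

#[derive(Debug, PartialEq)]
struct Counts {
    shard_count: usize,
    attached_shard_count: usize,
}

fn migrate_attachment(nodes: &mut HashMap<NodeId, Counts>, from: NodeId, to: NodeId) {
    // DemoteAttached on the old location: it stays around as a secondary,
    // so only the attached counter drops.
    let old = nodes.get_mut(&from).expect("old node is known to the scheduler");
    old.attached_shard_count -= 1;

    // PromoteSecondary on the new location: it was already counted as a
    // secondary, so only the attached counter rises.
    let new = nodes.get_mut(&to).expect("new node is known to the scheduler");
    new.attached_shard_count += 1;
}

fn main() {
    let mut nodes = HashMap::new();
    nodes.insert(NodeId(1), Counts { shard_count: 1, attached_shard_count: 1 });
    nodes.insert(NodeId(2), Counts { shard_count: 1, attached_shard_count: 0 });

    migrate_attachment(&mut nodes, NodeId(1), NodeId(2));

    // shard_count is stable on both nodes; the attachment moved from node 1 to node 2.
    assert_eq!(nodes[&NodeId(1)], Counts { shard_count: 1, attached_shard_count: 0 });
    assert_eq!(nodes[&NodeId(2)], Counts { shard_count: 1, attached_shard_count: 1 });
}
```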

1 comment on commit 126bcc3

@github-actions

3286 tests run: 3131 passed, 0 failed, 155 skipped (full report)


Code coverage* (full report)

  • functions: 31.6% (6631 of 20979 functions)
  • lines: 48.7% (51507 of 105853 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
126bcc3 at 2024-06-11T16:30:37.383Z :recycle:
