storcon_cli: add 'drain' command (#8007)
## Problem
We need the ability to prepare a subset of storage-controller-managed
pageservers for decommissioning. The storage controller cannot currently
express this in terms of scheduling constraints (it's a pretty special
case, so I'm not sure it even should).

## Summary of Changes
A new `drain` command is added to `storcon_cli`. It takes a set of nodes
to drain and migrates primary attachments outside of said set. Simple
round-robin assignment is used, under the assumption that nodes outside
of the draining set are evenly balanced.

Note that secondary locations are not migrated. This is fine for
staging, but the migration API will have to be extended for prod in
order to allow migration of secondaries as well.

I've tested this out against a `neon_local` cluster. The immediate use for
this command will be to migrate staging to ARM (AArch64) pageservers.
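
For illustration, a dry run against two nodes might look like this (the
`--api` connection flag is assumed from `storcon_cli`'s existing options;
`--nodes` is passed once per node id, and the node ids here are made up):

    storcon_cli --api http://127.0.0.1:1234 drain --nodes 1 --nodes 2 --dry-run true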

Related neondatabase/cloud#14029
VladLazar committed Jun 11, 2024
1 parent 126bcc3 commit 7121db3
Showing 3 changed files with 210 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions control_plane/storcon_cli/Cargo.toml
@@ -9,6 +9,7 @@ license.workspace = true
anyhow.workspace = true
clap.workspace = true
comfy-table.workspace = true
futures.workspace = true
humantime.workspace = true
hyper.workspace = true
pageserver_api.workspace = true
208 changes: 208 additions & 0 deletions control_plane/storcon_cli/src/main.rs
@@ -1,3 +1,4 @@
use futures::StreamExt;
use std::{collections::HashMap, str::FromStr, time::Duration};

use clap::{Parser, Subcommand};
@@ -148,6 +149,22 @@ enum Command {
#[arg(long)]
threshold: humantime::Duration,
},
/// Drain a set of specified pageservers by moving the primary attachments to pageservers
/// outside of the specified set.
Drain {
/// Set of pageserver node ids to drain.
#[arg(long)]
nodes: Vec<NodeId>,
/// Optional: migration concurrency (default is 8)
#[arg(long)]
concurrency: Option<usize>,
/// Optional: maximum number of shards to migrate
#[arg(long)]
max_shards: Option<usize>,
/// Optional: when set to true, nothing is migrated, but the plan is printed to stdout
#[arg(long)]
dry_run: Option<bool>,
},
}

#[derive(Parser)]
@@ -737,6 +754,197 @@ async fn main() -> anyhow::Result<()> {
})
.await?;
}
Command::Drain {
nodes,
concurrency,
max_shards,
dry_run,
} => {
// Load the list of nodes, split them up into the drained and filled sets,
// and validate that draining is possible.
let node_descs = storcon_client
.dispatch::<(), Vec<NodeDescribeResponse>>(
Method::GET,
"control/v1/node".to_string(),
None,
)
.await?;

let mut node_to_drain_descs = Vec::new();
let mut node_to_fill_descs = Vec::new();

for desc in node_descs {
let to_drain = nodes.iter().any(|id| *id == desc.id);
if to_drain {
node_to_drain_descs.push(desc);
} else {
node_to_fill_descs.push(desc);
}
}

if nodes.len() != node_to_drain_descs.len() {
anyhow::bail!("Drain requested for node which doesn't exist.")
}

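// A drain needs at least one node outside the drain set that is available
// and schedulable to receive the migrated attachments.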
let can_fill = node_to_fill_descs.iter().any(|desc| {
matches!(desc.availability, NodeAvailabilityWrapper::Active)
&& matches!(
desc.scheduling,
NodeSchedulingPolicy::Active | NodeSchedulingPolicy::Filling
)
});

if !can_fill {
anyhow::bail!("There are no nodes to drain to")
}

// Set the node scheduling policy to draining for the nodes which
// we plan to drain.
for node_desc in node_to_drain_descs.iter() {
let req = NodeConfigureRequest {
node_id: node_desc.id,
availability: None,
scheduling: Some(NodeSchedulingPolicy::Draining),
};

storcon_client
.dispatch::<_, ()>(
Method::PUT,
format!("control/v1/node/{}/config", node_desc.id),
Some(req),
)
.await?;
}

// Perform the drain: move each tenant shard scheduled on a node to
// be drained to a node which is being filled. A simple round robin
// strategy is used to pick the new node.
let tenants = storcon_client
.dispatch::<(), Vec<TenantDescribeResponse>>(
Method::GET,
"control/v1/tenant".to_string(),
None,
)
.await?;

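// Cursor for round-robin selection of a fill node; assumes nodes outside
// the drain set are roughly evenly balanced.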
let mut selected_node_idx = 0;

struct DrainMove {
tenant_shard_id: TenantShardId,
from: NodeId,
to: NodeId,
}

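// Collect the full plan up front: the dry-run path prints it, and the
// live path executes it while reporting progress against a known total.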
let mut moves: Vec<DrainMove> = Vec::new();

let shards = tenants
.into_iter()
.flat_map(|tenant| tenant.shards.into_iter());
for shard in shards {
if let Some(max_shards) = max_shards {
if moves.len() >= max_shards {
println!(
"Stop planning shard moves since the requested maximum was reached"
);
break;
}
}

let should_migrate = {
if let Some(attached_to) = shard.node_attached {
node_to_drain_descs
.iter()
.map(|desc| desc.id)
.any(|id| id == attached_to)
} else {
false
}
};

if !should_migrate {
continue;
}

moves.push(DrainMove {
tenant_shard_id: shard.tenant_shard_id,
from: shard
.node_attached
.expect("We only migrate attached tenant shards"),
to: node_to_fill_descs[selected_node_idx].id,
});
selected_node_idx = (selected_node_idx + 1) % node_to_fill_descs.len();
}

let total_moves = moves.len();

if dry_run == Some(true) {
println!("Dry run requested. Planned {total_moves} moves:");
for mv in &moves {
println!("{}: {} -> {}", mv.tenant_shard_id, mv.from, mv.to)
}

return Ok(());
}

const DEFAULT_MIGRATE_CONCURRENCY: usize = 8;
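// Dispatch the planned migrations with bounded concurrency: `buffered`
// keeps up to `concurrency` requests in flight and yields results in
// input order.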
let mut stream = futures::stream::iter(moves)
.map(|mv| {
let client = Client::new(cli.api.clone(), cli.jwt.clone());
async move {
client
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
Method::PUT,
format!("control/v1/tenant/{}/migrate", mv.tenant_shard_id),
Some(TenantShardMigrateRequest {
tenant_shard_id: mv.tenant_shard_id,
node_id: mv.to,
}),
)
.await
.map_err(|e| (mv.tenant_shard_id, mv.from, mv.to, e))
}
})
.buffered(concurrency.unwrap_or(DEFAULT_MIGRATE_CONCURRENCY));

let mut success = 0;
let mut failure = 0;

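// Drain the stream of migration results, tallying outcomes and printing
// progress every 20 completed shards.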
while let Some(res) = stream.next().await {
match res {
Ok(_) => {
success += 1;
}
Err((tenant_shard_id, from, to, error)) => {
failure += 1;
println!(
"Failed to migrate {} from node {} to node {}: {}",
tenant_shard_id, from, to, error
);
}
}

if (success + failure) % 20 == 0 {
println!(
"Processed {}/{} shards: {} succeeded, {} failed",
success + failure,
total_moves,
success,
failure
);
}
}

println!(
"Processed {}/{} shards: {} succeeded, {} failed",
success + failure,
total_moves,
success,
failure
);
}
}

Ok(())

1 comment on commit 7121db3

@github-actions

3286 tests run: 3131 passed, 0 failed, 155 skipped (full report)


Flaky tests (1)

Postgres 16

Code coverage* (full report)

  • functions: 31.6% (6631 of 20990 functions)
  • lines: 48.6% (51493 of 106045 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
7121db3 at 2024-06-11T17:59:33.945Z :recycle:
