diff --git a/control-plane/plugin/src/bin/rest-plugin/main.rs b/control-plane/plugin/src/bin/rest-plugin/main.rs index b71e5cfeb..f5cf80532 100644 --- a/control-plane/plugin/src/bin/rest-plugin/main.rs +++ b/control-plane/plugin/src/bin/rest-plugin/main.rs @@ -1,20 +1,12 @@ use clap::Parser; use openapi::tower::client::Url; -use plugin::{ - operations::{ - Cordoning, Drain, Get, GetBlockDevices, GetSnapshots, List, ListExt, Operations, - RebuildHistory, ReplicaTopology, Scale, - }, - resources::{ - blockdevice, cordon, drain, node, pool, snapshot, volume, CordonResources, DrainResources, - GetCordonArgs, GetDrainArgs, GetResources, ScaleResources, - }, - rest_wrapper::RestClient, -}; +use plugin::{operations::Operations, rest_wrapper::RestClient, ExecuteOperation}; +use snafu::ResultExt; use std::env; #[derive(clap::Parser, Debug)] #[clap(name = utils::package_description!(), version = utils::version_info_str!())] +#[group(skip)] struct CliArgs { /// The rest endpoint to connect to. #[clap(global = true, long, short, default_value = "http://localhost:8081")] @@ -22,120 +14,42 @@ struct CliArgs { /// The operation to be performed. #[clap(subcommand)] - operations: Operations, + operation: Operations, - /// The Output, viz yaml, json. - #[clap(global = true, default_value = plugin::resources::utils::OutputFormat::None.as_ref(), short, long)] - output: plugin::resources::utils::OutputFormat, - - /// Trace rest requests to the Jaeger endpoint agent. - #[clap(global = true, long, short)] - jaeger: Option, - - /// Timeout for the REST operations. - #[clap(long, short, default_value = "10s")] - timeout: humantime::Duration, -} -impl CliArgs { - fn args() -> Self { - CliArgs::parse() - } + #[clap(flatten)] + args: plugin::CliArgs, } #[tokio::main] async fn main() { - plugin::init_tracing(CliArgs::args().jaeger.as_ref()); + let cli_args = CliArgs::args(); + let _trace_flush = cli_args.args.init_tracing(); - execute(CliArgs::args()).await; + if let Err(error) = cli_args.execute().await { + eprintln!("{error}"); + std::process::exit(1); + } +} - utils::tracing_telemetry::flush_traces(); +#[derive(Debug, snafu::Snafu)] +enum Error { + #[snafu(display("Failed to initialise the REST client. Error {source}"))] + RestClient { source: anyhow::Error }, + #[snafu(display("{source}"))] + Resources { source: plugin::resources::Error }, } -async fn execute(cli_args: CliArgs) { - // Initialise the REST client. - if let Err(e) = RestClient::init(cli_args.rest.clone(), *cli_args.timeout) { - println!("Failed to initialise the REST client. Error {e}"); +impl CliArgs { + fn args() -> Self { + CliArgs::parse() + } + async fn execute(&self) -> Result<(), Error> { + // todo: client connection is lazy, we should do sanity connection test here. + // Example, we can use use rest liveness probe. + RestClient::init(self.rest.clone(), *self.args.timeout).context(RestClientSnafu)?; + self.operation + .execute(&self.args) + .await + .context(ResourcesSnafu) } - - // Perform the operations based on the subcommand, with proper output format. - let result = match &cli_args.operations { - Operations::Drain(resource) => match resource { - DrainResources::Node(drain_node_args) => { - node::Node::drain( - &drain_node_args.node_id(), - drain_node_args.label(), - drain_node_args.drain_timeout(), - &cli_args.output, - ) - .await - } - }, - Operations::Get(resource) => match resource { - GetResources::Cordon(get_cordon_resource) => match get_cordon_resource { - GetCordonArgs::Node { id: node_id } => { - cordon::NodeCordon::get(node_id, &cli_args.output).await - } - GetCordonArgs::Nodes => cordon::NodeCordons::list(&cli_args.output).await, - }, - GetResources::Drain(get_drain_resource) => match get_drain_resource { - GetDrainArgs::Node { id: node_id } => { - drain::NodeDrain::get(node_id, &cli_args.output).await - } - GetDrainArgs::Nodes => drain::NodeDrains::list(&cli_args.output).await, - }, - GetResources::Volumes(vol_args) => { - volume::Volumes::list(&cli_args.output, vol_args).await - } - GetResources::Volume { id } => volume::Volume::get(id, &cli_args.output).await, - GetResources::RebuildHistory { id } => { - volume::Volume::rebuild_history(id, &cli_args.output).await - } - GetResources::VolumeReplicaTopologies(vol_args) => { - volume::Volume::topologies(&cli_args.output, vol_args).await - } - GetResources::VolumeReplicaTopology { id } => { - volume::Volume::topology(id, &cli_args.output).await - } - GetResources::Pools => pool::Pools::list(&cli_args.output).await, - GetResources::Pool { id } => pool::Pool::get(id, &cli_args.output).await, - GetResources::Nodes => node::Nodes::list(&cli_args.output).await, - GetResources::Node(args) => node::Node::get(&args.node_id(), &cli_args.output).await, - GetResources::BlockDevices(bdargs) => { - blockdevice::BlockDevice::get_blockdevices( - &bdargs.node_id(), - &bdargs.all(), - &cli_args.output, - ) - .await - } - GetResources::VolumeSnapshots(snapargs) => { - snapshot::VolumeSnapshots::get_snapshots( - &snapargs.volume(), - &snapargs.snapshot(), - &cli_args.output, - ) - .await - } - }, - Operations::Scale(resource) => match resource { - ScaleResources::Volume { id, replica_count } => { - volume::Volume::scale(id, *replica_count, &cli_args.output).await - } - }, - Operations::Cordon(resource) => match resource { - CordonResources::Node { id, label } => { - node::Node::cordon(id, label, &cli_args.output).await - } - }, - Operations::Uncordon(resource) => match resource { - CordonResources::Node { id, label } => { - node::Node::uncordon(id, label, &cli_args.output).await - } - }, - }; - - if let Err(error) = result { - eprintln!("{error}"); - std::process::exit(1); - }; } diff --git a/control-plane/plugin/src/lib.rs b/control-plane/plugin/src/lib.rs index eb793b4da..efb1c1919 100644 --- a/control-plane/plugin/src/lib.rs +++ b/control-plane/plugin/src/lib.rs @@ -3,26 +3,186 @@ extern crate prettytable; #[macro_use] extern crate lazy_static; +use crate::{ + operations::{ + Cordoning, Drain, Get, GetBlockDevices, GetSnapshots, List, ListExt, Operations, + PluginResult, RebuildHistory, ReplicaTopology, Scale, + }, + resources::{ + blockdevice, cordon, drain, node, pool, snapshot, volume, CordonResources, DrainResources, + GetCordonArgs, GetDrainArgs, GetResources, ScaleResources, UnCordonResources, + }, +}; + pub mod operations; pub mod resources; pub mod rest_wrapper; -/// Initialize tracing (including opentelemetry). -pub fn init_tracing(jaeger: Option<&String>) { - let git_version = option_env!("GIT_VERSION").unwrap_or_else(utils::raw_version_str); - let tags = - utils::tracing_telemetry::default_tracing_tags(git_version, env!("CARGO_PKG_VERSION")); - - let fmt_layer = match std::env::var("RUST_LOG") { - Ok(_) => utils::tracing_telemetry::FmtLayer::Stderr, - Err(_) => utils::tracing_telemetry::FmtLayer::None, - }; - - utils::tracing_telemetry::init_tracing_ext( - env!("CARGO_PKG_NAME"), - tags, - jaeger, - fmt_layer, - None, - ); +/// Flush traces on `Drop`. +pub struct TracingFlusher {} +impl Drop for TracingFlusher { + fn drop(&mut self) { + utils::tracing_telemetry::flush_traces(); + } +} + +/// Every rest plugin operation must implement this trait to become composable. +#[async_trait::async_trait(?Send)] +pub trait ExecuteOperation { + async fn execute(&self, cli_args: &CliArgs) -> PluginResult; +} + +#[derive(clap::Parser, Debug)] +pub struct CliArgs { + /// The Output, viz yaml, json. + #[clap(global = true, default_value = resources::utils::OutputFormat::None.as_ref(), short, long)] + pub output: resources::utils::OutputFormat, + + /// Trace rest requests to the Jaeger endpoint agent. + #[clap(global = true, long, short)] + pub jaeger: Option, + + /// Timeout for the REST operations. + #[clap(long, short, default_value = "10s")] + pub timeout: humantime::Duration, +} + +impl CliArgs { + /// Initialize tracing (including opentelemetry). + pub fn init_tracing(&self) -> TracingFlusher { + let git_version = option_env!("GIT_VERSION").unwrap_or_else(utils::raw_version_str); + let tags = + utils::tracing_telemetry::default_tracing_tags(git_version, env!("CARGO_PKG_VERSION")); + + let fmt_layer = match std::env::var("RUST_LOG") { + Ok(_) => utils::tracing_telemetry::FmtLayer::Stderr, + Err(_) => utils::tracing_telemetry::FmtLayer::None, + }; + + utils::tracing_telemetry::init_tracing_ext( + env!("CARGO_PKG_NAME"), + tags, + self.jaeger.as_ref(), + fmt_layer, + None, + ); + + TracingFlusher {} + } +} + +#[async_trait::async_trait(?Send)] +impl ExecuteOperation for Operations { + async fn execute(&self, cli_args: &CliArgs) -> PluginResult { + match self { + Operations::Drain(resource) => resource.execute(cli_args).await, + Operations::Get(resource) => resource.execute(cli_args).await, + Operations::Scale(resource) => resource.execute(cli_args).await, + Operations::Cordon(resource) => resource.execute(cli_args).await, + Operations::Uncordon(resource) => resource.execute(cli_args).await, + } + } +} + +#[async_trait::async_trait(?Send)] +impl ExecuteOperation for DrainResources { + async fn execute(&self, cli_args: &CliArgs) -> PluginResult { + match self { + DrainResources::Node(drain_node_args) => { + node::Node::drain( + &drain_node_args.node_id(), + drain_node_args.label(), + drain_node_args.drain_timeout(), + &cli_args.output, + ) + .await + } + } + } +} + +#[async_trait::async_trait(?Send)] +impl ExecuteOperation for GetResources { + async fn execute(&self, cli_args: &CliArgs) -> PluginResult { + match self { + GetResources::Cordon(get_cordon_resource) => match get_cordon_resource { + GetCordonArgs::Node { id: node_id } => { + cordon::NodeCordon::get(node_id, &cli_args.output).await + } + GetCordonArgs::Nodes => cordon::NodeCordons::list(&cli_args.output).await, + }, + GetResources::Drain(get_drain_resource) => match get_drain_resource { + GetDrainArgs::Node { id: node_id } => { + drain::NodeDrain::get(node_id, &cli_args.output).await + } + GetDrainArgs::Nodes => drain::NodeDrains::list(&cli_args.output).await, + }, + GetResources::Volumes(vol_args) => { + volume::Volumes::list(&cli_args.output, vol_args).await + } + GetResources::Volume { id } => volume::Volume::get(id, &cli_args.output).await, + GetResources::RebuildHistory { id } => { + volume::Volume::rebuild_history(id, &cli_args.output).await + } + GetResources::VolumeReplicaTopologies(vol_args) => { + volume::Volume::topologies(&cli_args.output, vol_args).await + } + GetResources::VolumeReplicaTopology { id } => { + volume::Volume::topology(id, &cli_args.output).await + } + GetResources::Pools => pool::Pools::list(&cli_args.output).await, + GetResources::Pool { id } => pool::Pool::get(id, &cli_args.output).await, + GetResources::Nodes => node::Nodes::list(&cli_args.output).await, + GetResources::Node(args) => node::Node::get(&args.node_id(), &cli_args.output).await, + GetResources::BlockDevices(bdargs) => { + blockdevice::BlockDevice::get_blockdevices( + &bdargs.node_id(), + &bdargs.all(), + &cli_args.output, + ) + .await + } + GetResources::VolumeSnapshots(snapargs) => { + snapshot::VolumeSnapshots::get_snapshots( + &snapargs.volume(), + &snapargs.snapshot(), + &cli_args.output, + ) + .await + } + } + } +} + +#[async_trait::async_trait(?Send)] +impl ExecuteOperation for ScaleResources { + async fn execute(&self, cli_args: &CliArgs) -> PluginResult { + match self { + ScaleResources::Volume { id, replica_count } => { + volume::Volume::scale(id, *replica_count, &cli_args.output).await + } + } + } +} + +#[async_trait::async_trait(?Send)] +impl ExecuteOperation for CordonResources { + async fn execute(&self, cli_args: &CliArgs) -> PluginResult { + match self { + CordonResources::Node { id, label } => { + node::Node::cordon(id, label, &cli_args.output).await + } + } + } +} + +#[async_trait::async_trait(?Send)] +impl ExecuteOperation for UnCordonResources { + async fn execute(&self, cli_args: &CliArgs) -> PluginResult { + match self { + UnCordonResources::Node { id, label } => { + node::Node::uncordon(id, label, &cli_args.output).await + } + } + } } diff --git a/control-plane/plugin/src/operations.rs b/control-plane/plugin/src/operations.rs index 7ef61b0cd..6efda4c9c 100644 --- a/control-plane/plugin/src/operations.rs +++ b/control-plane/plugin/src/operations.rs @@ -1,5 +1,6 @@ use crate::resources::{ error::Error, utils, CordonResources, DrainResources, GetResources, ScaleResources, + UnCordonResources, }; use async_trait::async_trait; @@ -23,7 +24,7 @@ pub enum Operations { Cordon(CordonResources), /// 'Uncordon' resources. #[clap(subcommand)] - Uncordon(CordonResources), + Uncordon(UnCordonResources), } /// Drain trait. diff --git a/control-plane/plugin/src/resources/error.rs b/control-plane/plugin/src/resources/error.rs index 6add2fc5b..fd60ee084 100644 --- a/control-plane/plugin/src/resources/error.rs +++ b/control-plane/plugin/src/resources/error.rs @@ -1,12 +1,12 @@ use snafu::Snafu; -/// All errors returned when plugin command fails +/// All errors returned when resources command fails. #[derive(Debug, Snafu)] #[snafu(visibility(pub))] #[allow(clippy::enum_variant_names)] pub enum Error { /// Error when listing block devices fails. - #[snafu(display("Failed to list blockdevices for node {id} . Error {source}"))] + #[snafu(display("Failed to list blockdevices for node {id}. Error {source}"))] GetBlockDevicesError { id: String, source: openapi::tower::client::Error, diff --git a/control-plane/plugin/src/resources/mod.rs b/control-plane/plugin/src/resources/mod.rs index f811de54a..e91a74b26 100644 --- a/control-plane/plugin/src/resources/mod.rs +++ b/control-plane/plugin/src/resources/mod.rs @@ -15,6 +15,7 @@ pub mod snapshot; pub mod utils; pub mod volume; +pub use error::Error; pub type VolumeId = openapi::apis::Uuid; pub type SnapshotId = openapi::apis::Uuid; pub type ReplicaCount = u8; @@ -75,6 +76,14 @@ pub enum CordonResources { Node { id: NodeId, label: String }, } +/// The types of resources that support uncordoning. +#[derive(clap::Subcommand, Debug)] +pub enum UnCordonResources { + /// Removes the cordon label from the node. + /// When the node has no more cordon labels, it is effectively uncordoned. + Node { id: NodeId, label: String }, +} + /// The types of resources that support the 'get cordon' operation. #[derive(clap::Subcommand, Debug)] pub enum GetCordonArgs { @@ -101,6 +110,6 @@ pub enum GetDrainArgs { Nodes, } -/// Tabular Output Tests +/// Tabular Output Tests. #[cfg(test)] mod tests;