Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete Orphaned Volumes on a timer #724

Merged
merged 4 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ async fn find_shutdown_volumes(
// else the drain operation is timed out

// todo: this should be handled better..
if platform::current_plaform_type() == platform::PlatformType::Deployer {
if platform::current_platform_type() == platform::PlatformType::Deployer {
for vi in node_spec.node_draining_volumes().await {
let mut volume = context.specs().volume(&vi).await?;
let request = DestroyShutdownTargets::new(vi, None);
Expand Down
4 changes: 2 additions & 2 deletions control-plane/agents/src/bin/ha/node/path_provider.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use nvmeadm::nvmf_subsystem::{NvmeSubsystems, Subsystem};
use stor_port::platform::{current_plaform_type, PlatformType};
use stor_port::platform::{current_platform_type, PlatformType};
use utils::NVME_TARGET_NQN_PREFIX;

#[cfg(target_os = "linux")]
Expand Down Expand Up @@ -121,7 +121,7 @@ impl CachedNvmePathProvider {
}
#[cfg(target_os = "linux")]
fn udev_supported() -> bool {
match current_plaform_type() {
match current_platform_type() {
PlatformType::K8s => true,
PlatformType::None => true,
PlatformType::Deployer => false,
Expand Down
18 changes: 12 additions & 6 deletions control-plane/csi-driver/src/bin/controller/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,12 @@ impl IoEngineApiClient {
}
}

/// Token used to list volumes with pagination.
///
/// It identifies the position at which a paginated volume listing should start.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum ListToken {
    /// Token in its textual form.
    ///
    /// `list_volumes` treats an empty string as the start of the listing (`0`) and
    /// otherwise parses the string as an `isize`, failing with an invalid-argument
    /// error when it is not a valid number.
    String(String),
    /// Token already in numeric form; `list_volumes` uses it as-is.
    Number(isize),
}

impl IoEngineApiClient {
/// List all nodes available in IoEngine cluster.
pub(crate) async fn list_nodes(&self) -> Result<Vec<Node>, ApiClientError> {
Expand Down Expand Up @@ -178,17 +184,17 @@ impl IoEngineApiClient {
pub(crate) async fn list_volumes(
&self,
max_entries: i32,
starting_token: String,
starting_token: ListToken,
) -> Result<Volumes, ApiClientError> {
let max_entries = max_entries as isize;
let starting_token = if starting_token.is_empty() {
0
} else {
starting_token.parse::<isize>().map_err(|_| {
let starting_token = match starting_token {
ListToken::String(starting_token) if starting_token.is_empty() => 0,
ListToken::String(starting_token) => starting_token.parse::<isize>().map_err(|_| {
ApiClientError::InvalidArgument(
"Failed to parse starting token as an isize".to_string(),
)
})?
})?,
ListToken::Number(starting_token) => starting_token,
};

let response = self
Expand Down
6 changes: 4 additions & 2 deletions control-plane/csi-driver/src/bin/controller/controller.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::{ApiClientError, CreateVolumeTopology, CsiControllerConfig, IoEngineApiClient};
use crate::{
client::ListToken, ApiClientError, CreateVolumeTopology, CsiControllerConfig, IoEngineApiClient,
};

use csi_driver::context::{CreateParams, PublishParams};
use rpc::csi::{volume_content_source::Type, Topology as CsiTopology, *};
Expand Down Expand Up @@ -652,7 +654,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
let vt_mapper = VolumeTopologyMapper::init().await?;

let volumes = IoEngineApiClient::get_client()
.list_volumes(max_entries, args.starting_token)
.list_volumes(max_entries, ListToken::String(args.starting_token))
.await
.map_err(|e| Status::internal(format!("Failed to list volumes, error = {e:?}")))?;

Expand Down
54 changes: 42 additions & 12 deletions control-plane/csi-driver/src/bin/controller/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,26 @@ const REST_TIMEOUT: &str = "30s";
fn initialize_controller(args: &ArgMatches) -> anyhow::Result<()> {
CsiControllerConfig::initialize(args)?;
IoEngineApiClient::initialize()
.map_err(|e| anyhow::anyhow!("Failed to initialize API client, error = {}", e))?;
.map_err(|error| anyhow::anyhow!("Failed to initialize API client, error = {error}"))?;
Ok(())
}

#[tracing::instrument]
async fn ping_rest_api() {
info!("Checking REST API endpoint accessibility ...");

match IoEngineApiClient::get_client().list_nodes().await {
Err(error) => tracing::error!(?error, "REST API endpoint is not accessible"),
Ok(nodes) => {
let names: Vec<String> = nodes.into_iter().map(|n| n.id).collect();
info!(
"REST API endpoint available, {len} IoEngine node(s) reported: {names:?}",
len = names.len(),
);
}
}
}

#[tokio::main(worker_threads = 2)]
async fn main() -> anyhow::Result<()> {
let args = clap::Command::new(utils::package_description!())
Expand All @@ -33,22 +49,22 @@ async fn main() -> anyhow::Result<()> {
.short('r')
.env("ENDPOINT")
.default_value("http://ksnode-1:30011")
.help("a URL endpoint to the control plane's rest endpoint"),
.help("A URL endpoint to the control plane's rest endpoint"),
)
.arg(
Arg::new("socket")
.long("csi-socket")
.short('c')
.env("CSI_SOCKET")
.default_value(CSI_SOCKET)
.help("CSI socket path"),
.help("The CSI socket path"),
)
.arg(
Arg::new("jaeger")
.short('j')
.long("jaeger")
.env("JAEGER_ENDPOINT")
.help("enable open telemetry and forward to jaeger"),
.help("Enable open telemetry and forward to jaeger"),
)
.arg(
Arg::new("timeout")
Expand Down Expand Up @@ -78,6 +94,15 @@ async fn main() -> anyhow::Result<()> {
"The number of worker threads that process requests"
),
)
.arg(
Arg::new("orphan-vol-gc-period")
.long("orphan-vol-gc-period")
.default_value("10m")
.help(
"How often to check and delete orphaned volumes. \n\
An orphan volume is a volume with no corresponding PV",
)
)
.get_matches();

utils::print_package_info!();
Expand All @@ -91,6 +116,13 @@ async fn main() -> anyhow::Result<()> {
tags,
args.get_one::<String>("jaeger").cloned(),
);
let orphan_period = args
.get_one::<String>("orphan-vol-gc-period")
.map(|p| p.parse::<humantime::Duration>())
.transpose()?;
let csi_socket = args
.get_one::<String>("socket")
.expect("CSI socket must be specified");

initialize_controller(&args)?;

Expand All @@ -99,18 +131,16 @@ async fn main() -> anyhow::Result<()> {
CsiControllerConfig::get_config().rest_endpoint()
);

// Try to detect REST API endpoint to debug the accessibility status.
ping_rest_api().await;

// Starts PV Garbage Collector if platform type is k8s
if stor_port::platform::current_plaform_type() == stor_port::platform::PlatformType::K8s {
let gc_instance = pvwatcher::PvGarbageCollector::new().await?;
if stor_port::platform::current_platform_type() == stor_port::platform::PlatformType::K8s {
let gc_instance = pvwatcher::PvGarbageCollector::new(orphan_period).await?;
tokio::spawn(async move { gc_instance.run_watcher().await });
}

let result = server::CsiServer::run(
args.get_one::<String>("socket")
.expect("CSI socket must be specified")
.clone(),
)
.await;
let result = server::CsiServer::run(csi_socket).await;
utils::tracing_telemetry::flush_traces();
result
}
Loading
Loading