Skip to content

Commit

Permalink
chore(bors): merge pull request #724
Browse files Browse the repository at this point in the history
724: Delete Orphaned Volumes on a timer r=tiagolobocastro a=tiagolobocastro

    fix(csi-controller/gc): list volumes with pagination
    
    Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>

---

    chore: correct typo in the platform code
    
    Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>

---

    refactor(csi-controller/gc): reduce excessive unwrap and nesting
    
    Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>

---

    feat(csi-controller/orphaned-vols): gc orphaned volumes on a timer
    
    Due to a k8s bug, when PV deletion is attempted before PVC deletion, we may not
    receive the delete request, thus leaking volumes which cannot be deleted
    through any user-facing API.
    
    Existing code to clean up orphaned volumes was already being used for the case
    when certain PV events might be missed.
    Therefore the current workaround (WA) is to delete the csi-controller pod.
    This change makes use of this existing logic but runs it periodically, by
    default every 5 minutes, which allows us to automatically work around the bug.
    
    Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>

Co-authored-by: Tiago Castro <tiagolobocastro@gmail.com>
  • Loading branch information
mayastor-bors and tiagolobocastro committed Jan 24, 2024
2 parents ce087f9 + e4a5f56 commit 3ea84a8
Show file tree
Hide file tree
Showing 9 changed files with 175 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ async fn find_shutdown_volumes(
// else the drain operation is timed out

// todo: this should be handled better..
if platform::current_plaform_type() == platform::PlatformType::Deployer {
if platform::current_platform_type() == platform::PlatformType::Deployer {
for vi in node_spec.node_draining_volumes().await {
let mut volume = context.specs().volume(&vi).await?;
let request = DestroyShutdownTargets::new(vi, None);
Expand Down
4 changes: 2 additions & 2 deletions control-plane/agents/src/bin/ha/node/path_provider.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use nvmeadm::nvmf_subsystem::{NvmeSubsystems, Subsystem};
use stor_port::platform::{current_plaform_type, PlatformType};
use stor_port::platform::{current_platform_type, PlatformType};
use utils::NVME_TARGET_NQN_PREFIX;

#[cfg(target_os = "linux")]
Expand Down Expand Up @@ -121,7 +121,7 @@ impl CachedNvmePathProvider {
}
#[cfg(target_os = "linux")]
fn udev_supported() -> bool {
match current_plaform_type() {
match current_platform_type() {
PlatformType::K8s => true,
PlatformType::None => true,
PlatformType::Deployer => false,
Expand Down
18 changes: 12 additions & 6 deletions control-plane/csi-driver/src/bin/controller/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,12 @@ impl IoEngineApiClient {
}
}

/// Token used to list volumes with pagination.
///
/// Callers may hold the pagination position either as the raw string received
/// over the wire or as an already-numeric offset; `list_volumes` accepts both.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum ListToken {
    /// Textual token. `list_volumes` treats an empty string as offset `0` and
    /// otherwise parses the contents as an `isize`.
    String(String),
    /// Numeric token, used by `list_volumes` as the starting offset as-is.
    Number(isize),
}

impl IoEngineApiClient {
/// List all nodes available in IoEngine cluster.
pub(crate) async fn list_nodes(&self) -> Result<Vec<Node>, ApiClientError> {
Expand Down Expand Up @@ -178,17 +184,17 @@ impl IoEngineApiClient {
pub(crate) async fn list_volumes(
&self,
max_entries: i32,
starting_token: String,
starting_token: ListToken,
) -> Result<Volumes, ApiClientError> {
let max_entries = max_entries as isize;
let starting_token = if starting_token.is_empty() {
0
} else {
starting_token.parse::<isize>().map_err(|_| {
let starting_token = match starting_token {
ListToken::String(starting_token) if starting_token.is_empty() => 0,
ListToken::String(starting_token) => starting_token.parse::<isize>().map_err(|_| {
ApiClientError::InvalidArgument(
"Failed to parse starting token as an isize".to_string(),
)
})?
})?,
ListToken::Number(starting_token) => starting_token,
};

let response = self
Expand Down
6 changes: 4 additions & 2 deletions control-plane/csi-driver/src/bin/controller/controller.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::{ApiClientError, CreateVolumeTopology, CsiControllerConfig, IoEngineApiClient};
use crate::{
client::ListToken, ApiClientError, CreateVolumeTopology, CsiControllerConfig, IoEngineApiClient,
};

use csi_driver::context::{CreateParams, PublishParams};
use rpc::csi::{volume_content_source::Type, Topology as CsiTopology, *};
Expand Down Expand Up @@ -652,7 +654,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
let vt_mapper = VolumeTopologyMapper::init().await?;

let volumes = IoEngineApiClient::get_client()
.list_volumes(max_entries, args.starting_token)
.list_volumes(max_entries, ListToken::String(args.starting_token))
.await
.map_err(|e| Status::internal(format!("Failed to list volumes, error = {e:?}")))?;

Expand Down
54 changes: 42 additions & 12 deletions control-plane/csi-driver/src/bin/controller/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,26 @@ const REST_TIMEOUT: &str = "30s";
fn initialize_controller(args: &ArgMatches) -> anyhow::Result<()> {
CsiControllerConfig::initialize(args)?;
IoEngineApiClient::initialize()
.map_err(|e| anyhow::anyhow!("Failed to initialize API client, error = {}", e))?;
.map_err(|error| anyhow::anyhow!("Failed to initialize API client, error = {error}"))?;
Ok(())
}

#[tracing::instrument]
async fn ping_rest_api() {
info!("Checking REST API endpoint accessibility ...");

match IoEngineApiClient::get_client().list_nodes().await {
Err(error) => tracing::error!(?error, "REST API endpoint is not accessible"),
Ok(nodes) => {
let names: Vec<String> = nodes.into_iter().map(|n| n.id).collect();
info!(
"REST API endpoint available, {len} IoEngine node(s) reported: {names:?}",
len = names.len(),
);
}
}
}

#[tokio::main(worker_threads = 2)]
async fn main() -> anyhow::Result<()> {
let args = clap::Command::new(utils::package_description!())
Expand All @@ -33,22 +49,22 @@ async fn main() -> anyhow::Result<()> {
.short('r')
.env("ENDPOINT")
.default_value("http://ksnode-1:30011")
.help("a URL endpoint to the control plane's rest endpoint"),
.help("A URL endpoint to the control plane's rest endpoint"),
)
.arg(
Arg::new("socket")
.long("csi-socket")
.short('c')
.env("CSI_SOCKET")
.default_value(CSI_SOCKET)
.help("CSI socket path"),
.help("The CSI socket path"),
)
.arg(
Arg::new("jaeger")
.short('j')
.long("jaeger")
.env("JAEGER_ENDPOINT")
.help("enable open telemetry and forward to jaeger"),
.help("Enable open telemetry and forward to jaeger"),
)
.arg(
Arg::new("timeout")
Expand Down Expand Up @@ -78,6 +94,15 @@ async fn main() -> anyhow::Result<()> {
"The number of worker threads that process requests"
),
)
.arg(
Arg::new("orphan-vol-gc-period")
.long("orphan-vol-gc-period")
.default_value("10m")
.help(
"How often to check and delete orphaned volumes. \n\
An orphan volume is a volume with no corresponding PV",
)
)
.get_matches();

utils::print_package_info!();
Expand All @@ -91,6 +116,13 @@ async fn main() -> anyhow::Result<()> {
tags,
args.get_one::<String>("jaeger").cloned(),
);
let orphan_period = args
.get_one::<String>("orphan-vol-gc-period")
.map(|p| p.parse::<humantime::Duration>())
.transpose()?;
let csi_socket = args
.get_one::<String>("socket")
.expect("CSI socket must be specified");

initialize_controller(&args)?;

Expand All @@ -99,18 +131,16 @@ async fn main() -> anyhow::Result<()> {
CsiControllerConfig::get_config().rest_endpoint()
);

// Try to detect REST API endpoint to debug the accessibility status.
ping_rest_api().await;

// Starts PV Garbage Collector if platform type is k8s
if stor_port::platform::current_plaform_type() == stor_port::platform::PlatformType::K8s {
let gc_instance = pvwatcher::PvGarbageCollector::new().await?;
if stor_port::platform::current_platform_type() == stor_port::platform::PlatformType::K8s {
let gc_instance = pvwatcher::PvGarbageCollector::new(orphan_period).await?;
tokio::spawn(async move { gc_instance.run_watcher().await });
}

let result = server::CsiServer::run(
args.get_one::<String>("socket")
.expect("CSI socket must be specified")
.clone(),
)
.await;
let result = server::CsiServer::run(csi_socket).await;
utils::tracing_telemetry::flush_traces();
result
}
Loading

0 comments on commit 3ea84a8

Please sign in to comment.