Skip to content

Commit

Permalink
chore(bors): merge pull request #768
Browse files Browse the repository at this point in the history
768: fix(csi-controller/pv-watcher): race between creation and gc r=tiagolobocastro a=tiagolobocastro

Check for deletion candidates using the PVCs and, when such a candidate is found,
also ensure there is no corresponding PV.
This is required to avoid a race during creation, where the volume has already been
created but the PV has not yet.
Also, do this check twice, as otherwise we might find false positives when
a PVC is deleted before the PV is successfully created, leading to a confusing
message about GC.

Co-authored-by: Tiago Castro <tiagolobocastro@gmail.com>
  • Loading branch information
mayastor-bors and tiagolobocastro committed Mar 5, 2024
2 parents 8ed2dbb + c307925 commit 03dc8cc
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 27 deletions.
15 changes: 9 additions & 6 deletions control-plane/csi-driver/src/bin/controller/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,22 +293,25 @@ impl RestApiClient {
/// This operation is idempotent, so the caller does not see errors indicating
/// absence of the resource.
#[instrument(fields(volume.uuid = %volume_id), skip(self, volume_id))]
pub(crate) async fn delete_volume(&self, volume_id: &uuid::Uuid) -> Result<(), ApiClientError> {
Self::delete_idempotent(
pub(crate) async fn delete_volume(
&self,
volume_id: &uuid::Uuid,
) -> Result<bool, ApiClientError> {
let result = Self::delete_idempotent(
self.rest_client.volumes_api().del_volume(volume_id).await,
true,
)?;
debug!(volume.uuid=%volume_id, "Volume successfully deleted");
Ok(())
Ok(result)
}

/// Check HTTP status code, handle DELETE idempotency transparently.
pub(crate) fn delete_idempotent<T>(
result: Result<clients::tower::ResponseContent<T>, clients::tower::Error<RestJsonError>>,
idempotent: bool,
) -> Result<(), ApiClientError> {
) -> Result<bool, ApiClientError> {
match result {
Ok(_) => Ok(()),
Ok(_) => Ok(true),
Err(clients::tower::Error::Request(error)) => {
Err(clients::tower::Error::Request(error).into())
}
Expand All @@ -318,7 +321,7 @@ impl RestApiClient {
| StatusCode::NO_CONTENT
| StatusCode::PRECONDITION_FAILED => {
if idempotent {
Ok(())
Ok(false)
} else {
Err(clients::tower::Error::Response(response).into())
}
Expand Down
88 changes: 67 additions & 21 deletions control-plane/csi-driver/src/bin/controller/pvwatcher.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
use crate::client::ListToken;
use futures::TryStreamExt;
use k8s_openapi::api::core::v1::PersistentVolume;
use k8s_openapi::api::core::v1::{PersistentVolume, PersistentVolumeClaim};
use kube::{
api::Api,
api::{Api, ListParams},
runtime::{watcher, WatchStreamExt},
Client, ResourceExt,
};
use tracing::{debug, error, info};

/// Struct for PV Garbage collector
/// Struct for PV Garbage collector.
#[derive(Clone)]
pub(crate) struct PvGarbageCollector {
pub(crate) pv_handle: Api<PersistentVolume>,
pub(super) struct PvGarbageCollector {
pv_handle: Api<PersistentVolume>,
pvc_handle: Api<PersistentVolumeClaim>,
orphan_period: Option<humantime::Duration>,
rest_client: &'static crate::RestApiClient,
}
Expand All @@ -22,7 +23,8 @@ impl PvGarbageCollector {
pub(crate) async fn new(orphan_period: Option<humantime::Duration>) -> anyhow::Result<Self> {
let client = Client::try_default().await?;
Ok(Self {
pv_handle: Api::<PersistentVolume>::all(client),
pv_handle: Api::<PersistentVolume>::all(client.clone()),
pvc_handle: Api::<PersistentVolumeClaim>::all(client),
orphan_period,
rest_client: crate::RestApiClient::get_client(),
})
Expand Down Expand Up @@ -101,8 +103,39 @@ impl PvGarbageCollector {
/// 2. to tackle k8s bug where volumes are leaked when PV deletion is attempted before
/// PVC deletion.
async fn delete_orphan_volumes(&self) {
let max_entries = 200;
// Snapshot the volume ids first; with no volumes there is nothing to collect.
let volumes = self.collect_volume_ids().await;
if volumes.is_empty() {
return;
}
// Orphans are identified against the PVC ids, so give up on this round if
// they could not be collected.
let Some(pvcs) = self.collect_pvc_ids().await else {
return;
};

// First pass: shortlist the volumes which have neither a PVC nor a PV.
let mut gc_uids = Vec::with_capacity(volumes.len());
for volume_uid in volumes {
if self.is_vol_orphan(volume_uid, &pvcs).await {
gc_uids.push(volume_uid);
}
}
// A single check can race with provisioning (the volume exists but its PV
// does not yet), so wait and check the shortlist a second time before deleting.
// The PVC snapshot taken above is reused; the PV is looked up again on each
// `is_vol_orphan` call.
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
for volume_uid in gc_uids {
if self.is_vol_orphan(volume_uid, &pvcs).await {
self.delete_volume(volume_uid).await;
}
}
}

/// Tells whether the given volume has neither a PVC nor a PV.
///
/// The PVC side is a lookup of the volume's uuid, in string form, within `pvcs`.
/// The PV lookup is only issued when no matching entry is found in `pvcs`.
async fn is_vol_orphan(&self, volume: uuid::Uuid, pvcs: &[String]) -> bool {
    let volume_id = volume.to_string();
    if pvcs.iter().any(|pvc_id| *pvc_id == volume_id) {
        return false;
    }
    self.is_vol_pv_orphaned(&volume).await
}

async fn collect_volume_ids(&self) -> Vec<uuid::Uuid> {
let max_entries = 500i32;
let mut starting_token = Some(0);
let mut volume_ids = Vec::with_capacity(max_entries as usize);

while let Some(token) = starting_token {
match self
.rest_client
Expand All @@ -111,28 +144,40 @@ impl PvGarbageCollector {
{
Ok(volumes) => {
starting_token = volumes.next_token;
for vol in volumes.entries {
if self.is_vol_orphaned(&vol.spec.uuid).await {
self.delete_volume(vol.spec.uuid).await;
}
}
volume_ids.extend(volumes.entries.into_iter().map(|v| v.spec.uuid));
}
Err(error) => {
error!(?error, "Unable to list volumes");
return;
break;
}
}
}
volume_ids
}
async fn collect_pvc_ids(&self) -> Option<Vec<String>> {
let max_entries = 500i32;
let mut pvc_ids = Vec::with_capacity(max_entries as usize);
let mut params = ListParams::default().limit(max_entries as u32);

async fn is_vol_orphaned(&self, volume_uuid: &uuid::Uuid) -> bool {
let pv_name = format!("pvc-{volume_uuid}");
if let Ok(None) = self.pv_handle.get_opt(&pv_name).await {
debug!(pv.name = pv_name, "PV is a deletion candidate");
true
} else {
false
loop {
let list = self.pvc_handle.list_metadata(&params).await.ok()?;
let pvcs = list.items.into_iter().filter_map(|p| p.uid());
pvc_ids.extend(pvcs);
match list.metadata.continue_ {
Some(token) => {
params = params.continue_token(&token);
}
None => {
break;
}
}
}
Some(pvc_ids)
}

/// Tells whether no PV exists for the given volume.
///
/// Looks up the PV named `pvc-<volume_uuid>` and returns `true` only when the
/// lookup succeeds and finds nothing; a failed lookup yields `false`.
async fn is_vol_pv_orphaned(&self, volume_uuid: &uuid::Uuid) -> bool {
    let lookup = self
        .pv_handle
        .get_opt(&format!("pvc-{volume_uuid}"))
        .await;
    matches!(lookup, Ok(None))
}

/// Accepts volume id and calls Control plane api to delete the Volume.
Expand All @@ -144,7 +189,8 @@ impl PvGarbageCollector {
return;
};
match self.rest_client.delete_volume(&volume_uuid).await {
Ok(_) => info!("Successfully deleted the volume"),
Ok(true) => info!("Successfully deleted the volume"),
Ok(false) => debug!("The volume had already been deleted"),
Err(error) => error!(?error, "Failed to delete the volume"),
}
}
Expand Down

0 comments on commit 03dc8cc

Please sign in to comment.