Skip to content

Commit

Permalink
Try #730:
Browse files Browse the repository at this point in the history
  • Loading branch information
mayastor-bors committed Feb 8, 2024
2 parents 4e1e3ac + 66d2d41 commit 18ce35c
Show file tree
Hide file tree
Showing 14 changed files with 491 additions and 166 deletions.
12 changes: 6 additions & 6 deletions control-plane/csi-driver/src/bin/controller/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::CsiControllerConfig;
use std::collections::HashMap;
use stor_port::types::v0::openapi::{
clients,
clients::tower::StatusCode,
Expand All @@ -12,6 +11,7 @@ use stor_port::types::v0::openapi::{

use anyhow::{anyhow, Result};
use once_cell::sync::OnceCell;
use std::collections::HashMap;
use tracing::{debug, info, instrument};

#[derive(Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -93,18 +93,18 @@ impl From<clients::tower::Error<RestJsonError>> for ApiClientError {
}
}

static REST_CLIENT: OnceCell<IoEngineApiClient> = OnceCell::new();
static REST_CLIENT: OnceCell<RestApiClient> = OnceCell::new();

/// Single instance API client for accessing REST API gateway.
/// Encapsulates communication with REST API by exposing a set of
/// high-level API functions, which perform (de)serialization
/// of API request/response objects.
#[derive(Debug)]
pub struct IoEngineApiClient {
pub struct RestApiClient {
rest_client: clients::tower::ApiClient,
}

impl IoEngineApiClient {
impl RestApiClient {
/// Initialize API client instance. Must be called prior to
/// obtaining the client instance.
pub(crate) fn initialize() -> Result<()> {
Expand Down Expand Up @@ -143,7 +143,7 @@ impl IoEngineApiClient {

/// Obtain client instance. Panics if called before the client
/// has been initialized.
pub(crate) fn get_client() -> &'static IoEngineApiClient {
pub(crate) fn get_client() -> &'static RestApiClient {
REST_CLIENT.get().expect("Rest client is not initialized")
}
}
Expand All @@ -154,7 +154,7 @@ pub(crate) enum ListToken {
Number(isize),
}

impl IoEngineApiClient {
impl RestApiClient {
/// List all nodes available in IoEngine cluster.
pub(crate) async fn list_nodes(&self) -> Result<Vec<Node>, ApiClientError> {
let response = self.rest_client.nodes_api().get_nodes(None).await?;
Expand Down
114 changes: 54 additions & 60 deletions control-plane/csi-driver/src/bin/controller/controller.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::{
client::ListToken, ApiClientError, CreateVolumeTopology, CsiControllerConfig, IoEngineApiClient,
client::ListToken, ApiClientError, CreateVolumeTopology, CsiControllerConfig, RestApiClient,
};

use csi_driver::context::{CreateParams, PublishParams};
use rpc::csi::{volume_content_source::Type, Topology as CsiTopology, *};
use stor_port::types::v0::openapi::{
Expand Down Expand Up @@ -289,7 +288,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
let mut volume_context = args.parameters.clone();

// First check if the volume already exists.
match IoEngineApiClient::get_client()
match RestApiClient::get_client()
.get_volume_for_create(&parsed_vol_uuid)
.await
{
Expand Down Expand Up @@ -318,7 +317,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {

let volume = match volume_content_source {
Some(snapshot_uuid) => {
IoEngineApiClient::get_client()
RestApiClient::get_client()
.create_snapshot_volume(
&parsed_vol_uuid,
&snapshot_uuid,
Expand All @@ -331,7 +330,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
.await?
}
None => {
IoEngineApiClient::get_client()
RestApiClient::get_client()
.create_volume(
&parsed_vol_uuid,
replica_count,
Expand Down Expand Up @@ -388,7 +387,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
Status::invalid_argument(format!("Malformed volume UUID: {}", args.volume_id))
})?;
let _guard = csi_driver::limiter::VolumeOpGuard::new(volume_uuid)?;
IoEngineApiClient::get_client()
RestApiClient::get_client()
.delete_volume(&volume_uuid)
.await
.map_err(|e| {
Expand Down Expand Up @@ -437,9 +436,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
}

// Check if the volume is already published.
let volume = IoEngineApiClient::get_client()
.get_volume(&volume_id)
.await?;
let volume = RestApiClient::get_client().get_volume(&volume_id).await?;

let params = PublishParams::try_from(&args.volume_context)?;

Expand Down Expand Up @@ -479,52 +476,52 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
return Err(Status::internal(m));
}
},
_ => {
_ => {

// Check for node being cordoned.
fn cordon_check(spec: Option<&NodeSpec>) -> bool {
if let Some(spec) = spec {
return spec.cordondrainstate.is_some()
// Check for node being cordoned.
fn cordon_check(spec: Option<&NodeSpec>) -> bool {
if let Some(spec) = spec {
return spec.cordondrainstate.is_some()
}
false
}
false
}

// if the csi-node happens to be a data-plane node, use that for nexus creation, otherwise
// let the control-plane select the target node.
let target_node = match IoEngineApiClient::get_client().get_node(&node_id).await {
Err(ApiClientError::ResourceNotExists(_)) => Ok(None),
Err(error) => Err(error),
// When nodes are not online for any reason (eg: io-engine no longer runs) on said node,
// then let the control-plane decide where to place the target. Node should not be cordoned.
Ok(node) if node.state.as_ref().map(|n| n.status).unwrap_or(NodeStatus::Unknown) != NodeStatus::Online || cordon_check(node.spec.as_ref()) => {
Ok(None)
},
// For 1-replica volumes, don't pre-select the target node. This will allow the
// control-plane to pin the target to the replica node.
Ok(_) if volume.spec.num_replicas == 1 => Ok(None),
Ok(_) => Ok(Some(node_id.as_str())),
}?;

// Volume is not published.
let v = IoEngineApiClient::get_client()
.publish_volume(&volume_id, target_node, protocol, args.node_id.clone(), &publish_context)
.await?;

if let Some((node, uri)) = get_volume_share_location(&v) {
debug!(
// if the csi-node happens to be a data-plane node, use that for nexus creation, otherwise
// let the control-plane select the target node.
let target_node = match RestApiClient::get_client().get_node(&node_id).await {
Err(ApiClientError::ResourceNotExists(_)) => Ok(None),
Err(error) => Err(error),
// When nodes are not online for any reason (eg: io-engine no longer runs) on said node,
// then let the control-plane decide where to place the target. Node should not be cordoned.
Ok(node) if node.state.as_ref().map(|n| n.status).unwrap_or(NodeStatus::Unknown) != NodeStatus::Online || cordon_check(node.spec.as_ref()) => {
Ok(None)
},
// For 1-replica volumes, don't pre-select the target node. This will allow the
// control-plane to pin the target to the replica node.
Ok(_) if volume.spec.num_replicas == 1 => Ok(None),
Ok(_) => Ok(Some(node_id.as_str())),
}?;

// Volume is not published.
let v = RestApiClient::get_client()
.publish_volume(&volume_id, target_node, protocol, args.node_id.clone(), &publish_context)
.await?;

if let Some((node, uri)) = get_volume_share_location(&v) {
debug!(
"Volume {} successfully published on node {} via {}",
volume_id, node, uri
);
uri
} else {
let m = format!(
"Volume {volume_id} has been successfully published but URI is not available"
);
error!("{}", m);
return Err(Status::internal(m));
uri
} else {
let m = format!(
"Volume {volume_id} has been successfully published but URI is not available"
);
error!("{}", m);
return Err(Status::internal(m));
}
}
}
};
};

publish_context.insert("uri".to_string(), uri);

Expand All @@ -550,10 +547,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
})?;
let _guard = csi_driver::limiter::VolumeOpGuard::new(volume_uuid)?;
// Check if target volume exists.
let volume = match IoEngineApiClient::get_client()
.get_volume(&volume_uuid)
.await
{
let volume = match RestApiClient::get_client().get_volume(&volume_uuid).await {
Ok(volume) => volume,
Err(ApiClientError::ResourceNotExists { .. }) => {
debug!("Volume {} does not exist, not unpublishing", args.volume_id);
Expand All @@ -572,7 +566,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
}

// Do forced volume upublish as Kubernetes already detached the volume.
IoEngineApiClient::get_client()
RestApiClient::get_client()
.unpublish_volume(&volume_uuid, true)
.await
.map_err(|e| {
Expand All @@ -599,7 +593,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
Status::invalid_argument(format!("Malformed volume UUID: {}", args.volume_id))
})?;
let _guard = csi_driver::limiter::VolumeOpGuard::new(volume_uuid)?;
let _volume = IoEngineApiClient::get_client()
let _volume = RestApiClient::get_client()
.get_volume(&volume_uuid)
.await
.map_err(|_e| Status::unimplemented("Not implemented"))?;
Expand Down Expand Up @@ -653,7 +647,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {

let vt_mapper = VolumeTopologyMapper::init().await?;

let volumes = IoEngineApiClient::get_client()
let volumes = RestApiClient::get_client()
.list_volumes(max_entries, ListToken::String(args.starting_token))
.await
.map_err(|e| Status::internal(format!("Failed to list volumes, error = {e:?}")))?;
Expand Down Expand Up @@ -705,7 +699,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {

let pools: Vec<Pool> = if let Some(node) = node {
debug!("Calculating pool capacity for node {}", node);
IoEngineApiClient::get_client()
RestApiClient::get_client()
.get_node_pools(node)
.await
.map_err(|e| {
Expand All @@ -715,7 +709,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
})?
} else {
debug!("Calculating overall pool capacity");
IoEngineApiClient::get_client()
RestApiClient::get_client()
.list_pools()
.await
.map_err(|e| {
Expand Down Expand Up @@ -802,7 +796,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
Status::invalid_argument(format!("Malformed snapshot ID: {}", request.name))
})?;

let snapshot = IoEngineApiClient::get_client()
let snapshot = RestApiClient::get_client()
.create_volume_snapshot(&volume_uuid, &snap_uuid)
.await
.map_err(|error| match error {
Expand All @@ -828,7 +822,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
Status::invalid_argument(format!("Malformed snapshot UUID: {}", args.snapshot_id))
})?;

IoEngineApiClient::get_client()
RestApiClient::get_client()
.delete_volume_snapshot(&snapshot_uuid)
.await
.map_err(|e| {
Expand Down Expand Up @@ -862,7 +856,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
return Err(Status::invalid_argument("max_entries can't be negative"));
}

let snapshots = IoEngineApiClient::get_client()
let snapshots = RestApiClient::get_client()
.list_volume_snapshots(vol_uuid, snap_uuid, max_entries, request.starting_token)
.await?;

Expand Down
5 changes: 3 additions & 2 deletions control-plane/csi-driver/src/bin/controller/identity.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{ApiClientError, IoEngineApiClient};
use crate::{ApiClientError, RestApiClient};
use csi_driver::CSI_PLUGIN_NAME;
use rpc::csi::*;

use std::collections::HashMap;
use tonic::{Request, Response, Status};
use tracing::{debug, error, instrument};
Expand Down Expand Up @@ -65,7 +66,7 @@ impl rpc::csi::identity_server::Identity for CsiIdentitySvc {
// communicates to the Container Orchestrator that the plugin is not yet initialised but
// should not be restarted. See the CSI spec:
// https://github.com/container-storage-interface/spec/blob/5b0d4540158a260cb3347ef1c87ede8600afb9bf/csi.proto#L252-L256
let ready = match IoEngineApiClient::get_client().list_nodes().await {
let ready = match RestApiClient::get_client().list_nodes().await {
Ok(_) => true,
Err(ApiClientError::ServerCommunication { .. }) => {
error!("Failed to access REST API gateway, CSI Controller plugin is not ready",);
Expand Down
13 changes: 6 additions & 7 deletions control-plane/csi-driver/src/bin/controller/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use tracing::info;

use clap::{Arg, ArgMatches};
use client::{ApiClientError, CreateVolumeTopology, RestApiClient};
use config::CsiControllerConfig;
mod client;
mod config;
mod controller;
mod identity;
mod pvwatcher;
mod server;

use client::{ApiClientError, CreateVolumeTopology, IoEngineApiClient};
use config::CsiControllerConfig;
use clap::{Arg, ArgMatches};
use tracing::info;

const CSI_SOCKET: &str = "/var/tmp/csi.sock";
const CONCURRENCY_LIMIT: usize = 10;
Expand All @@ -18,7 +17,7 @@ const REST_TIMEOUT: &str = "30s";
/// Initialize all components before starting the CSI controller.
fn initialize_controller(args: &ArgMatches) -> anyhow::Result<()> {
CsiControllerConfig::initialize(args)?;
IoEngineApiClient::initialize()
RestApiClient::initialize()
.map_err(|error| anyhow::anyhow!("Failed to initialize API client, error = {error}"))?;
Ok(())
}
Expand All @@ -27,7 +26,7 @@ fn initialize_controller(args: &ArgMatches) -> anyhow::Result<()> {
async fn ping_rest_api() {
info!("Checking REST API endpoint accessibility ...");

match IoEngineApiClient::get_client().list_nodes().await {
match RestApiClient::get_client().list_nodes().await {
Err(error) => tracing::error!(?error, "REST API endpoint is not accessible"),
Ok(nodes) => {
let names: Vec<String> = nodes.into_iter().map(|n| n.id).collect();
Expand Down
4 changes: 2 additions & 2 deletions control-plane/csi-driver/src/bin/controller/pvwatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tracing::{debug, error, info};
pub(crate) struct PvGarbageCollector {
pub(crate) pv_handle: Api<PersistentVolume>,
orphan_period: Option<humantime::Duration>,
rest_client: &'static crate::IoEngineApiClient,
rest_client: &'static crate::RestApiClient,
}

/// Methods implemented by PV Garbage Collector
Expand All @@ -24,7 +24,7 @@ impl PvGarbageCollector {
Ok(Self {
pv_handle: Api::<PersistentVolume>::all(client),
orphan_period,
rest_client: crate::IoEngineApiClient::get_client(),
rest_client: crate::RestApiClient::get_client(),
})
}
/// Starts watching PV events.
Expand Down
20 changes: 9 additions & 11 deletions control-plane/csi-driver/src/bin/controller/server.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
use futures::TryFutureExt;
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
net::UnixListener,
};
use tonic::transport::{server::Connected, Server};
use tracing::{debug, error, info};
use crate::{controller::CsiControllerSvc, identity::CsiIdentitySvc};
use rpc::csi::{controller_server::ControllerServer, identity_server::IdentityServer};

use futures::TryFutureExt;
use std::{
fs,
io::ErrorKind,
Expand All @@ -14,10 +10,12 @@ use std::{
sync::Arc,
task::{Context, Poll},
};

use rpc::csi::{controller_server::ControllerServer, identity_server::IdentityServer};

use crate::{controller::CsiControllerSvc, identity::CsiIdentitySvc};
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
net::UnixListener,
};
use tonic::transport::{server::Connected, Server};
use tracing::{debug, error, info};

#[derive(Debug)]
struct UnixStream(tokio::net::UnixStream);
Expand Down
Loading

0 comments on commit 18ce35c

Please sign in to comment.