Skip to content

Commit

Permalink
chore(bors): merge pull request #746
Browse files Browse the repository at this point in the history
746: feat(csi-driver): add volume expansion r=niladrih a=niladrih

Changes:
- Add offline and online expansion capabilities
- Add implementation for ControllerExpandVolume on the controller binary
- Add implmentation for the NodeExpandVolume on the node binary

Co-authored-by: Niladri Halder <niladri.halder26@gmail.com>
  • Loading branch information
mayastor-bors and niladrih committed Feb 28, 2024
2 parents ea9271e + a8bf6bd commit 3800c10
Show file tree
Hide file tree
Showing 23 changed files with 468 additions and 84 deletions.
2 changes: 1 addition & 1 deletion control-plane/agents/src/bin/core/volume/specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ pub(crate) async fn resizeable_replicas(
requested_size: u64,
) -> Result<Vec<Replica>, SvcError> {
if spec.size >= requested_size {
return Err(SvcError::VolumeResizeArgsInvalid {
return Err(SvcError::VolumeResizeSize {
vol_id: spec.uuid_str(),
requested_size,
current_size: spec.size,
Expand Down
8 changes: 4 additions & 4 deletions control-plane/agents/src/common/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,12 @@ pub enum SvcError {
protocol: String,
},
#[snafu(display(
"Volume '{}' - resize args invalid. Current size: '{}', requested size: '{}'",
"Volume '{}' - required size is invalid. Current size: '{}', requested size: '{}'",
vol_id,
current_size,
requested_size
))]
VolumeResizeArgsInvalid {
VolumeResizeSize {
vol_id: String,
requested_size: u64,
current_size: u64,
Expand Down Expand Up @@ -912,8 +912,8 @@ impl From<SvcError> for ReplyError {
source,
extra,
},
SvcError::VolumeResizeArgsInvalid { .. } => ReplyError {
kind: ReplyErrorKind::InvalidArgument,
SvcError::VolumeResizeSize { .. } => ReplyError {
kind: ReplyErrorKind::NotAcceptable,
resource: ResourceKind::Volume,
source,
extra,
Expand Down
43 changes: 41 additions & 2 deletions control-plane/csi-driver/src/bin/controller/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use stor_port::types::v0::openapi::{
models,
models::{
AffinityGroup, AppNode, CreateVolumeBody, Node, NodeTopology, Pool, PoolTopology,
PublishVolumeBody, RestJsonError, Topology, Volume, VolumePolicy, VolumeShareProtocol,
Volumes,
PublishVolumeBody, ResizeVolumeBody, RestJsonError, Topology, Volume, VolumePolicy,
VolumeShareProtocol, Volumes,
},
};

Expand Down Expand Up @@ -41,6 +41,8 @@ pub enum ApiClientError {
Unavailable(String),
/// Precondition Failed.
PreconditionFailed(String),
/// Not Acceptable (406)
NotAcceptable(String),
}

/// Placeholder for volume topology for volume creation operation.
Expand Down Expand Up @@ -80,6 +82,7 @@ impl From<clients::tower::Error<RestJsonError>> for ApiClientError {
StatusCode::SERVICE_UNAVAILABLE => Self::Unavailable(detailed),
StatusCode::PRECONDITION_FAILED => Self::PreconditionFailed(detailed),
StatusCode::BAD_REQUEST => Self::InvalidArgument(detailed),
StatusCode::NOT_ACCEPTABLE => Self::NotAcceptable(detailed),
status => Self::GenericOperation(status, detailed),
}
}
Expand Down Expand Up @@ -498,4 +501,40 @@ impl RestApiClient {
.await?;
Ok(response.into_body())
}

/// Expand volume.
#[instrument(fields(volume.uuid = %volume_id, volume.size = %required_volume_size), skip(self, volume_id, required_volume_size))]
pub(crate) async fn expand_volume(
&self,
volume_id: &uuid::Uuid,
required_volume_size: u64,
) -> Result<Volume, ApiClientError> {
use clients::tower::{Error::Response, ResponseError::Expected};

let vol_client = self.rest_client.volumes_api();

// This call should be idempotent and should return success if the volume
// size is already greater than or equal to the target_volume_size.
let resize_result = vol_client
.put_volume_size(
volume_id,
ResizeVolumeBody::new(required_volume_size as usize),
)
.await
.map(|response| response.into_body());

match resize_result {
// Success case.
Ok(vol) => Ok(vol),
// The volume capacity is already greater than or equal to required capacity.
Err(Response(Expected(err))) if err.status() == StatusCode::NOT_ACCEPTABLE => {
Ok(vol_client
.get_volume(volume_id)
.await
.map(|response| response.into_body())?)
}
// Something went wrong.
Err(err) => Err(err.into()),
}
}
}
49 changes: 47 additions & 2 deletions control-plane/csi-driver/src/bin/controller/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::{collections::HashMap, str::FromStr};
use tonic::{Request, Response, Status};
use tracing::{debug, error, instrument, trace, warn};
use uuid::Uuid;
use volume_capability::AccessType;

const OPENEBS_TOPOLOGY_KEY: &str = "openebs.io/nodename";
const VOLUME_NAME_PATTERN: &str =
Expand Down Expand Up @@ -790,6 +791,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
controller_service_capability::rpc::Type::GetCapacity,
controller_service_capability::rpc::Type::CreateDeleteSnapshot,
controller_service_capability::rpc::Type::ListSnapshots,
controller_service_capability::rpc::Type::ExpandVolume,
];

Ok(Response::new(ControllerGetCapabilitiesResponse {
Expand Down Expand Up @@ -976,9 +978,52 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
#[instrument(err, skip(self))]
async fn controller_expand_volume(
&self,
_request: tonic::Request<ControllerExpandVolumeRequest>,
request: tonic::Request<ControllerExpandVolumeRequest>,
) -> Result<tonic::Response<ControllerExpandVolumeResponse>, tonic::Status> {
Err(Status::unimplemented("Not implemented"))
let request = request.into_inner();

let vol_uuid = Uuid::parse_str(&request.volume_id).map_err(|error| {
Status::invalid_argument(format!(
"Malformed volume UUID '{}': {error}",
request.volume_id
))
})?;

let requested_size = request
.capacity_range
.as_ref()
.ok_or(Status::invalid_argument(format!(
"Cannot expand volume '{}': invalid request {request:?}: missing CapacityRange",
request.volume_id
)))?
.required_bytes;

// NodeExpandVolume should be avoided only if we know without a shred of doubt that the
// volume is that of a Block type. The volume_capability field is optional and we should
// assume NodeExpandVolume is required if it is not clearly defined that the volume is
// Block type volume.
let node_expansion_required = !matches!(
request.volume_capability.as_ref(),
Some(vc) if matches!(vc.access_type, Some(AccessType::Block(_)))
);

let _guard = csi_driver::limiter::VolumeOpGuard::new(vol_uuid)?;

let vol = RestApiClient::get_client()
// This call is idempotent and should return success without expansion,
// if the volume size is larger than or equal to the requested size.
.expand_volume(&vol_uuid, requested_size as u64)
.await
.map_err(|error| match error {
ApiClientError::PreconditionFailed(msg) => Status::failed_precondition(msg),
ApiClientError::ResourceExhausted(msg) => Status::out_of_range(msg),
error => Status::from(error),
})?;

Ok(tonic::Response::new(ControllerExpandVolumeResponse {
capacity_bytes: vol.spec.size as i64,
node_expansion_required,
}))
}

#[instrument(err, skip(self))]
Expand Down
25 changes: 8 additions & 17 deletions control-plane/csi-driver/src/bin/controller/identity.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::{ApiClientError, RestApiClient};
use csi_driver::CSI_PLUGIN_NAME;
use rpc::csi::*;
use csi_driver::{plugin_capabilities::plugin_capabilities, CSI_PLUGIN_NAME};
use rpc::csi::{
GetPluginCapabilitiesRequest, GetPluginCapabilitiesResponse, GetPluginInfoRequest,
GetPluginInfoResponse, ProbeRequest, ProbeResponse,
};

use std::collections::HashMap;
use tonic::{Request, Response, Status};
Expand Down Expand Up @@ -33,24 +36,12 @@ impl rpc::csi::identity_server::Identity for CsiIdentitySvc {
#[instrument]
async fn get_plugin_capabilities(
&self,
_request: tonic::Request<GetPluginCapabilitiesRequest>,
request: tonic::Request<GetPluginCapabilitiesRequest>,
) -> Result<Response<GetPluginCapabilitiesResponse>, Status> {
debug!("Request to get CSI plugin capabilities");

let capabilities = vec![
plugin_capability::service::Type::ControllerService,
plugin_capability::service::Type::VolumeAccessibilityConstraints,
];
debug!("GetPluginCapabilities request: {:?}", request);

Ok(Response::new(GetPluginCapabilitiesResponse {
capabilities: capabilities
.into_iter()
.map(|c| PluginCapability {
r#type: Some(plugin_capability::Type::Service(
plugin_capability::Service { r#type: c as i32 },
)),
})
.collect(),
capabilities: plugin_capabilities(),
}))
}

Expand Down
21 changes: 19 additions & 2 deletions control-plane/csi-driver/src/bin/node/dev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,12 @@
//! }
//! ```

use std::{collections::HashMap, convert::TryFrom, time::Duration};

use std::{
collections::HashMap,
convert::TryFrom,
path::{Path, PathBuf},
time::Duration,
};
use tokio::time::sleep;
use udev::Enumerator;
use url::Url;
Expand Down Expand Up @@ -136,3 +140,16 @@ impl Device {
Err(DeviceError::new("device attach timeout"))
}
}

/// Get the block device capacity size from the sysfs block count.
pub(crate) fn get_size_from_dev_name<N: AsRef<Path>>(dev_name: N) -> Result<usize, DeviceError> {
// Linux sectors are 512 byte long. This is fixed.
// Ref: https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/linux/types.h?id=v5.15#n117
const LINUX_SECTOR_SIZE: usize = 512;

let sysfs_dir = PathBuf::from("/sys/class/block").join(dev_name);

let size: usize = sysfs::parse_value(sysfs_dir.as_path(), "size")?;

Ok(size * LINUX_SECTOR_SIZE)
}
3 changes: 1 addition & 2 deletions control-plane/csi-driver/src/bin/node/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Definition of DeviceError used by the attach and detach code.
use nvmeadm::{error::NvmeError, nvmf_discovery};

use nix::errno::Errno;
use nvmeadm::{error::NvmeError, nvmf_discovery};
use snafu::Snafu;
use std::{process::ExitCode, string::FromUtf8Error};

Expand Down
91 changes: 88 additions & 3 deletions control-plane/csi-driver/src/bin/node/filesystem_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
//! specific filesystem, repairing of the filesystem, retrieving specific properties of the
//! filesystem.

use crate::mount;
use crate::{findmnt::get_devicepath, mount};
use csi_driver::filesystem::FileSystem as Fs;
use devinfo::{blkid::probe::Probe, DevInfoError};
use std::process::Output;

use anyhow::anyhow;
use devinfo::{blkid::probe::Probe, mountinfo::MountInfo, DevInfoError};
use std::{process::Output, str, str::FromStr};
use tokio::process::Command;
use tonic::async_trait;
use tracing::{debug, trace};
Expand Down Expand Up @@ -35,6 +37,23 @@ impl From<Fs> for FileSystem {
}
}

impl TryFrom<&MountInfo> for FileSystem {
type Error = anyhow::Error;

fn try_from(mnt: &MountInfo) -> Result<Self, Self::Error> {
if mnt.fstype.is_empty() {
return Err(anyhow!("fstype is empty"));
}

Fs::from_str(mnt.fstype.to_lowercase().as_str())
.map_err(|err| anyhow!("failed to parse FileSystem: {err}"))
.and_then(|fs_type| match fs_type {
Fs::Ext4 | Fs::Xfs | Fs::Btrfs => Ok(FileSystem::from(fs_type)),
_ => Err(anyhow!("Unsupported filesystem")),
})
}
}

impl AsRef<str> for FileSystem {
fn as_ref(&self) -> &str {
self.0.as_ref()
Expand Down Expand Up @@ -107,6 +126,8 @@ pub(crate) trait FileSystemOps: Send + Sync {
}
Ok(())
}
/// Write the existing filesystem on to new unused blocks on the block device.
async fn expand(&self, mount_path: &str) -> Result<(), Error>;
}

#[async_trait]
Expand Down Expand Up @@ -204,6 +225,22 @@ impl FileSystemOps for Ext4Fs {
debug!("Changed filesystem uuid to {volume_uuid} for {device}");
Ok(())
}

async fn expand(&self, mount_path: &str) -> Result<(), Error> {
let dev_path = get_devicepath(mount_path)
.map_err(|err| {
format!(
"failed to get dev path for mountpoint {}: {}",
mount_path, err
)
})?
.ok_or(format!(
"no underlying device found for mountpoint {}",
mount_path
))?;

run_fs_expand_command(vec!["resize2fs", dev_path.as_str()]).await
}
}

#[async_trait]
Expand Down Expand Up @@ -292,6 +329,10 @@ impl FileSystemOps for XFs {
debug!("Changed filesystem uuid to {volume_uuid} for {device}");
Ok(())
}

async fn expand(&self, mount_path: &str) -> Result<(), Error> {
run_fs_expand_command(vec!["xfs_growfs", mount_path]).await
}
}

#[async_trait]
Expand Down Expand Up @@ -390,6 +431,10 @@ impl FileSystemOps for BtrFs {
debug!("Changed filesystem uuid to {volume_uuid} for {device}");
Ok(())
}

async fn expand(&self, mount_path: &str) -> Result<(), Error> {
run_fs_expand_command(vec!["btrfs", "filesystem", "resize", "max", mount_path]).await
}
}

// Acknowledge the output from Command.
Expand All @@ -411,3 +456,43 @@ fn ack_command_output(output: Output, binary: String) -> Result<(), Error> {
String::from_utf8(output.stderr).unwrap()
))
}

/// Run the command and return an error if the execution fails, or the command fails.
pub(crate) async fn run_fs_expand_command(cmd_and_args: Vec<&str>) -> Result<(), String> {
match cmd_and_args.len() {
0 => return Err("resize command cannot be empty".to_string()),
1 => {
return Err(
"cannot work with a resize command which doesn't take input arguments".to_string(),
)
}
_ => {}
};

let cmd = cmd_and_args[0];
let args = cmd_and_args[1 ..].to_vec();

let out = Command::new(cmd)
.args(args.as_slice())
.output()
.await
.map_err(|exec_err| {
format!(
"failed to execute resize command,\ncmd: {},\nargs: {:?}: {}",
cmd, args, exec_err
)
})?;

match out.status.success() {
true => Ok(()),
false => Err(format!(
"failed resize,\ncmd: {:?},\nargs: {:?},\nstderr: {}",
cmd,
args,
str::from_utf8(out.stderr.as_slice()).map_err(|error| format!(
"failed to convert stderr bytes-slice to str: {}",
error
))?
)),
}
}
Loading

0 comments on commit 3800c10

Please sign in to comment.