Skip to content

Commit

Permalink
fix(lock): make import pool, create replica and share replica operati…
Browse files Browse the repository at this point in the history
…ons mutually exclusive

Signed-off-by: Hrudaya <hrudayaranjan.sahoo@datacore.com>
  • Loading branch information
hrudaya21 committed Mar 29, 2024
1 parent bf6450d commit 8172370
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 19 deletions.
4 changes: 3 additions & 1 deletion io-engine/src/bin/io-engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ fn start_tokio_runtime(args: &MayastorCliArgs) {

// Initialize Lock manager.
let cfg = ResourceLockManagerConfig::default()
.with_subsystem(ProtectedSubsystems::NEXUS, 512);
.with_subsystem(ProtectedSubsystems::POOL, 256)
.with_subsystem(ProtectedSubsystems::NEXUS, 512)
.with_subsystem(ProtectedSubsystems::REPLICA, 1024);
ResourceLockManager::initialize(cfg);

Mthread::spawn_unaffinitized(move || {
Expand Down
3 changes: 2 additions & 1 deletion io-engine/src/core/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use once_cell::sync::OnceCell;
pub struct ProtectedSubsystems;
impl ProtectedSubsystems {
pub const NEXUS: &'static str = "nexus";
pub const POOL: &'static str = "pool";
pub const REPLICA: &'static str = "replica";
}

/// Configuration parameters for initialization of the Lock manager.
Expand Down Expand Up @@ -81,7 +83,6 @@ impl ResourceSubsystem {
let mut hasher = DefaultHasher::new();
id.as_ref().hash(&mut hasher);
let mutex_id = hasher.finish() as usize % self.object_locks.len();

acquire_lock(&self.object_locks[mutex_id], wait_timeout).await
}
}
Expand Down
54 changes: 49 additions & 5 deletions io-engine/src/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
use futures::channel::oneshot::Receiver;
use nix::errno::Errno;
pub use server::MayastorGrpcServer;
use std::{
error::Error,
fmt::{Debug, Display},
future::Future,
time::Duration,
};

use futures::channel::oneshot::Receiver;
use nix::errno::Errno;
pub use server::MayastorGrpcServer;
use tokio::sync::RwLock;
use tonic::{Request, Response, Status};

use crate::{
bdev_api::BdevError,
core::{CoreError, Reactor, VerboseError},
core::{
lock::ResourceLockManager,
CoreError,
Reactor,
ResourceSubsystem,
VerboseError,
},
};

impl From<BdevError> for tonic::Status {
Expand Down Expand Up @@ -168,6 +173,45 @@ where
.map_err(|_| Status::resource_exhausted("ENOMEM"))
}

/// Manage locks accross multiple grpc services.
pub async fn acquire_subsystem_lock(
subsystem: &ResourceSubsystem,
resource: Option<String>,
global: bool,
) -> Result<(), Status> {
if global {
let lock_manager = ResourceLockManager::get_instance();
match lock_manager.lock(None).await {
Some(_) => (),
None => {
return Err(Status::deadline_exceeded(
"Failed to acquire global lock within given timeout"
.to_string(),
))
}
}
} else if let Some(resource) = resource {
match subsystem.lock_resource(resource, None).await {
Some(_) => (),
None => return Err(Status::deadline_exceeded(
"Failed to acquire lock for the resource within given timeout"
.to_string(),
)),
}
} else {
match subsystem.lock(None).await {
Some(_) => (),
None => {
return Err(Status::deadline_exceeded(
"Failed to acquire subsystem lock within given timeout"
.to_string(),
))
}
}
}
Ok(())
}

macro_rules! default_ip {
() => {
"0.0.0.0"
Expand Down
35 changes: 33 additions & 2 deletions io-engine/src/grpc/v1/pool.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
use crate::{
core::Share,
grpc::{rpc_submit, GrpcClientContext, GrpcResult, RWLock, RWSerializer},
core::{
lock::{ProtectedSubsystems, ResourceLockManager},
Share,
},
grpc::{
acquire_subsystem_lock,
rpc_submit,
GrpcClientContext,
GrpcResult,
RWLock,
RWSerializer,
},
lvs::{Error as LvsError, Lvs},
pool_backend::{PoolArgs, PoolBackend},
};
Expand Down Expand Up @@ -314,6 +324,27 @@ impl PoolRpc for PoolService {
let args = request.into_inner();
info!("{:?}", args);
let rx = rpc_submit::<_, _, LvsError>(async move {
let pool_subsystem = ResourceLockManager::get_instance()
.get_subsystem(ProtectedSubsystems::POOL);
match acquire_subsystem_lock(
pool_subsystem,
Some(args.name.to_string()),
false,
)
.await
{
Ok(_) => {}
Err(_) => {
return Err(LvsError::ResourceLockFailed {
source: Errno::EBUSY,
msg: format!(
"resource {}, for disk pool {:?}",
args.name.to_string(),
args.disks,
),
})
}
}
let pool = Lvs::import_from_args(PoolArgs::try_from(args)?)
.await?;
Ok(Pool::from(pool))
Expand Down
63 changes: 53 additions & 10 deletions io-engine/src/grpc/v1/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{
bdev::PtplFileOps,
bdev_api::BdevError,
core::{
lock::{ProtectedSubsystems, ResourceLockManager},
logical_volume::LogicalVolume,
Bdev,
CloneXattrs,
Expand All @@ -11,7 +12,14 @@ use crate::{
UntypedBdev,
UpdateProps,
},
grpc::{rpc_submit, GrpcClientContext, GrpcResult, RWLock, RWSerializer},
grpc::{
acquire_subsystem_lock,
rpc_submit,
GrpcClientContext,
GrpcResult,
RWLock,
RWSerializer,
},
lvs::{Error as LvsError, Lvol, LvolSpaceUsage, Lvs, LvsLvol, PropValue},
};
use ::function_name::named;
Expand Down Expand Up @@ -191,7 +199,6 @@ impl ReplicaRpc for ReplicaService {
request: Request<CreateReplicaRequest>,
) -> GrpcResult<Replica> {
self.locked(GrpcClientContext::new(&request, function_name!()), async move {

let args = request.into_inner();
info!("{:?}", args);
if !matches!(
Expand Down Expand Up @@ -219,6 +226,16 @@ impl ReplicaRpc for ReplicaService {
}
}
};
let pool_subsystem = ResourceLockManager::get_instance().get_subsystem(ProtectedSubsystems::POOL);
match acquire_subsystem_lock(pool_subsystem, Some(lvs.name().to_string()), false).await {
Ok(_) => {}
Err(_) => {
return Err(LvsError::ResourceLockFailed {
source: Errno::EBUSY,
msg: format!("resource {}, for pooluuid {}", lvs.name().to_string(), args.pooluuid),
})
}
}
// if pooltype is not Lvs, the provided replica uuid need to be added as
match lvs.create_lvol(&args.name, args.size, Some(&args.uuid), args.thin, args.entity_id).await {
Ok(mut lvol)
Expand Down Expand Up @@ -401,7 +418,28 @@ impl ReplicaRpc for ReplicaService {
match Bdev::lookup_by_uuid_str(&args.uuid) {
Some(bdev) => {
let mut lvol = Lvol::try_from(bdev)?;

let pool_subsystem =
ResourceLockManager::get_instance()
.get_subsystem(ProtectedSubsystems::POOL);
match acquire_subsystem_lock(
pool_subsystem,
Some(lvol.lvs().name().to_string()),
false,
)
.await
{
Ok(_) => {}
Err(_) => {
return Err(LvsError::ResourceLockFailed {
source: Errno::EBUSY,
msg: format!(
"resource {}, for lvol {:?}",
lvol.lvs().name().to_string(),
lvol
),
})
}
}
// if we are already shared with the same protocol
if lvol.shared()
== Some(Protocol::try_from(args.share)?)
Expand All @@ -427,14 +465,19 @@ impl ReplicaRpc for ReplicaService {
Protocol::Nvmf => {
let props = ShareProps::new()
.with_allowed_hosts(args.allowed_hosts)
.with_ptpl(lvol.ptpl().create().map_err(
|source| LvsError::LvolShare {
source: crate::core::CoreError::Ptpl {
.with_ptpl(
lvol.ptpl().create().map_err(
|source| {
LvsError::LvolShare {
source:
crate::core::CoreError::Ptpl {
reason: source.to_string(),
},
name: lvol.name(),
},
)?);
name: lvol.name(),
}
},
)?,
);
Pin::new(&mut lvol)
.share_nvmf(Some(props))
.await?;
Expand All @@ -459,7 +502,7 @@ impl ReplicaRpc for ReplicaService {
.map(Response::new)
},
)
.await
.await
}

#[named]
Expand Down
8 changes: 8 additions & 0 deletions io-engine/src/lvs/lvs_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ pub enum Error {
WipeFailed {
source: crate::core::wiper::Error,
},
#[snafu(display("Failed to acquire resource lock, {}", msg))]
ResourceLockFailed {
source: Errno,
msg: String,
},
}

/// Map CoreError to errno code.
Expand Down Expand Up @@ -265,6 +270,9 @@ impl ToErrno for Error {
Self::WipeFailed {
..
} => Errno::EINVAL,
Self::ResourceLockFailed {
source, ..
} => source,
}
}
}
Expand Down

0 comments on commit 8172370

Please sign in to comment.