Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(lock): make import pool, create replica and share replica operations mutually exclusive(cherry-pick PR) #1628

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion io-engine/src/bin/io-engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ fn start_tokio_runtime(args: &MayastorCliArgs) {

// Initialize Lock manager.
let cfg = ResourceLockManagerConfig::default()
.with_subsystem(ProtectedSubsystems::NEXUS, 512);
.with_subsystem(ProtectedSubsystems::POOL, 32)
.with_subsystem(ProtectedSubsystems::NEXUS, 512)
.with_subsystem(ProtectedSubsystems::REPLICA, 1024);
ResourceLockManager::initialize(cfg);

Mthread::spawn_unaffinitized(move || {
Expand Down
22 changes: 17 additions & 5 deletions io-engine/src/core/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use once_cell::sync::OnceCell;
pub struct ProtectedSubsystems;
impl ProtectedSubsystems {
pub const NEXUS: &'static str = "nexus";
pub const POOL: &'static str = "pool";
pub const REPLICA: &'static str = "replica";
}

/// Configuration parameters for initialization of the Lock manager.
Expand Down Expand Up @@ -41,6 +43,7 @@ impl ResourceLockManagerConfig {
}

/// Resource subsystem that holds locks for all resources withing this system.
#[derive(Debug)]
pub struct ResourceSubsystem {
id: String,
object_locks: Vec<Mutex<LockStats>>,
Expand All @@ -67,22 +70,23 @@ impl ResourceSubsystem {
pub async fn lock(
&self,
wait_timeout: Option<Duration>,
try_lock: bool,
) -> Option<ResourceLockGuard<'_>> {
acquire_lock(&self.subsystem_lock, wait_timeout).await
acquire_lock(&self.subsystem_lock, wait_timeout, try_lock).await
}

/// Lock subsystem resource by its ID and obtain a lock guard.
pub async fn lock_resource<T: AsRef<str>>(
&self,
id: T,
wait_timeout: Option<Duration>,
try_lock: bool,
) -> Option<ResourceLockGuard<'_>> {
// Calculate hash of the object to get the mutex index.
let mut hasher = DefaultHasher::new();
id.as_ref().hash(&mut hasher);
let mutex_id = hasher.finish() as usize % self.object_locks.len();

acquire_lock(&self.object_locks[mutex_id], wait_timeout).await
acquire_lock(&self.object_locks[mutex_id], wait_timeout, try_lock).await
}
}

Expand Down Expand Up @@ -122,14 +126,21 @@ static LOCK_MANAGER: OnceCell<ResourceLockManager> = OnceCell::new();
async fn acquire_lock(
lock: &Mutex<LockStats>,
wait_timeout: Option<Duration>,
try_lock: bool,
) -> Option<ResourceLockGuard<'_>> {
let mut lock_guard = if let Some(d) = wait_timeout {
match tokio::time::timeout(d, lock.lock()).await {
Err(_) => return None,
Ok(g) => g,
}
} else if try_lock {
// No timeout, try for taking lock immediately.
match lock.try_lock() {
Some(l) => l,
None => return None,
}
} else {
// No timeout, wait for the lock indefinitely.
// No timeout, wait indefinitely.
lock.lock().await
};

Expand Down Expand Up @@ -162,8 +173,9 @@ impl ResourceLockManager {
pub async fn lock(
&self,
wait_timeout: Option<Duration>,
try_lock: bool,
) -> Option<ResourceLockGuard<'_>> {
acquire_lock(&self.mgr_lock, wait_timeout).await
acquire_lock(&self.mgr_lock, wait_timeout, try_lock).await
}

/// Get resource subsystem by its id.
Expand Down
38 changes: 33 additions & 5 deletions io-engine/src/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
use futures::channel::oneshot::Receiver;
use nix::errno::Errno;
pub use server::MayastorGrpcServer;
use std::{
error::Error,
fmt::{Debug, Display},
future::Future,
time::Duration,
};

use futures::channel::oneshot::Receiver;
use nix::errno::Errno;
pub use server::MayastorGrpcServer;
use tokio::sync::RwLock;
use tonic::{Request, Response, Status};

use crate::{
bdev_api::BdevError,
core::{CoreError, Reactor, VerboseError},
core::{
CoreError,
Reactor,
ResourceLockGuard,
ResourceSubsystem,
VerboseError,
},
};

impl From<BdevError> for tonic::Status {
Expand Down Expand Up @@ -168,6 +173,29 @@ where
.map_err(|_| Status::resource_exhausted("ENOMEM"))
}

/// Manage locks across multiple grpc services.
pub async fn acquire_subsystem_lock<'a>(
subsystem: &'a ResourceSubsystem,
resource: Option<&str>,
) -> Result<ResourceLockGuard<'a>, Status> {
if let Some(resource) = resource {
match subsystem.lock_resource(resource.to_string(), None, true).await {
Some(lock_guard) => Ok(lock_guard),
None => Err(Status::already_exists(format!(
"Failed to acquire lock for the resource: {resource}, lock already held"
))),
}
} else {
match subsystem.lock(None, true).await {
Some(lock_guard) => Ok(lock_guard),
None => Err(Status::already_exists(format!(
"Failed to acquire subsystem lock: {:?}, lock already held",
subsystem
))),
}
}
}

macro_rules! default_ip {
() => {
"0.0.0.0"
Expand Down
7 changes: 5 additions & 2 deletions io-engine/src/grpc/v0/mayastor_grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ impl MayastorSvc {
match tokio::spawn(async move {
// Grab global operation lock, if requested.
let _global_guard = if global_operation {
match lock_manager.lock(Some(ctx.timeout)).await {
match lock_manager.lock(Some(ctx.timeout), false).await {
Some(g) => Some(g),
None => return Err(Status::deadline_exceeded(
"Failed to acquire access to object within given timeout"
Expand All @@ -169,7 +169,7 @@ impl MayastorSvc {
// Grab per-object lock before executing the future.
let _resource_guard = match lock_manager
.get_subsystem(ProtectedSubsystems::NEXUS)
.lock_resource(nexus_uuid, Some(ctx.timeout))
.lock_resource(nexus_uuid, Some(ctx.timeout), false)
.await {
Some(g) => g,
None => return Err(Status::deadline_exceeded(
Expand Down Expand Up @@ -302,6 +302,9 @@ impl From<LvsError> for tonic::Status {
LvsError::WipeFailed {
source,
} => source.into(),
LvsError::ResourceLockFailed {
..
} => Status::aborted(e.to_string()),
_ => Status::internal(e.verbose()),
}
}
Expand Down
4 changes: 2 additions & 2 deletions io-engine/src/grpc/v1/nexus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl NexusService {
match tokio::spawn(async move {
// Grab global operation lock, if requested.
let _global_guard = if global_operation {
match lock_manager.lock(Some(ctx.timeout)).await {
match lock_manager.lock(Some(ctx.timeout), false).await {
Some(g) => Some(g),
None => return Err(Status::deadline_exceeded(
"Failed to acquire access to object within given timeout"
Expand All @@ -95,7 +95,7 @@ impl NexusService {
// Grab per-object lock before executing the future.
let _resource_guard = match lock_manager
.get_subsystem(ProtectedSubsystems::NEXUS)
.lock_resource(nexus_uuid, Some(ctx.timeout))
.lock_resource(nexus_uuid, Some(ctx.timeout), false)
.await {
Some(g) => g,
None => return Err(Status::deadline_exceeded(
Expand Down
29 changes: 27 additions & 2 deletions io-engine/src/grpc/v1/pool.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
use crate::{
core::Share,
grpc::{rpc_submit, GrpcClientContext, GrpcResult, RWLock, RWSerializer},
core::{
lock::{ProtectedSubsystems, ResourceLockManager},
Share,
},
grpc::{
acquire_subsystem_lock,
rpc_submit,
GrpcClientContext,
GrpcResult,
RWLock,
RWSerializer,
},
lvs::{Error as LvsError, Lvs},
pool_backend::{PoolArgs, PoolBackend},
};
Expand Down Expand Up @@ -314,6 +324,21 @@ impl PoolRpc for PoolService {
let args = request.into_inner();
info!("{:?}", args);
let rx = rpc_submit::<_, _, LvsError>(async move {
let pool_subsystem = ResourceLockManager::get_instance()
.get_subsystem(ProtectedSubsystems::POOL);
let _lock_guard = acquire_subsystem_lock(
pool_subsystem,
Some(&args.name),
)
.await
.map_err(|_| {
LvsError::ResourceLockFailed {
msg: format!(
"resource {}, for disk pool {:?}",
&args.name, &args.disks,
),
}
})?;
let pool = Lvs::import_from_args(PoolArgs::try_from(args)?)
.await?;
Ok(Pool::from(pool))
Expand Down
38 changes: 36 additions & 2 deletions io-engine/src/grpc/v1/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{
bdev::PtplFileOps,
bdev_api::BdevError,
core::{
lock::{ProtectedSubsystems, ResourceLockManager},
logical_volume::LogicalVolume,
Bdev,
CloneXattrs,
Expand All @@ -11,7 +12,14 @@ use crate::{
UntypedBdev,
UpdateProps,
},
grpc::{rpc_submit, GrpcClientContext, GrpcResult, RWLock, RWSerializer},
grpc::{
acquire_subsystem_lock,
rpc_submit,
GrpcClientContext,
GrpcResult,
RWLock,
RWSerializer,
},
lvs::{Error as LvsError, Lvol, LvolSpaceUsage, Lvs, LvsLvol, PropValue},
};
use ::function_name::named;
Expand Down Expand Up @@ -219,6 +227,20 @@ impl ReplicaRpc for ReplicaService {
}
}
};
let pool_subsystem = ResourceLockManager::get_instance().get_subsystem(ProtectedSubsystems::POOL);
let _lock_guard = acquire_subsystem_lock(
pool_subsystem, Some(lvs.name())
)
.await
.map_err(|_|
LvsError::ResourceLockFailed {
msg: format!(
"resource {}, for pooluuid {}",
lvs.name(),
args.pooluuid
)
}
)?;
// if pooltype is not Lvs, the provided replica uuid need to be added as
match lvs.create_lvol(&args.name, args.size, Some(&args.uuid), args.thin, args.entity_id).await {
Ok(mut lvol)
Expand Down Expand Up @@ -401,7 +423,19 @@ impl ReplicaRpc for ReplicaService {
match Bdev::lookup_by_uuid_str(&args.uuid) {
Some(bdev) => {
let mut lvol = Lvol::try_from(bdev)?;

let pool_subsystem = ResourceLockManager::get_instance().get_subsystem(ProtectedSubsystems::POOL);
let _lock_guard = acquire_subsystem_lock(
pool_subsystem,
Some(lvol.lvs().name()),
)
.await
.map_err(|_| LvsError::ResourceLockFailed {
msg: format!(
"resource {}, for lvol {:?}",
lvol.lvs().name(),
lvol
),
})?;
// if we are already shared with the same protocol
if lvol.shared()
== Some(Protocol::try_from(args.share)?)
Expand Down
4 changes: 2 additions & 2 deletions io-engine/src/grpc/v1/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ impl SnapshotService {
match tokio::spawn(async move {
// Grab global operation lock, if requested.
let _global_guard = if global_operation {
match lock_manager.lock(Some(ctx.timeout)).await {
match lock_manager.lock(Some(ctx.timeout), false).await {
Some(g) => Some(g),
None => return Err(Status::deadline_exceeded(
"Failed to acquire access to object within given timeout"
Expand All @@ -255,7 +255,7 @@ impl SnapshotService {
// Grab per-object lock before executing the future.
let _resource_guard = match lock_manager
.get_subsystem(ProtectedSubsystems::NEXUS)
.lock_resource(nexus_uuid, Some(ctx.timeout))
.lock_resource(nexus_uuid, Some(ctx.timeout), false)
.await {
Some(g) => g,
None => return Err(Status::deadline_exceeded(
Expand Down
4 changes: 2 additions & 2 deletions io-engine/src/grpc/v1/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ where
let lock_manager = ResourceLockManager::get_instance();
// For nexus global lock.
let _global_guard =
match lock_manager.lock(Some(ctx.timeout)).await {
match lock_manager.lock(Some(ctx.timeout), false).await {
Some(g) => Some(g),
None => return Err(Status::deadline_exceeded(
"Failed to acquire access to object within given timeout",
Expand Down Expand Up @@ -96,7 +96,7 @@ impl StatsService {
let lock_manager = ResourceLockManager::get_instance();
// For nexus global lock.
let _global_guard =
match lock_manager.lock(Some(ctx.timeout)).await {
match lock_manager.lock(Some(ctx.timeout), false).await {
Some(g) => Some(g),
None => return Err(Status::deadline_exceeded(
"Failed to acquire access to object within given timeout",
Expand Down
7 changes: 7 additions & 0 deletions io-engine/src/lvs/lvs_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ pub enum Error {
WipeFailed {
source: crate::core::wiper::Error,
},
#[snafu(display("Failed to acquire resource lock, {}", msg))]
ResourceLockFailed {
msg: String,
},
}

/// Map CoreError to errno code.
Expand Down Expand Up @@ -265,6 +269,9 @@ impl ToErrno for Error {
Self::WipeFailed {
..
} => Errno::EINVAL,
Self::ResourceLockFailed {
..
} => Errno::EBUSY,
}
}
}
Expand Down
Loading
Loading