diff --git a/Cargo.lock b/Cargo.lock
index 876f86704..f59ec591e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1705,6 +1705,7 @@ dependencies = [
  "colored_json",
  "composer",
  "crossbeam",
+ "derive_builder",
  "etcd-client",
  "function_name",
  "futures",
diff --git a/io-engine-tests/Cargo.toml b/io-engine-tests/Cargo.toml
index 216ed062e..67f21ebbc 100644
--- a/io-engine-tests/Cargo.toml
+++ b/io-engine-tests/Cargo.toml
@@ -15,6 +15,7 @@ bytes = "1.5.0"
 chrono = "0.4.31"
 colored_json = "4.0.0"
 crossbeam = "0.8.2"
+derive_builder = "0.12.0"
 etcd-client = "0.12.1"
 function_name = "0.3.0"
 futures = "0.3.28"
diff --git a/io-engine-tests/src/file_io.rs b/io-engine-tests/src/file_io.rs
index daf8a6950..871064064 100644
--- a/io-engine-tests/src/file_io.rs
+++ b/io-engine-tests/src/file_io.rs
@@ -1,6 +1,7 @@
 use once_cell::sync::OnceCell;
 use rand::{distributions::Uniform, Rng, SeedableRng};
 use rand_chacha::ChaCha8Rng;
+use serde::Serialize;
 use std::{
     fmt::{Display, Formatter},
     io::SeekFrom,
@@ -26,7 +27,8 @@ fn create_test_buf(buf_size: DataSize) -> Vec<u8> {
 }
 
 /// TODO
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Serialize)]
+#[serde(into = "u64")]
 pub struct DataSize(u64);
 
 impl Display for DataSize {
@@ -47,6 +49,12 @@ impl From<DataSize> for usize {
     }
 }
 
+impl From<u64> for DataSize {
+    fn from(value: u64) -> Self {
+        Self::from_bytes(value)
+    }
+}
+
 impl Default for DataSize {
     fn default() -> Self {
         Self::from_bytes(0)
diff --git a/io-engine-tests/src/fio.rs b/io-engine-tests/src/fio.rs
index 329aecf7e..f31e4323c 100644
--- a/io-engine-tests/src/fio.rs
+++ b/io-engine-tests/src/fio.rs
@@ -1,11 +1,14 @@
-use super::file_io::DataSize;
+use derive_builder::Builder;
 use nix::errno::Errno;
+use serde::Serialize;
 use std::{
     path::Path,
     sync::atomic::{AtomicU32, Ordering},
     time::{Duration, Instant},
 };
 
+use super::file_io::DataSize;
+
 /// TODO
 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
 pub enum FioJobResult {
@@ -14,38 +17,92 @@ pub enum FioJobResult {
     Error(Errno),
 }
 
-/// TODO
-#[derive(Debug, Clone)]
+/// FIO job.
+/// Non-optional fields are always passed as FIO CLI arguments.
+/// Optional fields are passed to FIO only if they are defined.
+#[derive(Debug, Clone, Builder, Serialize)]
+#[builder(setter(prefix = "with"))]
+#[builder(build_fn(name = "try_build"))]
+#[builder(default)]
 #[allow(dead_code)]
 pub struct FioJob {
-    /// Job counter.
-    pub counter: u32,
     /// Job name.
+    #[builder(setter(into))]
     pub name: String,
     /// I/O engine to use. Default: spdk.
+    #[builder(setter(into))]
     pub ioengine: String,
     /// Filename.
+    #[builder(setter(custom))]
     pub filename: String,
     /// Type of I/O pattern.
+    #[builder(setter(into))]
     pub rw: String,
     /// If true, use non-buffered I/O (usually O_DIRECT). Default: true.
     pub direct: bool,
     /// Block size for I/O units. Default: 4k.
-    pub blocksize: Option<u32>,
+    #[builder(setter(strip_option, into))]
+    pub bs: Option<DataSize>,
     /// Offset in the file to start I/O. Data before the offset will not be
     /// touched.
+    #[builder(setter(strip_option, into))]
     pub offset: Option<DataSize>,
     /// Number of I/O units to keep in flight against the file.
+    #[builder(setter(strip_option))]
     pub iodepth: Option<u32>,
     /// Number of clones (processes/threads performing the same workload) of
     /// this job. Default: 1.
     pub numjobs: u32,
+    /// TODO
+    pub thread: u32,
     /// Terminate processing after the specified number of seconds.
+    /// If this field is defined, --time_based=1 is set as well.
+    #[builder(setter(strip_option))]
     pub runtime: Option<u32>,
     /// Total size of I/O for this job.
+    #[builder(setter(strip_option, into))]
     pub size: Option<DataSize>,
+    /// TODO
+    pub norandommap: bool,
+    /// TODO
+    #[builder(setter(into))]
+    pub random_generator: Option<String>,
+    /// TODO
+    #[builder(setter(strip_option))]
+    pub do_verify: Option<bool>,
+    /// TODO
+    #[builder(setter(strip_option, into))]
+    pub verify: Option<String>,
+    /// TODO
+    #[builder(setter(strip_option))]
+    pub verify_async: Option<u32>,
+    /// TODO
+    #[builder(setter(strip_option))]
+    pub verify_fatal: Option<bool>,
     /// Run result.
+    #[builder(setter(skip))]
+    #[serde(skip_serializing)]
     pub result: FioJobResult,
+    /// Job counter.
+    #[builder(setter(skip))]
+    #[serde(skip_serializing)]
+    counter: u32,
+}
+
+impl FioJobBuilder {
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    pub fn build(&self) -> FioJob {
+        self.try_build()
+            .expect("FIO job builder is expected to succeed")
+    }
+
+    pub fn with_filename(&mut self, v: impl AsRef<Path>) -> &mut Self {
+        self.filename = Some(v.as_ref().to_str().unwrap().to_string());
+        self
+    }
 }
 
 impl Default for FioJob {
@@ -61,172 +118,123 @@ impl FioJob {
         let counter = JOB_COUNTER.fetch_add(1, Ordering::SeqCst);
 
         Self {
-            counter,
             name: format!("fio-{counter}"),
             ioengine: "spdk".to_string(),
             filename: String::new(),
             rw: "write".to_string(),
             direct: true,
-            blocksize: None,
+            bs: None,
             offset: None,
             iodepth: None,
             numjobs: 1,
+            thread: 1,
             runtime: None,
             size: None,
+            norandommap: true,
+            random_generator: Some("tausworthe64".to_string()),
+            do_verify: None,
+            verify: None,
+            verify_async: None,
+            verify_fatal: None,
             result: FioJobResult::NotRun,
+            counter,
         }
     }
 
     pub fn as_fio_args(&self) -> Vec<String> {
         assert!(!self.filename.is_empty(), "Filename must be defined");
 
-        let mut r = vec![
-            format!("--name={}", self.name),
-            format!("--ioengine={}", self.ioengine),
-            format!("--filename={}", self.filename),
-            format!("--thread=1"),
-            format!("--direct={}", if self.direct { "1" } else { "0" }),
-            format!("--norandommap=1"),
-            format!("--rw={}", self.rw),
-            format!("--numjobs={}", self.numjobs),
-            format!("--random_generator=tausworthe64"),
-        ];
-
-        if let Some(v) = self.blocksize {
-            r.push(format!("--bs={v}"));
-        }
-
-        if let Some(ref v) = self.offset {
-            r.push(format!("--offset={v}"));
-        }
-
-        if let Some(v) = self.iodepth {
-            r.push(format!("--iodepth={v}"));
-        }
+        let mut r: Vec<String> = serde_json::to_value(self)
+            .unwrap()
+            .as_object()
+            .unwrap()
+            .into_iter()
+            .filter_map(|(k, v)| {
+                if v.is_null() {
+                    None
+                } else if v.is_string() {
+                    // Serde adds quotes around strings, we don't want them.
+                    Some(format!("--{k}={v}", v = v.as_str().unwrap()))
+                } else if v.is_boolean() {
+                    // Map booleans to 1 or 0.
+                    Some(format!(
+                        "--{k}={v}",
+                        v = if v.as_bool().unwrap() { 1 } else { 0 }
+                    ))
+                } else {
+                    Some(format!("--{k}={v}"))
+                }
+            })
+            .collect();
 
-        if let Some(v) = self.runtime {
+        if self.runtime.is_some() {
             r.push("--time_based=1".to_string());
-            r.push(format!("--runtime={v}s"));
-        }
-
-        if let Some(ref v) = self.size {
-            r.push(format!("--size={v}"));
         }
 
         r
     }
-
-    /// Sets job name.
-    pub fn with_name(mut self, v: &str) -> Self {
-        self.name = v.to_string();
-        self
-    }
-
-    /// I/O engine to use. Default: spdk.
-    pub fn with_ioengine(mut self, v: &str) -> Self {
-        self.ioengine = v.to_string();
-        self
-    }
-
-    /// Filename.
-    pub fn with_filename(mut self, v: &str) -> Self {
-        self.filename = v.to_string();
-        self
-    }
-
-    /// Filename.
-    pub fn with_filename_path(mut self, v: impl AsRef<Path>) -> Self {
-        self.filename = v.as_ref().to_str().unwrap().to_string();
-        self
-    }
-
-    /// Read-write FIO mode.
-    pub fn with_rw(mut self, rw: &str) -> Self {
-        self.rw = rw.to_string();
-        self
-    }
-
-    /// If true, use non-buffered I/O (usually O_DIRECT). Default: true.
-    pub fn with_direct(mut self, v: bool) -> Self {
-        self.direct = v;
-        self
-    }
-
-    /// Block size for I/O units. Default: 4k.
-    pub fn with_bs(mut self, v: u32) -> Self {
-        self.blocksize = Some(v);
-        self
-    }
-
-    /// Offset in the file to start I/O. Data before the offset will not be
-    /// touched.
-    pub fn with_offset(mut self, v: DataSize) -> Self {
-        self.offset = Some(v);
-        self
-    }
-
-    /// Number of I/O units to keep in flight against the file.
-    pub fn with_iodepth(mut self, v: u32) -> Self {
-        self.iodepth = Some(v);
-        self
-    }
-
-    /// Number of clones (processes/threads performing the same workload) of
-    /// this job. Default: 1.
-    pub fn with_numjobs(mut self, v: u32) -> Self {
-        self.numjobs = v;
-        self
-    }
-
-    /// Terminate processing after the specified number of seconds.
-    pub fn with_runtime(mut self, v: u32) -> Self {
-        self.runtime = Some(v);
-        self
-    }
-
-    /// Total size of I/O for this job.
-    pub fn with_size(mut self, v: DataSize) -> Self {
-        self.size = Some(v);
-        self
-    }
 }
 
 /// TODO
-#[derive(Default, Debug, Clone)]
+#[derive(Default, Debug, Clone, Builder)]
+#[builder(setter(prefix = "with", into))]
+#[builder(build_fn(name = "try_build"))]
+#[builder(default)]
 #[allow(dead_code)]
 pub struct Fio {
+    /// TODO
+    #[builder(setter(custom))]
     pub jobs: Vec<FioJob>,
+    /// TODO
     pub verbose: bool,
+    /// TODO
     pub verbose_err: bool,
+    /// TODO
+    #[builder(setter(skip))]
     pub script: String,
+    /// TODO
+    #[builder(setter(skip))]
     pub total_time: Duration,
+    /// TODO
+    #[builder(setter(skip))]
     pub exit: i32,
+    /// TODO
+    #[builder(setter(skip))]
     pub err_messages: Vec<String>,
 }
 
-impl Fio {
+impl FioBuilder {
     pub fn new() -> Self {
         Default::default()
     }
 
-    pub fn with_jobs(mut self, jobs: impl Iterator<Item = FioJob>) -> Self {
-        jobs.for_each(|j| self.jobs.push(j));
-        self
+    pub fn build(&self) -> Fio {
+        self.try_build()
+            .expect("FIO builder is expected to succeed")
     }
 
-    pub fn with_job(mut self, job: FioJob) -> Self {
-        self.jobs.push(job);
+    pub fn with_jobs(
+        &mut self,
+        jobs: impl Iterator<Item = FioJob>,
+    ) -> &mut Self {
+        jobs.for_each(|j| {
+            self.with_job(j);
+        });
         self
     }
 
-    pub fn with_verbose(mut self, v: bool) -> Self {
-        self.verbose = v;
+    pub fn with_job(&mut self, job: FioJob) -> &mut Self {
+        if self.jobs.is_none() {
+            self.jobs = Some(Vec::new());
+        }
+        self.jobs.as_mut().unwrap().push(job);
         self
     }
+}
 
-    pub fn with_verbose_err(mut self, v: bool) -> Self {
-        self.verbose_err = v;
-        self
+impl Fio {
+    pub fn new() -> Self {
+        Default::default()
     }
 
     pub fn run(mut self) -> Self {
@@ -262,10 +270,14 @@
         }
 
         if self.verbose_err {
-            println!(
-                "Error(s) running FIO: {s}",
-                s = self.err_messages.join("\n")
-            );
+            if self.err_messages.is_empty() {
+                println!("FIO is okay");
+            } else {
+                println!(
+                    "Error(s) running FIO: {s}",
+                    s = self.err_messages.join("\n")
+                );
+            }
         }
 
         self
diff --git a/io-engine-tests/src/nexus.rs b/io-engine-tests/src/nexus.rs
index 3d14aa931..8b1565e1f 100644
--- a/io-engine-tests/src/nexus.rs
+++ b/io-engine-tests/src/nexus.rs
@@ -536,6 +536,14 @@ pub async fn list_nexuses(rpc: SharedRpcHandle) -> Result<Vec<Nexus>, Status> {
         .map(|r| r.into_inner().nexus_list)
 }
 
+/// TODO
+pub async fn find_nexus(rpc: SharedRpcHandle, uuid: &str) -> Option<Nexus> {
+    match list_nexuses(rpc).await {
+        Err(_) => None,
+        Ok(nn) => nn.into_iter().find(|p| p.uuid == uuid),
+    }
+}
+
 /// TODO
 pub async fn find_nexus_by_uuid(
     rpc: SharedRpcHandle,
diff --git a/io-engine-tests/src/nvmf.rs b/io-engine-tests/src/nvmf.rs
index 6e1f90ac6..4c9375ae4 100644
--- a/io-engine-tests/src/nvmf.rs
+++ b/io-engine-tests/src/nvmf.rs
@@ -79,15 +79,11 @@ pub async fn test_fio_to_nvmf(
 ) -> std::io::Result<()> {
     let tgt = format!("'{}'", nvmf.as_args().join(" "));
 
-    fio.jobs = fio
-        .jobs
-        .into_iter()
-        .map(|j| {
-            j.with_filename(&tgt)
-                .with_ioengine("spdk")
-                .with_direct(true)
-        })
-        .collect();
+    fio.jobs.iter_mut().for_each(|j| {
+        j.filename = tgt.clone();
+        j.ioengine = "spdk".to_string();
+        j.direct = true;
+    });
 
     spawn_fio_task(&fio).await
 }
@@ -101,15 +97,11 @@ pub async fn test_fio_to_nvmf_aio(
     let path = find_mayastor_nvme_device_path(&nvmf.serial)?;
     let path_str = path.to_str().unwrap();
 
-    fio.jobs = fio
-        .jobs
-        .into_iter()
-        .map(|j| {
-            j.with_filename(path_str)
-                .with_ioengine("libaio")
-                .with_direct(true)
-        })
-        .collect();
+    fio.jobs.iter_mut().for_each(|j| {
+        j.filename = path_str.to_string();
+        j.ioengine = "libaio".to_string();
+        j.direct = true;
+    });
 
     spawn_fio_task(&fio).await
 }
diff --git a/io-engine/src/bdev/nexus/mod.rs b/io-engine/src/bdev/nexus/mod.rs
index c2e872798..6d97bcb52 100644
--- a/io-engine/src/bdev/nexus/mod.rs
+++ b/io-engine/src/bdev/nexus/mod.rs
@@ -191,6 +191,9 @@ pub static ENABLE_PARTIAL_REBUILD: AtomicBool = AtomicBool::new(true);
 /// Enables/disables nexus reset logic.
 pub static ENABLE_NEXUS_RESET: AtomicBool = AtomicBool::new(false);
 
+/// Enables/disables additional nexus I/O channel debugging.
+pub static ENABLE_NEXUS_CHANNEL_DEBUG: AtomicBool = AtomicBool::new(false);
+
 /// Whether the nexus channel should have readers/writers configured.
 /// This must be set true ONLY from tests.
 pub static ENABLE_IO_ALL_THRD_NX_CHAN: AtomicBool = AtomicBool::new(false);
diff --git a/io-engine/src/bdev/nexus/nexus_channel.rs b/io-engine/src/bdev/nexus/nexus_channel.rs
index 4f63ec87c..522f96130 100644
--- a/io-engine/src/bdev/nexus/nexus_channel.rs
+++ b/io-engine/src/bdev/nexus/nexus_channel.rs
@@ -32,7 +32,8 @@ impl<'n> Debug for NexusChannel<'n> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "I/O chan '{nex}' core:{core}({cur}) [R:{r} W:{w} L:{l} C:{c}]",
+            "{io} chan '{nex}' core:{core}({cur}) [R:{r} W:{w} L:{l} C:{c}]",
+            io = if self.is_io_chan { "I/O" } else { "Aux" },
             nex = self.nexus.nexus_name(),
             core = self.core,
             cur = Cores::current(),
@@ -76,37 +77,23 @@ impl Display for DrEvent {
     }
 }
 
+#[inline(always)]
+fn is_channel_debug_enabled() -> bool {
+    super::ENABLE_NEXUS_CHANNEL_DEBUG.load(Ordering::SeqCst)
+}
+
 impl<'n> NexusChannel<'n> {
-    /// TODO
+    /// Creates a new nexus I/O channel.
     pub(crate) fn new(nexus: Pin<&mut Nexus<'n>>) -> Self {
         debug!("{nexus:?}: new channel on core {c}", c = Cores::current());
 
         let b_init_thrd_hdls =
             super::ENABLE_IO_ALL_THRD_NX_CHAN.load(Ordering::SeqCst);
+
         let is_io_chan = Thread::current().unwrap() != Thread::primary()
             || b_init_thrd_hdls;
 
-        let mut writers = Vec::new();
-        let mut readers = Vec::new();
-
-        if is_io_chan {
-            nexus.children_iter().filter(|c| c.is_healthy()).for_each(
-                |c| match (c.get_io_handle(), c.get_io_handle()) {
-                    (Ok(w), Ok(r)) => {
-                        writers.push(w);
-                        readers.push(r);
-                    }
-                    _ => {
-                        c.set_faulted_state(FaultReason::CantOpen);
-                        error!(
-                            "Failed to get I/O handle for {c}, \
-                            skipping block device",
-                            c = c.uri()
-                        )
-                    }
-                },
-            );
-        } else {
+        if !is_io_chan {
            // If we are here, this means the nexus channel being created is not
            // the one to be used for normal IOs. Such a channel is
            // created in rebuild path today, and it's on the init
@@ -118,12 +105,16 @@ impl<'n> NexusChannel<'n> {
            // And the rebuild IOs are dispatched by
            // directly calling write API without going via writers abstraction.
            // Refer GTM-1075 for the race condition details.
-            debug!("{nexus:?}: skip nexus channel setup({t:?}). is_io_channel: {is_io_chan}", t = Thread::current().unwrap());
+            debug!(
+                "{nexus:?}: skipping nexus channel setup on init thread \
+                ({t:?}): not I/O channel",
+                t = Thread::current().unwrap()
+            );
         }
 
-        Self {
-            writers,
-            readers,
+        let mut res = Self {
+            writers: Vec::new(),
+            readers: Vec::new(),
             detached: Vec::new(),
             io_logs: nexus.io_log_channels(),
             previous_reader: UnsafeCell::new(0),
@@ -133,10 +124,19 @@ impl<'n> NexusChannel<'n> {
             frozen_ios: Vec::new(),
             core: Cores::current(),
             is_io_chan,
+        };
+
+        res.connect_children();
+
+        if is_channel_debug_enabled() {
+            debug!("{res:?}: after new channel creation:");
+            res.dump_dbg();
         }
+
+        res
     }
 
-    /// TODO
+    /// Destroys a nexus I/O channel.
     pub(crate) fn destroy(mut self) {
         debug!(
             "{nex:?}: destroying I/O channel on core {core}",
@@ -145,6 +145,7 @@ impl<'n> NexusChannel<'n> {
         );
         self.writers.clear();
         self.readers.clear();
+        self.detached.clear();
         self.io_logs.clear();
     }
 
@@ -238,6 +239,11 @@ impl<'n> NexusChannel<'n> {
         }
 
         debug!("{self:?}: device '{device_name}' detached");
+
+        if is_channel_debug_enabled() {
+            debug!("{self:?}: after detach:");
+            self.dump_dbg();
+        }
     }
 
     /// Disconnects previously detached device handles by dropping them.
@@ -269,11 +275,34 @@ impl<'n> NexusChannel<'n> {
     /// we simply put back all the channels, and reopen the bdevs that are in
     /// the online state.
     pub(crate) fn reconnect_all(&mut self) {
+        debug!("{self:?}: child devices reconnecting...");
+
+        if is_channel_debug_enabled() {
+            debug!("{self:?}: before reconnection:");
+            self.dump_dbg();
+        }
+
         // clear the vector of channels and reset other internal values,
         // clearing the values will drop any existing handles in the
         // channel
         self.previous_reader = UnsafeCell::new(0);
 
+        if self.is_io_channel() {
+            self.connect_children();
+        }
+
+        self.reconnect_io_logs();
+
+        if is_channel_debug_enabled() {
+            debug!("{self:?}: after reconnection:");
+            self.dump_dbg();
+        }
+
+        debug!("{self:?}: child devices reconnected");
+    }
+
+    /// (Re)connects readers and writers.
+    fn connect_children(&mut self) {
         // nvmx will drop the I/O qpairs which is different from all other
         // bdevs we might be dealing with. So instead of clearing and refreshing
         // which had no side effects before, we create a new vector and
@@ -290,6 +319,8 @@ impl<'n> NexusChannel<'n> {
                 (Ok(w), Ok(r)) => {
                     writers.push(w);
                     readers.push(r);
+
+                    debug!("{self:?}: connecting child device: {c:?}");
                 }
                 _ => {
                     c.set_faulted_state(FaultReason::CantOpen);
@@ -322,10 +353,6 @@ impl<'n> NexusChannel<'n> {
 
         self.writers = writers;
         self.readers = readers;
-
-        self.reconnect_io_logs();
-
-        debug!("{self:?}: child devices reconnected");
     }
 
     /// Reconnects all active I/O logs.
@@ -394,4 +421,47 @@ impl<'n> NexusChannel<'n> {
         trace!("{io:?}: freezing I/O");
         self.frozen_ios.push(io)
     }
+
+    /// Prints elaborate debug info to the logs.
+    fn dump_dbg(&self) {
+        let me = format!(
+            "{self:p} [{io} {c}]",
+            io = if self.is_io_chan { "I/O" } else { "aux" },
+            c = self.core,
+        );
+
+        debug!("{me}: debug info: {self:?}");
+
+        debug!("{me}: {n} children:", n = self.nexus().child_count());
+        self.nexus().children_iter().for_each(|c| {
+            debug!(
+                "{me}: {dev}: {c:?}",
+                dev = c.get_device_name().unwrap_or("-".to_string()),
+            )
+        });
+
+        fn dbg_devs(
+            prefix: &str,
+            name: &str,
+            devs: &Vec<Box<dyn BlockDeviceHandle>>,
+        ) {
+            if devs.is_empty() {
+                debug!("{prefix}: no {name}");
+            } else {
+                debug!("{prefix}: {n} {name}:", n = devs.len());
+                devs.iter().for_each(|dev| {
+                    debug!(
+                        "{prefix}: {d}",
+                        d = dev.get_device().device_name()
+                    );
+                });
+            }
+        }
+
+        dbg_devs(&me, "readers", &self.readers);
+        dbg_devs(&me, "writers", &self.writers);
+        dbg_devs(&me, "detached", &self.detached);
+
+        debug!("{me}: (end)");
+    }
 }
diff --git a/io-engine/src/bdev/nvmx/device.rs b/io-engine/src/bdev/nvmx/device.rs
index 925bf983b..7ff985e01 100644
--- a/io-engine/src/bdev/nvmx/device.rs
+++ b/io-engine/src/bdev/nvmx/device.rs
@@ -303,7 +303,7 @@ impl DeviceIoController for NvmeDeviceIoController {
  * Lookup target NVMeOF device by its name (starts with nvmf://).
  */
 pub fn lookup_by_name(name: &str) -> Option<Box<dyn BlockDevice>> {
-    debug!("Searching NVMe devices for '{}'...", name);
+    trace!("Searching NVMe devices for '{}'...", name);
     if let Some(c) = NVME_CONTROLLERS.lookup_by_name(name) {
         let controller = c.lock();
         // Make sure controller is available.
@@ -311,7 +311,7 @@ pub fn lookup_by_name(name: &str) -> Option<Box<dyn BlockDevice>> {
             let ns = controller
                 .namespace()
                 .expect("no namespaces for this controller");
-            debug!("NVMe device found: '{}'", name);
+            trace!("NVMe device found: '{}'", name);
             return Some(Box::new(NvmeBlockDevice::from_ns(name, ns)));
         }
     }
diff --git a/io-engine/src/bin/io-engine.rs b/io-engine/src/bin/io-engine.rs
index 882fe45bf..35dc9b456 100644
--- a/io-engine/src/bin/io-engine.rs
+++ b/io-engine/src/bin/io-engine.rs
@@ -7,7 +7,11 @@ use futures::future::FutureExt;
 
 use io_engine::{
     bdev::{
-        nexus::{ENABLE_NEXUS_RESET, ENABLE_PARTIAL_REBUILD},
+        nexus::{
+            ENABLE_NEXUS_CHANNEL_DEBUG,
+            ENABLE_NEXUS_RESET,
+            ENABLE_PARTIAL_REBUILD,
+        },
         util::uring,
     },
     core::{
@@ -83,6 +87,11 @@ fn start_tokio_runtime(args: &MayastorCliArgs) {
         warn!("Nexus reset is disabled");
     }
 
+    if args.enable_nexus_channel_debug {
+        ENABLE_NEXUS_CHANNEL_DEBUG.store(true, Ordering::SeqCst);
+        warn!("Nexus channel debug is enabled");
+    }
+
     print_feature!("Async QPair connection", "spdk-async-qpair-connect");
     print_feature!("Fault injection", "fault-injection");
 
diff --git a/io-engine/src/core/env.rs b/io-engine/src/core/env.rs
index a72cb9dbe..1bdbc87aa 100644
--- a/io-engine/src/core/env.rs
+++ b/io-engine/src/core/env.rs
@@ -232,6 +232,13 @@ pub struct MayastorCliArgs {
     /// Events message-bus endpoint url.
     #[clap(long)]
     pub events_url: Option<url::Url>,
+    /// Enables additional nexus I/O channel debugging.
+    #[clap(
+        long = "enable-channel-dbg",
+        env = "ENABLE_NEXUS_CHANNEL_DEBUG",
+        hide = true
+    )]
+    pub enable_nexus_channel_debug: bool,
 }
 
 /// Mayastor features.
@@ -285,6 +292,7 @@ impl Default for MayastorCliArgs {
             skip_sig_handler: false,
             enable_io_all_thrd_nexus_channels: false,
             events_url: None,
+            enable_nexus_channel_debug: false,
         }
     }
 }
diff --git a/io-engine/src/grpc/v1/test.rs b/io-engine/src/grpc/v1/test.rs
index 2ad7f2f95..f2444e0ee 100644
--- a/io-engine/src/grpc/v1/test.rs
+++ b/io-engine/src/grpc/v1/test.rs
@@ -392,9 +392,7 @@ impl crate::core::wiper::NotifyStream for WiperStream {
 #[cfg(feature = "fault-injection")]
 impl From<FaultInjectionError> for tonic::Status {
     fn from(e: FaultInjectionError) -> Self {
-        match e {
-            e => Status::invalid_argument(e.to_string()),
-        }
+        Status::invalid_argument(e.to_string())
     }
 }
 
diff --git a/io-engine/tests/nexus_child_retire.rs b/io-engine/tests/nexus_child_retire.rs
index 5f7507298..af744b495 100644
--- a/io-engine/tests/nexus_child_retire.rs
+++ b/io-engine/tests/nexus_child_retire.rs
@@ -18,7 +18,7 @@ use common::{
         ComposeTest,
     },
     file_io::DataSize,
-    fio::{Fio, FioJob},
+    fio::{FioBuilder, FioJobBuilder},
     nexus::{test_fio_to_nexus, NexusBuilder},
     pool::PoolBuilder,
     reactor_poll,
@@ -261,12 +261,15 @@ async fn nexus_child_retire_persist_unresponsive_with_fio() {
         async move {
             test_fio_to_nexus(
                 &nex_0,
-                Fio::new().with_job(
-                    FioJob::new()
-                        .with_bs(4096)
-                        .with_iodepth(8)
-                        .with_size(DataSize::from_mb(FIO_DATA_SIZE)),
-                ),
+                FioBuilder::new()
+                    .with_job(
+                        FioJobBuilder::new()
+                            .with_bs(4096)
+                            .with_iodepth(8)
+                            .with_size(DataSize::from_mb(FIO_DATA_SIZE))
+                            .build(),
+                    )
+                    .build(),
             )
             .await
             .unwrap();
diff --git a/io-engine/tests/nexus_children_add_remove.rs b/io-engine/tests/nexus_children_add_remove.rs
index 26561be60..7a3b1efbd 100644
--- a/io-engine/tests/nexus_children_add_remove.rs
+++ b/io-engine/tests/nexus_children_add_remove.rs
@@ -22,7 +22,7 @@ pub mod common;
 
 use common::{
     compose::{rpc::v1::GrpcConnect, Binary, Builder},
-    fio::{Fio, FioJob},
+    fio::{FioBuilder, FioJobBuilder},
     nexus::{test_fio_to_nexus, NexusBuilder},
     pool::PoolBuilder,
     replica::ReplicaBuilder,
@@ -283,12 +283,15 @@ async fn nexus_remove_child_with_io() {
         async move {
             test_fio_to_nexus(
                 &nex_0,
-                Fio::new().with_job(
-                    FioJob::new()
-                        .with_runtime(10)
-                        .with_bs(4096)
-                        .with_iodepth(16),
-                ),
+                FioBuilder::new()
+                    .with_job(
+                        FioJobBuilder::new()
+                            .with_runtime(10)
+                            .with_bs(4096)
+                            .with_iodepth(16)
+                            .build(),
+                    )
+                    .build(),
             )
             .await
             .unwrap();
diff --git a/io-engine/tests/nexus_crd.rs b/io-engine/tests/nexus_crd.rs
index 6228bb847..b037ef468 100644
--- a/io-engine/tests/nexus_crd.rs
+++ b/io-engine/tests/nexus_crd.rs
@@ -19,7 +19,7 @@ use common::{
         Builder,
     },
     file_io::DataSize,
-    fio::{spawn_fio_task, Fio, FioJob, FioJobResult},
+    fio::{spawn_fio_task, FioBuilder, FioJobBuilder, FioJobResult},
     nexus::NexusBuilder,
     nvme::{find_mayastor_nvme_device_path, NmveConnectGuard},
     nvmf::NvmfLocation,
@@ -185,16 +185,17 @@
         .to_string();
 
     let jobs = (0 .. cnt).map(|_| {
-        FioJob::new()
+        FioJobBuilder::new()
             .with_direct(true)
            .with_ioengine("libaio")
             .with_iodepth(128)
             .with_filename(&path)
             .with_runtime(rt)
             .with_rw("randwrite")
+            .build()
     });
 
-    let fio = Fio::new().with_jobs(jobs);
+    let fio = FioBuilder::new().with_jobs(jobs).build();
 
     // Notify the nexus management task that connection is complete and I/O
     // starts.
@@ -386,16 +387,19 @@ async fn nexus_crd_resv() {
     let fio_res = {
         let (_cg, path) = nex_1.nvmf_location().open().unwrap();
 
-        let fio = Fio::new().with_job(
-            FioJob::new()
-                .with_name("j0")
-                .with_filename_path(path)
-                .with_ioengine("libaio")
-                .with_iodepth(1)
-                .with_direct(true)
-                .with_rw("write")
-                .with_size(DataSize::from_kb(4)),
-        );
+        let fio = FioBuilder::new()
+            .with_job(
+                FioJobBuilder::new()
+                    .with_name("j0")
+                    .with_filename(path)
+                    .with_ioengine("libaio")
+                    .with_iodepth(1)
+                    .with_direct(true)
+                    .with_rw("write")
+                    .with_size(DataSize::from_kb(4))
+                    .build(),
+            )
+            .build();
 
         tokio::spawn(async { fio.run() }).await.unwrap()
     };
diff --git a/io-engine/tests/nexus_fault_injection.rs b/io-engine/tests/nexus_fault_injection.rs
index daf96b718..575a7c302 100644
--- a/io-engine/tests/nexus_fault_injection.rs
+++ b/io-engine/tests/nexus_fault_injection.rs
@@ -27,7 +27,7 @@ use common::{
         ComposeTest,
     },
     file_io::DataSize,
-    fio::{spawn_fio_task, Fio, FioJob},
+    fio::{spawn_fio_task, FioBuilder, FioJobBuilder},
     nexus::{test_write_to_nexus, NexusBuilder},
     nvme::{find_mayastor_nvme_device_path, NmveConnectGuard},
     pool::PoolBuilder,
@@ -474,25 +474,31 @@ async fn replica_bdev_io_injection() {
 
     // With offset of 30 blocks, the job mustn't hit the injected fault, which
     // is set on block #20.
-    let fio_ok = Fio::new().with_job(
-        FioJob::new()
-            .with_direct(true)
-            .with_ioengine("libaio")
-            .with_iodepth(1)
-            .with_filename(&path)
-            .with_offset(DataSize::from_blocks(30, BLK_SIZE))
-            .with_rw("write"),
-    );
+    let fio_ok = FioBuilder::new()
+        .with_job(
+            FioJobBuilder::new()
+                .with_direct(true)
+                .with_ioengine("libaio")
+                .with_iodepth(1)
+                .with_filename(&path)
+                .with_offset(DataSize::from_blocks(30, BLK_SIZE))
+                .with_rw("write")
+                .build(),
+        )
+        .build();
 
     // With the entire device, the job must hit the injected fault.
-    let fio_fail = Fio::new().with_job(
-        FioJob::new()
-            .with_direct(true)
-            .with_ioengine("libaio")
-            .with_iodepth(1)
-            .with_filename(&path)
-            .with_rw("write"),
-    );
+    let fio_fail = FioBuilder::new()
+        .with_job(
+            FioJobBuilder::new()
+                .with_direct(true)
+                .with_ioengine("libaio")
+                .with_iodepth(1)
+                .with_filename(&path)
+                .with_rw("write")
+                .build(),
+        )
+        .build();
 
     spawn_fio_task(&fio_ok)
         .await
diff --git a/io-engine/tests/nexus_fio.rs b/io-engine/tests/nexus_fio.rs
index ad19c213e..c4272ba6c 100644
--- a/io-engine/tests/nexus_fio.rs
+++ b/io-engine/tests/nexus_fio.rs
@@ -3,7 +3,7 @@ pub mod common;
 use common::{
     compose::{rpc::v1::GrpcConnect, Binary, Builder},
     file_io::DataSize,
-    fio::{Fio, FioJob},
+    fio::{FioBuilder, FioJobBuilder},
     nexus::{test_fio_to_nexus, NexusBuilder},
     pool::PoolBuilder,
     replica::ReplicaBuilder,
@@ -97,15 +97,17 @@ async fn nexus_fio_single_remote() {
     // Run FIO with okay data size.
     test_fio_to_nexus(
         &nex_0,
-        Fio::new()
+        FioBuilder::new()
             .with_job(
-                FioJob::new()
+                FioJobBuilder::new()
                     .with_runtime(10)
                     .with_bs(4096)
                     .with_iodepth(8)
-                    .with_size(DataSize::from_mb(DATA_SIZE_OK)),
+                    .with_size(DataSize::from_mb(DATA_SIZE_OK))
+                    .build(),
             )
-            .with_verbose_err(true),
+            .with_verbose_err(true)
+            .build(),
     )
     .await
     .unwrap();
 
     // Run FIO with data size exceeding pool capacity.
     let err = test_fio_to_nexus(
         &nex_0,
-        Fio::new().with_job(
-            FioJob::new()
-                .with_runtime(10)
-                .with_bs(4096)
-                .with_iodepth(8)
-                .with_size(DataSize::from_mb(DATA_SIZE_OVER)),
-        ),
+        FioBuilder::new()
+            .with_job(
+                FioJobBuilder::new()
+                    .with_runtime(10)
+                    .with_bs(4096)
+                    .with_iodepth(8)
+                    .with_size(DataSize::from_mb(DATA_SIZE_OVER))
+                    .build(),
+            )
+            .build(),
     )
     .await
     .unwrap_err();
@@ -222,15 +227,17 @@ async fn nexus_fio_mixed() {
     // Run FIO with okay data size.
     test_fio_to_nexus(
         &nex_0,
-        Fio::new()
+        FioBuilder::new()
             .with_job(
-                FioJob::new()
+                FioJobBuilder::new()
                     .with_runtime(10)
                     .with_bs(4096)
                     .with_iodepth(8)
-                    .with_size(DataSize::from_mb(DATA_SIZE_OK)),
+                    .with_size(DataSize::from_mb(DATA_SIZE_OK))
+                    .build(),
             )
-            .with_verbose_err(true),
+            .with_verbose_err(true)
+            .build(),
     )
     .await
     .unwrap();
@@ -240,13 +247,16 @@
     // this run must succeed.
     test_fio_to_nexus(
         &nex_0,
-        Fio::new().with_job(
-            FioJob::new()
-                .with_runtime(10)
-                .with_bs(4096)
-                .with_iodepth(8)
-                .with_size(DataSize::from_mb(DATA_SIZE_OVER)),
-        ),
+        FioBuilder::new()
+            .with_job(
+                FioJobBuilder::new()
+                    .with_runtime(10)
+                    .with_bs(4096)
+                    .with_iodepth(8)
+                    .with_size(DataSize::from_mb(DATA_SIZE_OVER))
+                    .build(),
+            )
+            .build(),
     )
     .await
     .unwrap();
diff --git a/io-engine/tests/nexus_rebuild_partial.rs b/io-engine/tests/nexus_rebuild_partial.rs
index c2b33ab19..397e1c254 100644
--- a/io-engine/tests/nexus_rebuild_partial.rs
+++ b/io-engine/tests/nexus_rebuild_partial.rs
@@ -18,7 +18,7 @@ use common::{
 
 #[cfg(feature = "fault-injection")]
 use io_engine_tests::{
-    fio::{Fio, FioJob},
+    fio::{FioBuilder, FioJobBuilder},
     nexus::test_fio_to_nexus,
 };
 
@@ -235,7 +235,7 @@ async fn nexus_partial_rebuild_io_fault() {
     assert_eq!(children.len(), 2);
     assert_eq!(children[1].state(), ChildState::Faulted);
     assert_eq!(children[1].state_reason(), ChildStateReason::IoFailure);
-    assert_eq!(children[1].has_io_log, true);
+    assert!(children[1].has_io_log);
 
     // Chunk B.
     test_write_to_nexus(
@@ -485,13 +485,16 @@ async fn nexus_partial_rebuild_double_fault() {
     // Write some data to have something to rebuild.
     test_fio_to_nexus(
         &nex_0,
-        Fio::new().with_job(
-            FioJob::new()
-                .with_bs(4096)
-                .with_iodepth(16)
-                .with_offset(DataSize::from_mb(DATA_A_POS))
-                .with_size(DataSize::from_mb(DATA_A_SIZE)),
-        ),
+        FioBuilder::new()
+            .with_job(
+                FioJobBuilder::new()
+                    .with_bs(4096)
+                    .with_iodepth(16)
+                    .with_offset(DataSize::from_mb(DATA_A_POS))
+                    .with_size(DataSize::from_mb(DATA_A_SIZE))
+                    .build(),
+            )
+            .build(),
     )
     .await
     .unwrap();
@@ -522,13 +525,16 @@ async fn nexus_partial_rebuild_double_fault() {
         async move {
             test_fio_to_nexus(
                 &nex_0,
-                Fio::new().with_job(
-                    FioJob::new()
-                        .with_bs(4096)
-                        .with_iodepth(16)
-                        .with_offset(DataSize::from_mb(DATA_B_POS))
-                        .with_size(DataSize::from_mb(DATA_B_SIZE)),
-                ),
+                FioBuilder::new()
+                    .with_job(
+                        FioJobBuilder::new()
+                            .with_bs(4096)
+                            .with_iodepth(16)
+                            .with_offset(DataSize::from_mb(DATA_B_POS))
+                            .with_size(DataSize::from_mb(DATA_B_SIZE))
+                            .build(),
+                    )
+                    .build(),
             )
             .await
             .unwrap();
@@ -566,13 +572,16 @@ async fn nexus_partial_rebuild_double_fault() {
     // Write some data to have something to rebuild.
     test_fio_to_nexus(
         &nex_0,
-        Fio::new().with_job(
-            FioJob::new()
-                .with_bs(4096)
-                .with_iodepth(16)
-                .with_offset(DataSize::from_mb(DATA_A_POS))
-                .with_size(DataSize::from_mb(DATA_A_SIZE)),
-        ),
+        FioBuilder::new()
+            .with_job(
+                FioJobBuilder::new()
+                    .with_bs(4096)
+                    .with_iodepth(16)
+                    .with_offset(DataSize::from_mb(DATA_A_POS))
+                    .with_size(DataSize::from_mb(DATA_A_SIZE))
+                    .build(),
+            )
+            .build(),
     )
     .await
     .unwrap();
@@ -591,17 +600,17 @@ async fn nexus_partial_rebuild_double_fault() {
     // First rebuild must have been failed, because I/O failed while the job
     // was running.
     assert_eq!(hist[0].state(), RebuildJobState::Failed);
-    assert_eq!(hist[0].is_partial, true);
+    assert!(hist[0].is_partial);
     assert!(hist[0].blocks_transferred < hist[0].blocks_total);
 
     // 3rd rebuid job must have been a successfully full rebuild.
     assert_eq!(hist[1].state(), RebuildJobState::Completed);
-    assert_eq!(hist[1].is_partial, false);
+    assert!(!hist[1].is_partial);
     assert_eq!(hist[1].blocks_transferred, hist[1].blocks_total);
 
     // 3rd rebuid job must have been a successfully partial rebuild.
     assert_eq!(hist[2].state(), RebuildJobState::Completed);
-    assert_eq!(hist[2].is_partial, true);
+    assert!(hist[2].is_partial);
     assert!(hist[2].blocks_transferred < hist[2].blocks_total);
 
     // First rebuild job must have been prematurely stopped, so the amount of
diff --git a/io-engine/tests/nexus_restart.rs b/io-engine/tests/nexus_restart.rs
new file mode 100644
index 000000000..2e2a080dd
--- /dev/null
+++ b/io-engine/tests/nexus_restart.rs
@@ -0,0 +1,378 @@
+pub mod common;
+
+use io_engine_tests::{
+    compose::{
+        rpc::v1::{GrpcConnect, SharedRpcHandle},
+        Binary,
+        Builder,
+        ComposeTest,
+    },
+    file_io::DataSize,
+    fio::{spawn_fio_task, FioBuilder, FioJobBuilder},
+    nexus::NexusBuilder,
+    nvme::{find_mayastor_nvme_device_path, NmveConnectGuard},
+    nvmf::NvmfLocation,
+    pool::PoolBuilder,
+    replica::ReplicaBuilder,
+};
+use std::{sync::Arc, time::Duration};
+use tokio::sync::{
+    watch::{Receiver, Sender},
+    Mutex,
+};
+
+const ETCD_IP: &str = "10.1.0.2";
+const ETCD_PORT: &str = "2379";
+const ETCD_PORT_2: &str = "2380";
+
+const DISK_SIZE: u64 = 12000;
+const REPL_SIZE: u64 = 512;
+const NEXUS_SIZE: u64 = REPL_SIZE;
+
+const SLEEP_BEFORE: u64 = 4;
+const SLEEP_DOWN: u64 = 5;
+const SLEEP_ADD_CHILD: u64 = 1;
+
+struct NodeConfig {
+    disk_name: &'static str,
+    bdev_name: &'static str,
+    pool_name: &'static str,
+    pool_uuid: &'static str,
+    repl_name: &'static str,
+    repl_uuid: &'static str,
+}
+
+const NODE_CNT: usize = 3;
+
+const NODE_CONFIG: [NodeConfig; NODE_CNT] = [
+    NodeConfig {
+        disk_name: "/tmp/disk0.img",
+        bdev_name: "aio:///tmp/disk0.img?blk_size=512",
+        pool_name: "pool_0",
+        pool_uuid: "40baf8b5-6256-4f29-b073-61ebf67d9b91",
+        repl_name: "repl_0",
+        repl_uuid: "45c23e54-dc86-45f6-b55b-e44d05f154dd",
+    },
+    NodeConfig {
+        disk_name: "/tmp/disk1.img",
+        bdev_name: "aio:///tmp/disk1.img?blk_size=512",
+        pool_name: "pool_1",
+        pool_uuid: "2d7f2e76-c1a8-478f-a0c7-b96eb4072075",
+        repl_name: "repl_1",
+        repl_uuid: "0b30d5e8-c057-463a-96f4-3591d70120f9",
+    },
+    NodeConfig {
+        disk_name: "/tmp/disk2.img",
+        bdev_name: "aio:///tmp/disk2.img?blk_size=512",
+        pool_name: "pool_2",
+        pool_uuid: "6f9fd854-87e8-4a8a-8a83-92faa6244eea",
+        repl_name: "repl_2",
+        repl_uuid: "88cf669f-34dc-4390-9ad8-24f996f372b5",
+    },
+];
+
+const NEXUS_NAME: &str = "nexus_0";
+const NEXUS_UUID: &str = "d22796b7-332b-4b43-b929-079744d3ddab";
+
+/// TODO
+#[allow(dead_code)]
+struct StorageNode {
+    pool: PoolBuilder,
+    repl: ReplicaBuilder,
+}
+
+/// Test cluster.
+#[allow(dead_code)]
+struct TestCluster {
+    test: Box<ComposeTest>,
+    etcd_endpoint: String,
+    etcd: etcd_client::Client,
+    nodes: Vec<StorageNode>,
+}
+
+type TestClusterRef = Arc<Mutex<TestCluster>>;
+
+impl TestCluster {
+    async fn create() -> TestClusterRef {
+        // Create test backing store.
+        for i in NODE_CONFIG {
+            common::delete_file(&[i.disk_name.to_string()]);
+            common::truncate_file_bytes(i.disk_name, DISK_SIZE * 1024 * 1024);
+        }
+
+        let etcd_endpoint = format!("http://{ETCD_IP}:{ETCD_PORT}");
+
+        let test = Box::new(
+            Builder::new()
+                .name("io-race")
+                .network("10.1.0.0/16")
+                .unwrap()
+                .add_container_spec(
+                    common::compose::ContainerSpec::from_binary(
+                        "etcd",
+                        Binary::from_path(env!("ETCD_BIN")).with_args(vec![
+                            "--data-dir",
+                            "/tmp/etcd-data",
+                            "--advertise-client-urls",
+                            &etcd_endpoint,
+                            "--listen-client-urls",
+                            &etcd_endpoint,
+                        ]),
+                    )
+                    .with_portmap(ETCD_PORT, ETCD_PORT)
+                    .with_portmap(ETCD_PORT_2, ETCD_PORT_2),
+                )
+                .add_container_bin(
+                    "ms_0",
+                    Binary::from_dbg("io-engine")
+                        .with_direct_bind(NODE_CONFIG[0].disk_name)
+                        .with_args(vec!["-l", "1,2", "-Fcolor,nodate,host"]),
+                )
+                .add_container_bin(
+                    "ms_1",
+                    Binary::from_dbg("io-engine")
+                        .with_direct_bind(NODE_CONFIG[1].disk_name)
+                        .with_args(vec!["-l", "3,4", "-Fcolor,nodate,host"]),
+                )
+                .add_container_bin(
+                    "ms_2",
+                    Binary::from_dbg("io-engine")
+                        .with_direct_bind(NODE_CONFIG[2].disk_name)
+                        .with_args(vec![
+                            "-Fcolor,nodate,host",
+                            "-l",
+                            "5,6",
+                            "-p",
+                            &etcd_endpoint,
+                            "--enable-channel-dbg",
+                        ]),
+                )
+                .with_clean(true)
+                .with_logs(true)
+                .build()
+                .await
+                .unwrap(),
+        );
+
+        let etcd = etcd_client::Client::connect([&etcd_endpoint], None)
+            .await
+            .unwrap();
+
+        // Create pools and replicas.
+        let mut nodes = Vec::new();
+
+        for (idx, node) in NODE_CONFIG.iter().enumerate() {
+            let ms = GrpcConnect::new(&test)
+                .grpc_handle_shared(&format!("ms_{idx}"))
+                .await
+                .unwrap();
+
+            let mut pool = PoolBuilder::new(ms.clone())
+                .with_name(node.pool_name)
+                .with_uuid(node.pool_uuid)
+                .with_bdev(node.bdev_name);
+
+            let mut repl = ReplicaBuilder::new(ms.clone())
+                .with_pool(&pool)
+                .with_name(node.repl_name)
+                .with_uuid(node.repl_uuid)
+                .with_thin(false)
+                .with_size_mb(REPL_SIZE);
+
+            pool.create().await.unwrap();
+            repl.create().await.unwrap();
+            repl.share().await.unwrap();
+
+            nodes.push(StorageNode {
+                pool,
+                repl,
+            });
+        }
+
+        Arc::new(Mutex::new(Self {
+            test,
+            etcd_endpoint,
+            etcd,
+            nodes,
+        }))
+    }
+
+    async fn get_ms_nex(&self) -> SharedRpcHandle {
+        GrpcConnect::new(&self.test)
+            .grpc_handle_shared("ms_2")
+            .await
+            .unwrap()
+    }
+
+    async fn start_ms_nex(&mut self) {
+        self.test.start("ms_2").await.unwrap();
+    }
+
+    async fn stop_ms_nex(&mut self) {
+        self.test.kill("ms_2").await.unwrap();
+    }
+
+    async fn create_nexus(&mut self) -> NexusBuilder {
+        let ms_nex = self.get_ms_nex().await;
+
+        let mut nex = NexusBuilder::new(ms_nex)
+            .with_name(NEXUS_NAME)
+            .with_uuid(NEXUS_UUID)
+            .with_size_mb(NEXUS_SIZE)
+            .with_replica(&self.nodes[0].repl)
+            .with_replica(&self.nodes[1].repl)
+            .with_replica(&self.nodes[2].repl);
+
+        nex.create().await.unwrap();
+
+        nex
+    }
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+/// Tests that a nexus can be properly recreated without I/O failures
+/// when its io-engine container restarts.
+///
+/// 1. Create a nexus with 3 replicas.
+/// 2. Start I/O.
+/// 3. Restart nexus's io-engine while I/O is running.
+/// 4. Recreate the nexus with 1 replica.
+/// 5. Add 2 remaining replicas, triggering rebuilds.
+/// 6. Verify I/O.
+async fn nexus_restart() {
+    common::composer_init();
+
+    let cluster = TestCluster::create().await;
+
+    let nex = cluster.lock().await.create_nexus().await;
+    nex.publish().await.unwrap();
+
+    // Run tasks in parallel, I/O and nexus management.
+    let (s, r) = tokio::sync::watch::channel(());
+
+    let j0 = tokio::spawn({
+        let nvmf = nex.nvmf_location();
+        async move { run_io_task(s, &nvmf).await }
+    });
+    tokio::pin!(j0);
+
+    let j1 = tokio::spawn({
+        let r = r.clone();
+        let c = cluster.clone();
+        async move {
+            run_manage_task(r, c).await;
+        }
+    });
+    tokio::pin!(j1);
+
+    let (r0, r1) = tokio::join!(j0, j1);
+    r0.unwrap();
+    r1.unwrap();
+}
+
+/// Runs multiple FIO I/O jobs.
+async fn run_io_task(s: Sender<()>, nvmf: &NvmfLocation) {
+    println!("[I/O task] connecting ...");
+    let _cg = NmveConnectGuard::connect_addr(&nvmf.addr, &nvmf.nqn);
+    println!("[I/O task] connection okay");
+
+    let path = find_mayastor_nvme_device_path(&nvmf.serial)
+        .unwrap()
+        .to_str()
+        .unwrap()
+        .to_string();
+
+    // Notify the nexus management task that connection is complete and I/O
+    // starts.
+    s.send(()).unwrap();
+
+    let fio_write = FioBuilder::new()
+        .with_job(
+            FioJobBuilder::default()
+                .with_filename(path.clone())
+                .with_name("benchtest")
+                .with_numjobs(1)
+                .with_direct(true)
+                .with_rw("randwrite")
+                .with_do_verify(false)
+                .with_ioengine("libaio")
+                .with_bs(DataSize::from_kb(4))
+                .with_iodepth(16)
+                .with_verify("crc32")
+                .build(),
+        )
+        .with_verbose(true)
+        .with_verbose_err(true)
+        .build();
+
+    println!("[I/O task] Starting write FIO");
+    spawn_fio_task(&fio_write).await.unwrap();
+    println!("[I/O task] Write FIO finished");
+
+    let fio_verify = FioBuilder::new()
+        .with_job(
+            FioJobBuilder::new()
+                .with_filename(path.clone())
+                .with_name("benchtest")
+                .with_numjobs(1)
+                .with_direct(true)
+                .with_rw("randread")
+                .with_ioengine("libaio")
+                .with_bs(DataSize::from_kb(4))
+                .with_iodepth(16)
+                .with_verify("crc32")
+                .with_verify_fatal(true)
+                .with_verify_async(2)
+                .build(),
+        )
+        .with_verbose(true)
+        .with_verbose_err(true)
+        .build();
+
+    println!("[I/O task] Starting verify FIO");
+    spawn_fio_task(&fio_verify).await.unwrap();
+    println!("[I/O task] Verify FIO finished");
+}
+
+async fn run_manage_task(mut r: Receiver<()>, cluster: TestClusterRef) {
+    // Wait until I/O task connects and signals it is ready.
+    r.changed().await.unwrap();
+    println!("[Manage task] I/O started");
+
+    tokio::time::sleep(Duration::from_secs(SLEEP_BEFORE)).await;
+
+    // Restart io-engine container.
+    println!("[Manage task] Restarting io-engine container");
+    cluster.lock().await.stop_ms_nex().await;
+    tokio::time::sleep(Duration::from_secs(SLEEP_DOWN)).await;
+    cluster.lock().await.start_ms_nex().await;
+
+    let ms_nex = cluster.lock().await.get_ms_nex().await;
+
+    // Recreate the nexus with 1 child.
+    let mut nex = NexusBuilder::new(ms_nex.clone())
+        .with_name(NEXUS_NAME)
+        .with_uuid(NEXUS_UUID)
+        .with_size_mb(NEXUS_SIZE)
+        .with_replica(&cluster.lock().await.nodes[1].repl);
+
+    println!("[Manage task] Recreating the nexus (1 child)");
+    nex.create().await.unwrap();
+
+    println!("[Manage task] Adding 2nd child");
+    cluster.lock().await.nodes[2].pool.create().await.unwrap();
+    nex.add_replica(&cluster.lock().await.nodes[2].repl, false)
+        .await
+        .unwrap();
+
+    tokio::time::sleep(Duration::from_secs(SLEEP_ADD_CHILD)).await;
+
+    println!("[Manage task] Adding 3rd child");
+    nex.add_replica(&cluster.lock().await.nodes[0].repl, false)
+        .await
+        .unwrap();
+
+    println!("[Manage task] Publishing the nexus");
+    nex.publish().await.unwrap();
+    println!("[Manage task] Nexus published");
+}
diff --git a/io-engine/tests/replica_crd.rs b/io-engine/tests/replica_crd.rs
index d9e750967..c0d8af0e6 100644
--- a/io-engine/tests/replica_crd.rs
+++ b/io-engine/tests/replica_crd.rs
@@ -7,7 +7,7 @@ use std::time::Duration;
 
 use common::{
     compose::{rpc::v1::GrpcConnect, Binary, Builder},
-    fio::{Fio, FioJob, FioJobResult},
+    fio::{FioBuilder, FioJobBuilder, FioJobResult},
     pool::PoolBuilder,
     replica::ReplicaBuilder,
     test::add_fault_injection,
@@ -90,15 +90,18 @@ async fn replica_no_fail_crd() {
     let (_cg, path) = repl_0.nvmf_location().open().unwrap();
 
     // FIO jobs.
-    let fio = Fio::new().with_job(
-        FioJob::new()
-            .with_name("job0")
-            .with_direct(true)
-            .with_ioengine("libaio")
-            .with_iodepth(1)
-            .with_filename_path(&path)
-            .with_rw("write"),
-    );
+    let fio = FioBuilder::new()
+        .with_job(
+            FioJobBuilder::new()
+                .with_name("job0")
+                .with_direct(true)
+                .with_ioengine("libaio")
+                .with_iodepth(1)
+                .with_filename(&path)
+                .with_rw("write")
+                .build(),
+        )
+        .build();
 
     let fio_res = tokio::spawn(async { fio.run() }).await.unwrap();
     let job_res = fio_res.find_job("job0").unwrap();
diff --git a/io-engine/tests/replica_thin_no_space.rs b/io-engine/tests/replica_thin_no_space.rs
index d4a7ae8c0..c9d98c478 100644
--- a/io-engine/tests/replica_thin_no_space.rs
+++ b/io-engine/tests/replica_thin_no_space.rs
@@ -6,7 +6,7 @@ use nix::errno::Errno;
 
 use common::{
     compose::{rpc::v1::GrpcConnect, Binary, Builder},
-    fio::{Fio, FioJob, FioJobResult},
+    fio::{FioBuilder, FioJobBuilder, FioJobResult},
     pool::PoolBuilder,
     replica::ReplicaBuilder,
     test::add_fault_injection,
@@ -74,15 +74,18 @@ async fn replica_thin_nospc() {
     let nvmf = repl_0.nvmf_location();
     let (_nvmf_conn, path) = nvmf.open().unwrap();
 
-    let fio = Fio::new().with_job(
-        FioJob::new()
-            .with_name("j-0")
-            .with_direct(true)
-            .with_ioengine("libaio")
-            .with_iodepth(1)
-            .with_filename_path(&path)
-            .with_rw("write"),
-    );
+    let fio = FioBuilder::new()
+        .with_job(
+            FioJobBuilder::new()
+                .with_name("j-0")
+                .with_direct(true)
+                .with_ioengine("libaio")
+                .with_iodepth(1)
+                .with_filename(&path)
+                .with_rw("write")
+                .build(),
+        )
+        .build();
 
     let res = tokio::spawn(async move { fio.run() }).await.unwrap();
 
@@ -147,15 +150,18 @@ async fn replica_nospc_inject() {
     let nvmf = repl_0.nvmf_location();
     let (_nvmf_conn, path) = nvmf.open().unwrap();
 
-    let fio = Fio::new().with_job(
-        FioJob::new()
-            .with_name("j-0")
-            .with_direct(true)
-            .with_ioengine("libaio")
-            .with_iodepth(1)
-            .with_filename_path(&path)
-            .with_rw("write"),
-    );
+    let fio = FioBuilder::new()
+        .with_job(
+            FioJobBuilder::new()
+                .with_name("j-0")
+                .with_direct(true)
+                .with_ioengine("libaio")
+                .with_iodepth(1)
+                .with_filename(&path)
+                .with_rw("write")
+                .build(),
+        )
+        .build();
 
     let res = tokio::spawn(async move { fio.run() }).await.unwrap();
 
diff --git a/scripts/rust-linter.sh b/scripts/rust-linter.sh
index 42dff9ec9..f41a65846 100755
--- a/scripts/rust-linter.sh
+++ b/scripts/rust-linter.sh
@@ -9,4 +9,5 @@ $CARGO clippy --all --all-targets -- -D warnings \
     -A clippy::option_env_unwrap \
     -A clippy::redundant-guards \
     -A clippy::suspicious-doc-comments \
+    -A clippy::useless-format \
    -A deprecated
diff --git a/utils/dependencies b/utils/dependencies
index 86be28510..b32861f36 160000
--- a/utils/dependencies
+++ b/utils/dependencies
@@ -1 +1 @@
-Subproject commit 86be28510cc4b08a62c5679c5a96c559ee12c09c
+Subproject commit b32861f369b56d30eb1b1706b9186592ccb29cfc
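
Reviewer note (not part of the patch): the diff replaces the old consuming `with_*` setters on `Fio`/`FioJob` with `derive_builder`-generated `FioBuilder`/`FioJobBuilder`. A minimal usage sketch, assuming the generic types restored above (`Option<DataSize>`, `Option<u32>`, etc.) are correct; the device path is hypothetical:

```rust
use io_engine_tests::{
    file_io::DataSize,
    fio::{FioBuilder, FioJobBuilder},
};

fn example_fio() {
    // Build a job; optional fields that stay None are not passed to FIO.
    let job = FioJobBuilder::new()
        .with_name("j0")
        .with_ioengine("libaio")
        .with_filename("/dev/nvme0n1") // hypothetical device path
        .with_rw("randwrite")
        .with_bs(DataSize::from_kb(4))
        .with_iodepth(16)
        .with_runtime(10)
        .build();

    // as_fio_args() now serializes the job with serde_json: null fields are
    // skipped, booleans map to 1/0, DataSize values appear as byte counts,
    // and --time_based=1 is appended because runtime is set. Roughly (order
    // may differ, since it follows serde_json's map ordering):
    //   --name=j0 --ioengine=libaio --filename=/dev/nvme0n1 --rw=randwrite
    //   --direct=1 --bs=4096 --iodepth=16 --numjobs=1 --thread=1
    //   --norandommap=1 --random_generator=tausworthe64 --runtime=10
    //   --time_based=1
    let _args = job.as_fio_args();

    // Jobs are then wrapped into a Fio run descriptor.
    let _fio = FioBuilder::new()
        .with_job(job)
        .with_verbose_err(true)
        .build();
}
```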
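The `Serialize`/`From<u64>` impls added to `DataSize` are what make this work: size-like job fields (`bs`, `offset`, `size`) serialize as raw byte counts, and integer literals convert through the builders' `into` setters. A sketch under the same assumptions:

```rust
use io_engine_tests::file_io::DataSize;

fn example_data_size() {
    // The new From<u64> impl converts a plain byte count.
    let bs: DataSize = 4096_u64.into();

    // #[serde(into = "u64")] serializes the wrapper as a bare number,
    // which is exactly what as_fio_args() emits after --bs=.
    assert_eq!(serde_json::to_string(&bs).unwrap(), "4096");
}
```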
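Finally, the hidden `--enable-channel-dbg` flag (also settable through the `ENABLE_NEXUS_CHANNEL_DEBUG` environment variable declared in `env.rs`) turns on the `dump_dbg()` tracing added to `NexusChannel`. In compose-based tests it is enabled per container; a sketch mirroring the `ms_2` setup in `nexus_restart.rs` above (the core list is illustrative):

```rust
use io_engine_tests::compose::Binary;

fn example_debug_container() -> Binary {
    // Only --enable-channel-dbg is new here; the other arguments follow the
    // existing io-engine container conventions used in this test suite.
    Binary::from_dbg("io-engine").with_args(vec![
        "-l",
        "1,2",
        "-Fcolor,nodate,host",
        "--enable-channel-dbg",
    ])
}
```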