diff --git a/io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs b/io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs
index 44e313193..e4e66b333 100644
--- a/io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs
+++ b/io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs
@@ -19,6 +19,7 @@ use crate::{
     rebuild::{
         HistoryRecord,
         NexusRebuildJob,
+        NexusRebuildJobStarter,
        RebuildError,
        RebuildJobOptions,
        RebuildState,
@@ -119,7 +120,8 @@ impl<'n> Nexus<'n> {
        }?;
        // Create a rebuild job for the child.
-        self.create_rebuild_job(&src_child_uri, &dst_child_uri)
+        let starter = self
+            .create_rebuild_job(&src_child_uri, &dst_child_uri)
            .await?;
        self.event(
@@ -146,8 +148,8 @@ impl<'n> Nexus<'n> {
            .lookup_child(&dst_child_uri)
            .and_then(|c| c.stop_io_log());
-        self.rebuild_job_mut(&dst_child_uri)?
-            .start(map)
+        starter
+            .start(self.rebuild_job_mut(&dst_child_uri)?, map)
            .await
            .context(nexus_err::RebuildOperation {
                job: child_uri.to_owned(),
@@ -160,7 +162,7 @@ impl<'n> Nexus<'n> {
        &self,
        src_child_uri: &str,
        dst_child_uri: &str,
-    ) -> Result<(), Error> {
+    ) -> Result<NexusRebuildJobStarter, Error> {
        let verify_mode = match std::env::var("NEXUS_REBUILD_VERIFY")
            .unwrap_or_default()
            .as_str()
@@ -186,7 +188,7 @@ impl<'n> Nexus<'n> {
            verify_mode,
        };
-        NexusRebuildJob::new(
+        NexusRebuildJob::new_starter(
            &self.name,
            src_child_uri,
            dst_child_uri,
@@ -202,7 +204,7 @@ impl<'n> Nexus<'n> {
            },
        )
        .await
-        .and_then(NexusRebuildJob::store)
+        .and_then(NexusRebuildJobStarter::store)
        .context(nexus_err::CreateRebuild {
            child: dst_child_uri.to_owned(),
            name: self.name.clone(),
diff --git a/io-engine/src/core/segment_map.rs b/io-engine/src/core/segment_map.rs
index 4120bce39..eba3c4f03 100644
--- a/io-engine/src/core/segment_map.rs
+++ b/io-engine/src/core/segment_map.rs
@@ -96,4 +96,15 @@ impl SegmentMap {
    pub(crate) fn count_dirty_blks(&self) -> u64 {
        self.count_ones() * self.segment_size / self.block_len
    }
+
+    /// Get the segment size in blocks.
+    pub(crate) fn segment_size_blks(&self) -> u64 {
+        self.segment_size / self.block_len
+    }
+}
+
+impl From<SegmentMap> for BitVec {
+    fn from(value: SegmentMap) -> Self {
+        value.segments
+    }
 }
diff --git a/io-engine/src/rebuild/bdev_rebuild.rs b/io-engine/src/rebuild/bdev_rebuild.rs
index 9bdfe2459..f25590863 100644
--- a/io-engine/src/rebuild/bdev_rebuild.rs
+++ b/io-engine/src/rebuild/bdev_rebuild.rs
@@ -1,7 +1,4 @@
-use std::{
-    ops::{Deref, Range},
-    rc::Rc,
-};
+use std::ops::{Deref, Range};
 use super::{
    rebuild_descriptor::RebuildDescriptor,
@@ -13,7 +10,10 @@ use super::{
    SEGMENT_TASKS,
 };
-use crate::gen_rebuild_instances;
+use crate::{
+    gen_rebuild_instances,
+    rebuild::rebuilders::{FullRebuild, RangeRebuilder},
+};
 /// A Bdev rebuild job is responsible for managing a rebuild (copy) which reads
 /// from source_hdl and writes into destination_hdl from specified start to end.
@@ -59,12 +59,10 @@ gen_rebuild_instances!(BdevRebuildJob);
 /// A rebuild job which is responsible for rebuilding from
 /// source to target of the `RebuildDescriptor`.
 pub(super) struct BdevRebuildJobBackend {
-    /// The next block to be rebuilt.
-    next: u64,
    /// A pool of tasks which perform the actual data rebuild.
    task_pool: RebuildTasks,
    /// A generic rebuild descriptor.
-    descriptor: Rc<RebuildDescriptor>,
+    copier: FullRebuild<RebuildDescriptor>,
    /// Notification callback with src and dst uri's.
    notify_fn: fn(&str, &str) -> (),
 }
@@ -72,11 +70,20 @@ pub(super) struct BdevRebuildJobBackend {
 #[async_trait::async_trait(?Send)]
 impl RebuildBackend for BdevRebuildJobBackend {
    fn on_state_change(&mut self) {
-        (self.notify_fn)(&self.descriptor.src_uri, &self.descriptor.dst_uri);
+        let desc = self.common_desc();
+        (self.notify_fn)(&desc.src_uri, &desc.dst_uri);
    }
    fn common_desc(&self) -> &RebuildDescriptor {
-        &self.descriptor
+        self.copier.desc()
+    }
+
+    fn blocks_remaining(&self) -> u64 {
+        self.copier.blocks_remaining()
+    }
+
+    fn is_partial(&self) -> bool {
+        self.copier.is_partial()
    }
    fn task_pool(&self) -> &RebuildTasks {
@@ -84,22 +91,18 @@ impl RebuildBackend for BdevRebuildJobBackend {
    }
    fn schedule_task_by_id(&mut self, id: usize) -> bool {
-        if self.next >= self.descriptor.range.end {
-            false
-        } else {
-            let next = std::cmp::min(
-                self.next + self.descriptor.segment_size_blks,
-                self.descriptor.range.end,
-            );
-            self.task_pool.schedule_segment_rebuild(
-                id,
-                self.next,
-                self.descriptor.clone(),
-            );
-            self.task_pool.active += 1;
-            self.next = next;
-            true
-        }
+        self.copier
+            .next()
+            .map(|blk| {
+                self.task_pool.schedule_segment_rebuild(
+                    id,
+                    blk,
+                    self.copier.copier(),
+                );
+                self.task_pool.active += 1;
+                true
+            })
+            .unwrap_or_default()
    }
    async fn await_one_task(&mut self) -> Option<TaskResult> {
        self.task_pool.await_one_task().await
@@ -110,7 +113,7 @@ impl std::fmt::Debug for BdevRebuildJobBackend {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BdevRebuildJob")
-            .field("next", &self.next)
+            .field("next", &self.copier.peek_next())
            .finish()
    }
 }
@@ -130,15 +133,10 @@ impl BdevRebuildJobBackend {
        notify_fn: fn(&str, &str) -> (),
        descriptor: RebuildDescriptor,
    ) -> Result<Self, RebuildError> {
-        let be = Self {
-            next: descriptor.range.start,
+        Ok(Self {
            task_pool,
-            descriptor: Rc::new(descriptor),
+            copier: FullRebuild::new(descriptor),
            notify_fn,
-        };
-
-        info!("{be}: backend created");
-
-        Ok(be)
+        })
    }
 }
diff --git a/io-engine/src/rebuild/mod.rs b/io-engine/src/rebuild/mod.rs
index 5aff584c4..6e8ed346a 100644
--- a/io-engine/src/rebuild/mod.rs
+++ b/io-engine/src/rebuild/mod.rs
@@ -9,9 +9,10 @@ mod rebuild_map;
 mod rebuild_state;
 mod rebuild_stats;
 mod rebuild_task;
+mod rebuilders;
 pub use bdev_rebuild::BdevRebuildJob;
-pub use nexus_rebuild::NexusRebuildJob;
+pub use nexus_rebuild::{NexusRebuildJob, NexusRebuildJobStarter};
 use rebuild_descriptor::RebuildDescriptor;
 pub(crate) use rebuild_error::RebuildError;
 use rebuild_job::RebuildOperation;
diff --git a/io-engine/src/rebuild/nexus_rebuild.rs b/io-engine/src/rebuild/nexus_rebuild.rs
index 8bb1f33f2..a86952ffa 100644
--- a/io-engine/src/rebuild/nexus_rebuild.rs
+++ b/io-engine/src/rebuild/nexus_rebuild.rs
@@ -1,16 +1,23 @@
+use futures::channel::oneshot;
 use snafu::ResultExt;
 use spdk_rs::LbaRange;
-use std::{
-    ops::{Deref, Range},
-    rc::Rc,
-};
+use std::ops::{Deref, Range};
 use crate::{
    core::{DescriptorGuard, UntypedBdev},
    gen_rebuild_instances,
    rebuild::{
        rebuild_error::{RangeLockFailed, RangeUnlockFailed},
+        rebuild_job_backend::RebuildJobManager,
        rebuild_task::{RebuildTask, RebuildTaskCopier},
+        rebuilders::{
+            FullRebuild,
+            PartialSeqCopier,
+            PartialSeqRebuild,
+            RangeRebuilder,
+        },
+        RebuildMap,
+        RebuildState,
    },
 };
 use super::{
 /// that there is no concurrent between the nexus and the rebuild.
 /// This is a frontend interface that communicates with a backend runner which
 /// is the one responsible for the read/writing of the data.
-pub struct NexusRebuildJob(RebuildJob);
+pub struct NexusRebuildJob {
+    job: RebuildJob,
+}
+
+/// Nexus supports both full and partial rebuilds. In case of a partial rebuild
+/// we have to provide the nexus rebuild job with a `RebuildMap`.
+/// However, this can only be provided after we've created the rebuild as taking
+/// the map from the nexus children is an operation which cannot be undone
+/// today.
+/// Therefore we use a rebuild job starter which creates the initial bits
+/// required for the rebuild job and which can be started later with or without
+/// the `RebuildMap`.
+pub struct NexusRebuildJobStarter {
+    /// The job itself is optional because it gets taken when we want to store.
+    /// After the job is taken, we then can schedule either a full or a partial
+    /// rebuild with the backend.
+    job: Option<NexusRebuildJob>,
+    manager: RebuildJobManager,
+    backend: NexusRebuildJobBackendStarter,
+}
 impl std::fmt::Debug for NexusRebuildJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        self.0.fmt(f)
+        self.job.fmt(f)
    }
 }
 impl Deref for NexusRebuildJob {
    type Target = RebuildJob;
    fn deref(&self) -> &Self::Target {
-        &self.0
+        &self.job
    }
 }
@@ -55,25 +81,65 @@ impl NexusRebuildJob {
    /// Builder::new(src, srd).with_range().with_options().with_nexus().build()
    /// GenericRebuild:
    /// Builder::new(src, srd).with_range().with_options().build()
-    pub async fn new(
+    pub async fn new_starter(
        nexus_name: &str,
        src_uri: &str,
        dst_uri: &str,
        range: Range<u64>,
        options: RebuildJobOptions,
        notify_fn: fn(String, String) -> (),
-    ) -> Result<Self, RebuildError> {
+    ) -> Result<NexusRebuildJobStarter, RebuildError> {
        let descriptor =
            RebuildDescriptor::new(src_uri, dst_uri, Some(range), options)
                .await?;
        let tasks = RebuildTasks::new(SEGMENT_TASKS, &descriptor)?;
-        let backend = NexusRebuildJobBackend::new(
+        let backend = NexusRebuildJobBackendStarter::new(
            nexus_name, tasks, notify_fn, descriptor,
        )
        .await?;
-        RebuildJob::from_backend(backend).await.map(Self)
+        let manager = RebuildJobManager::new();
+
+        Ok(NexusRebuildJobStarter {
+            job: Some(Self {
+                job: RebuildJob::from_manager(&manager, &backend.descriptor),
+            }),
+            manager,
+            backend,
+        })
+    }
+}
+impl NexusRebuildJobStarter {
+    /// Store the inner rebuild job in the rebuild job list.
+    pub fn store(mut self) -> Result<Self, RebuildError> {
+        if let Some(job) = self.job.take() {
+            job.store()?;
+        }
+        Ok(self)
+    }
+    /// Schedules the job to start in a future and returns a complete channel
+    /// which can be waited on.
+    pub async fn start(
+        self,
+        job: std::sync::Arc<NexusRebuildJob>,
+        map: Option<RebuildMap>,
+    ) -> Result<oneshot::Receiver<RebuildState>, RebuildError> {
+        match map {
+            None => {
+                self.manager
+                    .into_backend(self.backend.into_full())
+                    .schedule()
+                    .await;
+            }
+            Some(map) => {
+                self.manager
+                    .into_backend(self.backend.into_partial_seq(map))
+                    .schedule()
+                    .await;
+            }
+        }
+        job.start().await
    }
 }
@@ -102,29 +168,111 @@ impl Deref for NexusRebuildDescriptor {
 /// as a means of locking the range which is being rebuilt ensuring
 /// there are no concurrent writes to the same range between the
 /// user IO (through the nexus) and the rebuild itself.
-pub(super) struct NexusRebuildJobBackend {
-    /// The next block to be rebuilt.
-    next: u64,
+pub(super) struct NexusRebuildJobBackend<
+    T: RebuildTaskCopier,
+    R: RangeRebuilder<T>,
+> {
+    /// A pool of tasks which perform the actual data rebuild.
+    task_pool: RebuildTasks,
+    /// The range rebuilder which walks and copies the segments.
+    copier: R,
+    /// Notification callback which existing nexus uses to sync
+    /// with rebuild updates.
+    notify_fn: fn(String, String) -> (),
+    /// The name of the nexus this pertains to.
+    nexus_name: String,
+    _p: std::marker::PhantomData<T>,
+}
+
+/// A Nexus rebuild job backend starter.
+struct NexusRebuildJobBackendStarter {
    /// A pool of tasks which perform the actual data rebuild.
    task_pool: RebuildTasks,
    /// A nexus rebuild specific descriptor.
-    descriptor: Rc<NexusRebuildDescriptor>,
+    descriptor: NexusRebuildDescriptor,
    /// Notification callback which existing nexus uses to sync
    /// with rebuild updates.
    notify_fn: fn(String, String) -> (),
 }
+impl NexusRebuildJobBackendStarter {
+    /// Creates a new RebuildJob which rebuilds from source URI to target URI
+    /// from start to end (of the data partition); notify_fn callback is called
+    /// when the rebuild state is updated - with the nexus and destination
+    /// URI as arguments.
+    pub async fn new(
+        nexus_name: &str,
+        task_pool: RebuildTasks,
+        notify_fn: fn(String, String) -> (),
+        descriptor: RebuildDescriptor,
+    ) -> Result<Self, RebuildError> {
+        let nexus_descriptor = UntypedBdev::open_by_name(nexus_name, false)
+            .context(BdevNotFound {
+                bdev: nexus_name.to_string(),
+            })?;
+
+        let descriptor = NexusRebuildDescriptor {
+            nexus: nexus_descriptor,
+            nexus_name: nexus_name.to_string(),
+            common: descriptor,
+        };
+        Ok(Self {
+            descriptor,
+            task_pool,
+            notify_fn,
+        })
+    }
+
+    fn into_partial_seq(
+        self,
+        map: RebuildMap,
+    ) -> NexusRebuildJobBackend<
+        PartialSeqCopier<NexusRebuildDescriptor>,
+        PartialSeqRebuild<NexusRebuildDescriptor>,
+    > {
+        NexusRebuildJobBackend {
+            task_pool: self.task_pool,
+            notify_fn: self.notify_fn,
+            nexus_name: self.descriptor.nexus_name.clone(),
+            copier: PartialSeqRebuild::new(map, self.descriptor),
+            _p: Default::default(),
+        }
+    }
+    fn into_full(
+        self,
+    ) -> NexusRebuildJobBackend<
+        NexusRebuildDescriptor,
+        FullRebuild<NexusRebuildDescriptor>,
+    > {
+        NexusRebuildJobBackend {
+            task_pool: self.task_pool,
+            notify_fn: self.notify_fn,
+            nexus_name: self.descriptor.nexus_name.clone(),
+            copier: FullRebuild::new(self.descriptor),
+            _p: Default::default(),
+        }
+    }
+}
 #[async_trait::async_trait(?Send)]
-impl RebuildBackend for NexusRebuildJobBackend {
+impl<T: RebuildTaskCopier, R: RangeRebuilder<T>> RebuildBackend
+    for NexusRebuildJobBackend<T, R>
+{
    fn on_state_change(&mut self) {
        (self.notify_fn)(
-            self.descriptor.nexus_name.clone(),
-            self.descriptor.dst_uri.clone(),
+            self.nexus_name.clone(),
+            self.common_desc().dst_uri.clone(),
        );
    }
    fn common_desc(&self) -> &RebuildDescriptor {
-        &self.descriptor
+        self.copier.desc()
+    }
+
+    fn blocks_remaining(&self) -> u64 {
+        self.copier.blocks_remaining()
+    }
+    fn is_partial(&self) -> bool {
+        self.copier.is_partial()
    }
    fn task_pool(&self) -> &RebuildTasks {
@@ -132,89 +280,48 @@ impl RebuildBackend for NexusRebuildJobBackend {
    }
    fn schedule_task_by_id(&mut self, id: usize) -> bool {
-        match self.send_segment_task(id) {
-            Some(next) => {
+        self.copier
+            .next()
+            .map(|blk| {
+                self.task_pool.schedule_segment_rebuild(
+                    id,
+                    blk,
+                    self.copier.copier(),
+                );
                self.task_pool.active += 1;
-                self.next = next;
                true
-            }
-            // we've already got enough tasks to rebuild the destination
-            None => false,
-        }
+            })
+            .unwrap_or_default()
    }
    async fn await_one_task(&mut self) -> Option<TaskResult> {
        self.task_pool.await_one_task().await
    }
 }
-impl std::fmt::Debug for NexusRebuildJobBackend {
+impl<T: RebuildTaskCopier, R: RangeRebuilder<T>> std::fmt::Debug
+    for NexusRebuildJobBackend<T, R>
+{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NexusRebuildJob")
-            .field("nexus", &self.descriptor.nexus_name)
-            .field("next", &self.next)
+            .field("nexus", &self.nexus_name)
+            .field("next", &self.copier.peek_next())
            .finish()
    }
 }
-impl std::fmt::Display for NexusRebuildJobBackend {
+impl<T: RebuildTaskCopier, R: RangeRebuilder<T>> std::fmt::Display
+    for NexusRebuildJobBackend<T, R>
+{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "on nexus '{nex}'", nex = self.descriptor.nexus_name)
-    }
-}
-
-impl NexusRebuildJobBackend {
-    /// Creates a new RebuildJob which rebuilds from source URI to target URI
-    /// from start to end (of the data partition); notify_fn callback is called
-    /// when the rebuild state is updated - with the nexus and destination
-    /// URI as arguments.
-    pub async fn new(
-        nexus_name: &str,
-        task_pool: RebuildTasks,
-        notify_fn: fn(String, String) -> (),
-        descriptor: RebuildDescriptor,
-    ) -> Result<Self, RebuildError> {
-        let nexus_descriptor = UntypedBdev::open_by_name(nexus_name, false)
-            .context(BdevNotFound {
-                bdev: nexus_name.to_string(),
-            })?;
-
-        let be = Self {
-            next: descriptor.range.start,
-            task_pool,
-            descriptor: Rc::new(NexusRebuildDescriptor {
-                nexus: nexus_descriptor,
-                nexus_name: nexus_name.to_string(),
-                common: descriptor,
-            }),
-            notify_fn,
-        };
-
-        info!("{be}: backend created");
-
-        Ok(be)
-    }
-
-    /// Sends one segment worth of data in a reactor future and notifies the
-    /// management channel. Returns the next segment offset to rebuild, if any.
-    fn send_segment_task(&mut self, id: usize) -> Option<u64> {
-        if self.next >= self.descriptor.range.end {
-            None
-        } else {
-            let next = std::cmp::min(
-                self.next + self.descriptor.segment_size_blks,
-                self.descriptor.range.end,
-            );
-            self.task_pool.schedule_segment_rebuild(
-                id,
-                self.next,
-                self.descriptor.clone(),
-            );
-            Some(next)
-        }
+        write!(f, "on nexus '{nex}'", nex = self.nexus_name)
    }
 }
 #[async_trait::async_trait(?Send)]
 impl RebuildTaskCopier for NexusRebuildDescriptor {
+    fn descriptor(&self) -> &RebuildDescriptor {
+        &self.common
+    }
+
    /// Copies one segment worth of data from source into destination. During
    /// this time the LBA range being copied is locked so that there cannot be
    /// front end I/O to the same LBA range.
    ///
    /// The use of RangeContext here is safe because it is stored on the stack
    /// for the duration of the calls to lock and unlock.
+    #[inline]
    async fn copy_segment(
        &self,
        blk: u64,
        task: &mut RebuildTask,
    ) -> Result<bool, RebuildError> {
-        if self.is_blk_sync(blk) {
-            return Ok(false);
-        }
-
        let len = self.get_segment_size_blks(blk);
        // The nexus children have metadata and data partitions, whereas the
        // nexus has a data partition only. Because we are locking the range on
@@ -269,11 +373,6 @@ impl RebuildTaskCopier for NexusRebuildDescriptor {
                len,
            })?;
-        // In the case of success, mark the segment as already transferred.
-        if result.is_ok() {
-            self.blk_synced(blk);
-        }
-
        result
    }
 }
diff --git a/io-engine/src/rebuild/rebuild_descriptor.rs b/io-engine/src/rebuild/rebuild_descriptor.rs
index ebd031998..d4c65ae22 100644
--- a/io-engine/src/rebuild/rebuild_descriptor.rs
+++ b/io-engine/src/rebuild/rebuild_descriptor.rs
@@ -6,7 +6,6 @@ use spdk_rs::{
    IoVec,
    NvmeStatus,
 };
-use std::sync::Arc;
 use crate::{
    bdev::device_open,
@@ -26,7 +25,7 @@ use crate::{
    },
 };
-use super::{RebuildError, RebuildJobOptions, RebuildMap, RebuildVerifyMode};
+use super::{RebuildError, RebuildJobOptions, RebuildVerifyMode};
 /// Contains all descriptors and their associated information which allows the
 /// tasks to copy/rebuild data from source to destination.
@@ -53,8 +52,6 @@ pub(super) struct RebuildDescriptor {
    pub(super) dst_descriptor: Box<dyn BlockDeviceDescriptor>,
    /// Start time of this rebuild.
    pub(super) start_time: DateTime<Utc>,
-    /// Rebuild map.
-    pub(super) rebuild_map: Arc<parking_lot::Mutex<Option<RebuildMap>>>,
 }
 impl RebuildDescriptor {
@@ -127,7 +124,6 @@ impl RebuildDescriptor {
            src_descriptor,
            dst_descriptor,
            start_time: Utc::now(),
-            rebuild_map: Arc::new(parking_lot::Mutex::new(None)),
        })
    }
@@ -196,25 +192,6 @@ impl RebuildDescriptor {
        })
    }
-    /// Checks if the block has to be transferred.
-    /// If no rebuild map is present, all blocks are considered unsynced.
-    #[inline(always)]
-    pub(super) fn is_blk_sync(&self, blk: u64) -> bool {
-        self.rebuild_map
-            .lock()
-            .as_ref()
-            .map_or(false, |m| m.is_blk_clean(blk))
-    }
-
-    /// Marks the rebuild segment starting from the given logical block as
-    /// already transferred.
-    #[inline(always)]
-    pub(super) fn blk_synced(&self, blk: u64) {
-        if let Some(map) = self.rebuild_map.lock().as_mut() {
-            map.blk_clean(blk);
-        }
-    }
-
    /// Returns `IoVec` for the givem `DmaBuf`, with length adjusted to the copy
    /// size for the given offset. Given `DmaBuf` must be large enough.
    #[inline(always)]
diff --git a/io-engine/src/rebuild/rebuild_job.rs b/io-engine/src/rebuild/rebuild_job.rs
index c59edbff5..a8ffdf4e6 100644
--- a/io-engine/src/rebuild/rebuild_job.rs
+++ b/io-engine/src/rebuild/rebuild_job.rs
@@ -8,14 +8,16 @@ use super::{
    RebuildError,
    RebuildJobBackendManager,
    RebuildJobRequest,
-    RebuildMap,
    RebuildState,
    RebuildStates,
    RebuildStats,
 };
 use crate::{
    core::{Reactors, VerboseError},
-    rebuild::rebuild_job_backend::RebuildBackend,
+    rebuild::{
+        rebuild_descriptor::RebuildDescriptor,
+        rebuild_job_backend::{RebuildBackend, RebuildJobManager},
+    },
 };
 /// Rebuild I/O verification mode.
@@ -109,26 +111,27 @@ impl RebuildJob {
        Ok(frontend)
    }
+    /// Creates a new RebuildJob taking a specific backend implementation and
+    /// running the generic backend manager.
+    pub(super) fn from_manager(
+        manager: &RebuildJobManager,
+        desc: &RebuildDescriptor,
+    ) -> Self {
+        Self {
+            src_uri: desc.src_uri.to_string(),
+            dst_uri: desc.dst_uri.to_string(),
+            states: manager.states.clone(),
+            comms: RebuildFBendChan::from(&manager.info_chan),
+            complete_chan: Arc::downgrade(&manager.complete_chan),
+            notify_chan: manager.notify_chan.1.clone(),
+        }
+    }
+
    /// Schedules the job to start in a future and returns a complete channel
    /// which can be waited on.
    pub async fn start(
        &self,
-        map: Option<RebuildMap>,
    ) -> Result<oneshot::Receiver<RebuildState>, RebuildError> {
-        if let Some(map) = map {
-            let (s, r) = oneshot::channel();
-            self.comms
-                .send(RebuildJobRequest::SetRebuildMap((map, s)))
-                .await
-                .ok();
-            if let Err(e) = r.await {
-                error!(
-                    "{uri}: failed to set rebuild map: {e}",
-                    uri = self.dst_uri
-                );
-            }
-        }
-
        self.exec_client_op(RebuildOperation::Start)?;
        self.add_completion_listener()
    }
diff --git a/io-engine/src/rebuild/rebuild_job_backend.rs b/io-engine/src/rebuild/rebuild_job_backend.rs
index 63a427184..87973c5c0 100644
--- a/io-engine/src/rebuild/rebuild_job_backend.rs
+++ b/io-engine/src/rebuild/rebuild_job_backend.rs
@@ -1,4 +1,10 @@
-use std::sync::Arc;
+use std::{
+    ops::{Deref, DerefMut},
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc,
+    },
+};
 use crossbeam::channel::{unbounded, Receiver, Sender};
 use futures::{channel::oneshot, FutureExt, StreamExt};
@@ -6,7 +12,6 @@ use futures::{channel::oneshot, FutureExt, StreamExt};
 use super::{
    RebuildDescriptor,
    RebuildError,
-    RebuildMap,
    RebuildState,
    RebuildStates,
    RebuildStats,
@@ -23,8 +28,6 @@ pub(super) enum RebuildJobRequest {
    WakeUp,
    /// Get the rebuild stats from the backend.
    GetStats(oneshot::Sender<RebuildStats>),
-    /// Set rebuild map for this job.
-    SetRebuildMap((RebuildMap, oneshot::Sender<()>)),
 }
 /// Channel to share information between frontend and backend.
@@ -72,6 +75,11 @@ pub(super) trait RebuildBackend:
    /// Get a reference to the common rebuild descriptor.
    fn common_desc(&self) -> &RebuildDescriptor;
+    /// Get the remaining blocks we have yet to be rebuilt.
+    fn blocks_remaining(&self) -> u64;
+    /// Check if this is a partial rebuild.
+    fn is_partial(&self) -> bool;
+
    /// Get a reference to the tasks pool.
    fn task_pool(&self) -> &RebuildTasks;
    /// Schedule new work on the given task by its id.
@@ -85,7 +93,7 @@ pub(super) trait RebuildBackend:
 /// A rebuild job is responsible for managing a rebuild (copy) which reads
 /// from source_hdl and writes into destination_hdl from specified start to end.
-pub(super) struct RebuildJobBackendManager {
+pub(super) struct RebuildJobManager {
    /// Channel used to signal rebuild update.
    pub notify_chan: (Sender<RebuildState>, Receiver<RebuildState>),
    /// Current state of the rebuild job.
@@ -97,11 +105,30 @@ pub(super) struct RebuildJobBackendManager {
    pub(super) info_chan: RebuildFBendChan,
    /// Job serial number.
    serial: u64,
+}
+
+/// A rebuild job is responsible for managing a rebuild (copy) which reads
+/// from source_hdl and writes into destination_hdl from specified start to end.
+pub(super) struct RebuildJobBackendManager {
+    manager: RebuildJobManager,
    /// The rebuild backend runner which implements the `RebuildBackend` and
    /// performs a specific type of rebuild copy.
    backend: Box<dyn RebuildBackend>,
 }
+impl Deref for RebuildJobBackendManager {
+    type Target = RebuildJobManager;
+
+    fn deref(&self) -> &Self::Target {
+        &self.manager
+    }
+}
+impl DerefMut for RebuildJobBackendManager {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.manager
+    }
+}
+
 impl std::fmt::Debug for RebuildJobBackendManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RebuildJob")
@@ -126,6 +153,31 @@ impl std::fmt::Display for RebuildJobBackendManager {
    }
 }
+impl RebuildJobManager {
+    pub fn new() -> Self {
+        // Job serial numbers.
+        static SERIAL: AtomicU64 = AtomicU64::new(1);
+
+        let serial = SERIAL.fetch_add(1, Ordering::SeqCst);
+        Self {
+            notify_chan: unbounded::<RebuildState>(),
+            states: Default::default(),
+            complete_chan: Default::default(),
+            info_chan: RebuildFBendChan::new(),
+            serial,
+        }
+    }
+    pub fn into_backend(
+        self,
+        backend: impl RebuildBackend + 'static,
+    ) -> RebuildJobBackendManager {
+        RebuildJobBackendManager {
+            manager: self,
+            backend: Box::new(backend),
+        }
+    }
+}
+
 impl RebuildJobBackendManager {
    /// Creates a new RebuildJob which rebuilds from source URI to target URI
    /// from start to end (of the data partition); notify_fn callback is called
    /// when the rebuild state is updated - with the nexus and destination
    /// URI as arguments.
    pub fn new(backend: impl RebuildBackend + 'static) -> Self {
        let be = Self {
-            notify_chan: unbounded::<RebuildState>(),
-            states: Default::default(),
-            complete_chan: Default::default(),
-            info_chan: RebuildFBendChan::new(),
-            serial: 0,
+            manager: RebuildJobManager::new(),
            backend: Box::new(backend),
        };
        info!("{be}: backend created");
@@ -262,20 +310,14 @@ impl RebuildJobBackendManager {
            blocks_total,
        );
-        let blocks_remaining = descriptor
-            .rebuild_map
-            .lock()
-            .as_ref()
-            .map_or(blocks_total - blocks_recovered, |log| {
-                log.count_dirty_blks()
-            });
+        let blocks_remaining = self.backend.blocks_remaining();
        let progress = (blocks_recovered * 100) / blocks_total;
        assert!(progress < 100 || blocks_remaining == 0);
        RebuildStats {
            start_time: descriptor.start_time,
-            is_partial: descriptor.rebuild_map.lock().is_some(),
+            is_partial: self.backend.is_partial(),
            blocks_total,
            blocks_recovered,
            blocks_transferred,
@@ -430,9 +472,6 @@ impl RebuildJobBackendManager {
            Some(RebuildJobRequest::GetStats(reply)) => {
                self.reply_stats(reply).await.ok();
            }
-            Some(RebuildJobRequest::SetRebuildMap((map, s))) => {
-                self.set_rebuild_map(map, s).await.ok();
-            }
            None => {
                self.fail_with(RebuildError::FrontendGone);
                return false;
@@ -440,26 +479,6 @@ impl RebuildJobBackendManager {
        }
        true
    }
-
-    /// Sets rebuild map for this job.
-    async fn set_rebuild_map(
-        &mut self,
-        map: RebuildMap,
-        s: oneshot::Sender<()>,
-    ) -> Result<(), RebuildError> {
-        {
-            let mut g = self.backend.common_desc().rebuild_map.lock();
-            if g.is_some() {
-                error!("{self}: rebuild map is already set");
-            } else {
-                *g = Some(map);
-                debug!("{self}: set rebuild map");
-            }
-        }
-
-        s.send(()).ok();
-        Ok(())
-    }
 }
 impl Drop for RebuildJobBackendManager {
diff --git a/io-engine/src/rebuild/rebuild_map.rs b/io-engine/src/rebuild/rebuild_map.rs
index 8f0445392..c9383478a 100644
--- a/io-engine/src/rebuild/rebuild_map.rs
+++ b/io-engine/src/rebuild/rebuild_map.rs
@@ -1,3 +1,4 @@
+use bit_vec::BitVec;
 use std::fmt::{Debug, Formatter};
 use crate::core::SegmentMap;
@@ -61,4 +62,15 @@ impl RebuildMap {
    pub(crate) fn count_dirty_blks(&self) -> u64 {
        self.segments.count_dirty_blks()
    }
+
+    /// Get the rebuild map segment size in blocks.
+    pub(crate) fn segment_size_blks(&self) -> u64 {
+        self.segments.segment_size_blks()
+    }
+}
+
+impl From<RebuildMap> for BitVec {
+    fn from(value: RebuildMap) -> Self {
+        value.segments.into()
+    }
 }
diff --git a/io-engine/src/rebuild/rebuild_task.rs b/io-engine/src/rebuild/rebuild_task.rs
index 0dd3e492a..14c3cdb00 100644
--- a/io-engine/src/rebuild/rebuild_task.rs
+++ b/io-engine/src/rebuild/rebuild_task.rs
@@ -195,6 +195,7 @@ impl RebuildTasks {
 /// can be expanded for sub-segment copies.
 #[async_trait::async_trait(?Send)]
 pub(super) trait RebuildTaskCopier {
+    fn descriptor(&self) -> &RebuildDescriptor;
    /// Copies an entire segment at the given block address, from source to
    /// target using a `DmaBuf`.
    async fn copy_segment(
@@ -206,25 +207,16 @@ pub(super) trait RebuildTaskCopier {
 #[async_trait::async_trait(?Send)]
 impl RebuildTaskCopier for RebuildDescriptor {
+    fn descriptor(&self) -> &RebuildDescriptor {
+        self
+    }
+
    /// Copies one segment worth of data from source into destination.
    async fn copy_segment(
        &self,
        blk: u64,
        task: &mut RebuildTask,
    ) -> Result<bool, RebuildError> {
-        // todo: move the map out of the descriptor, into the specific backends.
-        if self.is_blk_sync(blk) {
-            return Ok(false);
-        }
-
-        // Perform the copy.
-        let result = task.copy_one(blk, self).await;
-
-        // In the case of success, mark the segment as already transferred.
-        if result.is_ok() {
-            self.blk_synced(blk);
-        }
-
-        result
+        task.copy_one(blk, self).await
    }
 }
diff --git a/io-engine/src/rebuild/rebuilders.rs b/io-engine/src/rebuild/rebuilders.rs
new file mode 100644
index 000000000..259c7ed4c
--- /dev/null
+++ b/io-engine/src/rebuild/rebuilders.rs
@@ -0,0 +1,255 @@
+use crate::rebuild::{
+    rebuild_descriptor::RebuildDescriptor,
+    rebuild_task::{RebuildTask, RebuildTaskCopier},
+    RebuildError,
+    RebuildMap,
+};
+use bit_vec::BitVec;
+use std::{ops::Range, rc::Rc};
+
+/// A rebuild may rebuild a device by walking it differently, for example:
+/// 1. full rebuild - walk the entire device range and copy every segment
+///    (current nexus full rebuild behaviour).
+/// 2. partial rebuild - walk the allocated segments only and copy them.
+/// 3. partial seq rebuild - walk the entire device range and copy only
+///    allocated segments (current nexus partial rebuild behaviour).
+pub(super) trait RangeRebuilder<T: RebuildTaskCopier> {
+    /// Fetch the next block to rebuild.
+    fn next(&mut self) -> Option<u64>;
+    /// Peek the next block to rebuild.
+    fn peek_next(&self) -> Option<u64>;
+    /// Get the remaining blocks we have yet to be rebuilt.
+    fn blocks_remaining(&self) -> u64;
+    /// Check if this is a partial rebuild.
+    fn is_partial(&self) -> bool;
+    /// Get the rebuild descriptor reference.
+    fn desc(&self) -> &RebuildDescriptor;
+    /// Get the copier which can copy a segment.
+    fn copier(&self) -> Rc<T>;
+}
+
+/// The range is the full range of the request, in steps of segment size.
+pub(super) struct FullRebuild<T: RebuildTaskCopier> {
+    range: PeekableIterator<std::iter::StepBy<Range<u64>>>,
+    copier: Rc<T>,
+}
+impl<T: RebuildTaskCopier> FullRebuild<T> {
+    /// Create a full rebuild with the given copier.
+    pub(super) fn new(copier: T) -> Self {
+        let desc = copier.descriptor();
+        let range = desc.range.clone();
+        Self {
+            range: PeekableIterator::new(
+                range.step_by(desc.segment_size_blks as usize),
+            ),
+            copier: Rc::new(copier),
+        }
+    }
+}
+impl<T: RebuildTaskCopier> RangeRebuilder<T> for FullRebuild<T> {
+    fn next(&mut self) -> Option<u64> {
+        self.range.next()
+    }
+    fn peek_next(&self) -> Option<u64> {
+        self.range.peek().cloned()
+    }
+
+    fn blocks_remaining(&self) -> u64 {
+        self.peek_next()
+            .map(|r| self.desc().range.end.max(r) - r)
+            .unwrap_or_default()
+    }
+    fn is_partial(&self) -> bool {
+        false
+    }
+
+    fn desc(&self) -> &RebuildDescriptor {
+        self.copier.descriptor()
+    }
+    fn copier(&self) -> Rc<T> {
+        self.copier.clone()
+    }
+}
+
+/// A partial rebuild range which steps through each segment but triggers
+/// the copy only if the segment dirty bit is set.
+pub(super) struct PartialRebuild<T: RebuildTaskCopier> {
+    range: PeekableIterator<std::iter::Enumerate<bit_vec::IntoIter>>,
+    segment_size_blks: u64,
+    total_blks: u64,
+    rebuilt_blks: u64,
+    copier: Rc<T>,
+}
+impl<T: RebuildTaskCopier> PartialRebuild<T> {
+    /// Create a partial sequential rebuild with the given copier and segment
+    /// map.
+    #[allow(dead_code)]
+    pub(super) fn new(map: RebuildMap, copier: T) -> Self {
+        let total_blks = map.count_dirty_blks();
+        let segment_size_blks = map.segment_size_blks();
+        let bit_vec: BitVec = map.into();
+        Self {
+            range: PeekableIterator::new(bit_vec.into_iter().enumerate()),
+            total_blks,
+            rebuilt_blks: 0,
+            segment_size_blks,
+            copier: Rc::new(copier),
+        }
+    }
+}
+impl<T: RebuildTaskCopier> RangeRebuilder<T> for PartialRebuild<T> {
+    fn next(&mut self) -> Option<u64> {
+        for (blk, is_set) in self.range.by_ref() {
+            if is_set {
+                self.rebuilt_blks += self.segment_size_blks;
+                return Some(blk as u64);
+            }
+        }
+        None
+    }
+    fn peek_next(&self) -> Option<u64> {
+        // todo: should we add a wrapper to ensure we peek only set bits?
+        self.range.peek().map(|(blk, _)| *blk as u64)
+    }
+
+    fn blocks_remaining(&self) -> u64 {
+        self.total_blks - self.rebuilt_blks
+    }
+    fn is_partial(&self) -> bool {
+        false
+    }
+
+    fn desc(&self) -> &RebuildDescriptor {
+        self.copier.descriptor()
+    }
+    fn copier(&self) -> Rc<T> {
+        self.copier.clone()
+    }
+}
+
+/// The range is the full range of the request, in steps of segment size
+/// and a copy is triggered for each segment.
+/// However, during the copy itself, clean segments are skipped.
+pub(super) struct PartialSeqRebuild<T: RebuildTaskCopier> {
+    range: PeekableIterator<std::iter::StepBy<Range<u64>>>,
+    copier: Rc<PartialSeqCopier<T>>,
+}
+impl<T: RebuildTaskCopier> PartialSeqRebuild<T> {
+    /// Create a partial sequential rebuild with the given copier and segment
+    /// map.
+    pub(super) fn new(map: RebuildMap, copier: T) -> Self {
+        let desc = copier.descriptor();
+        let range = desc.range.clone();
+        Self {
+            range: PeekableIterator::new(
+                range.step_by(desc.segment_size_blks as usize),
+            ),
+            copier: Rc::new(PartialSeqCopier::new(map, copier)),
+        }
+    }
+}
+impl<T: RebuildTaskCopier> RangeRebuilder<PartialSeqCopier<T>>
+    for PartialSeqRebuild<T>
+{
+    fn next(&mut self) -> Option<u64> {
+        self.range.next()
+    }
+    fn peek_next(&self) -> Option<u64> {
+        self.range.peek().cloned()
+    }
+
+    fn blocks_remaining(&self) -> u64 {
+        self.copier.map.lock().count_dirty_blks()
+    }
+    fn is_partial(&self) -> bool {
+        true
+    }
+
+    fn desc(&self) -> &RebuildDescriptor {
+        self.copier.descriptor()
+    }
+    fn copier(&self) -> Rc<PartialSeqCopier<T>> {
+        self.copier.clone()
+    }
+}
+/// The partial sequential rebuild copier, which uses a bitmap to determine if a
+/// particular block range must be copied.
+pub(super) struct PartialSeqCopier<T: RebuildTaskCopier> {
+    map: parking_lot::Mutex<RebuildMap>,
+    copier: T,
+}
+impl<T: RebuildTaskCopier> PartialSeqCopier<T> {
+    fn new(map: RebuildMap, copier: T) -> Self {
+        Self {
+            map: parking_lot::Mutex::new(map),
+            copier,
+        }
+    }
+    /// Checks if the block has to be transferred.
+    /// If no rebuild map is present, all blocks are considered unsynced.
+    #[inline(always)]
+    fn is_blk_sync(&self, blk: u64) -> bool {
+        self.map.lock().is_blk_clean(blk)
+    }
+
+    /// Marks the rebuild segment starting from the given logical block as
+    /// already transferred.
+    #[inline(always)]
+    fn blk_synced(&self, blk: u64) {
+        self.map.lock().blk_clean(blk);
+    }
+}
+#[async_trait::async_trait(?Send)]
+impl<T: RebuildTaskCopier> RebuildTaskCopier for PartialSeqCopier<T> {
+    fn descriptor(&self) -> &RebuildDescriptor {
+        self.copier.descriptor()
+    }
+
+    /// Copies one segment worth of data from source into destination.
+    async fn copy_segment(
+        &self,
+        blk: u64,
+        task: &mut RebuildTask,
+    ) -> Result<bool, RebuildError> {
+        if self.is_blk_sync(blk) {
+            return Ok(false);
+        }
+
+        let result = self.copier.copy_segment(blk, task).await;
+
+        // In the case of success, mark the segment as already transferred.
+        if result.is_ok() {
+            self.blk_synced(blk);
+        }
+
+        result
+    }
+}
+
+/// Adds peekable functionality to a generic iterator.
+/// > Note: the peekable from the std library is not sufficient here because it
+/// > requires a mutable reference to peek. We get around this limitation by
+/// > always setting the peek at a small performance cost.
+struct PeekableIterator<I: Iterator> {
+    iter: I,
+    peek: Option<I::Item>,
+}
+impl<I: Iterator> PeekableIterator<I> {
+    fn new(mut iter: I) -> Self {
+        Self {
+            peek: iter.next(),
+            iter,
+        }
+    }
+    /// Peek into the future for the next value which next would yield.
+    fn peek(&self) -> Option<&I::Item> {
+        self.peek.as_ref()
+    }
+}
+impl<I: Iterator> Iterator for PeekableIterator<I> {
+    type Item = I::Item;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        std::mem::replace(&mut self.peek, self.iter.next())
+    }
+}
diff --git a/io-engine/tests/nexus_rebuild.rs b/io-engine/tests/nexus_rebuild.rs
index 1e289487a..30b7ef73a 100644
--- a/io-engine/tests/nexus_rebuild.rs
+++ b/io-engine/tests/nexus_rebuild.rs
@@ -346,7 +346,7 @@ async fn rebuild_bdev() {
        )
        .await
        .unwrap();
-        let chan = job.start(None).await.unwrap();
+        let chan = job.start().await.unwrap();
        let state = chan.await.unwrap();
        assert_eq!(state, RebuildState::Completed, "Rebuild should succeed");
    })
diff --git a/test/python/requirements.txt b/test/python/requirements.txt
index cd0f7d251..02d22f5ed 100644
--- a/test/python/requirements.txt
+++ b/test/python/requirements.txt
@@ -2,6 +2,7 @@ asyncio==3.4.3
 asyncssh==2.14.1
 black==22.10.0
 protobuf==4.21.8
+pytest==7.4.4
 pytest-asyncio==0.21.1
 pytest-bdd==6.1.1
 pytest-black==0.3.12
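Reviewer note, not part of the patch: the new rebuilders.rs hinges on PeekableIterator, which pre-fetches the next item so that peek() only needs &self (std::iter::Peekable::peek needs &mut self, as the doc comment above points out). The standalone sketch below mirrors the PeekableIterator body from the patch and drives it the way FullRebuild walks a block range in segment-sized steps; the main function, the 0..40 range, and the 16-block step are illustrative assumptions, not values from the patch.

// Minimal sketch of the peek-without-&mut pattern used by rebuilders.rs.
struct PeekableIterator<I: Iterator> {
    iter: I,
    // The item the next call to `next()` will return, fetched eagerly.
    peek: Option<I::Item>,
}

impl<I: Iterator> PeekableIterator<I> {
    fn new(mut iter: I) -> Self {
        Self { peek: iter.next(), iter }
    }

    /// Immutable peek: no `&mut self` needed, unlike `std::iter::Peekable`.
    fn peek(&self) -> Option<&I::Item> {
        self.peek.as_ref()
    }
}

impl<I: Iterator> Iterator for PeekableIterator<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        // Hand out the pre-fetched item and pre-fetch the following one.
        std::mem::replace(&mut self.peek, self.iter.next())
    }
}

fn main() {
    // Walk an (assumed) 40-block range in 16-block segments, as a full
    // rebuild would: yields block offsets 0, 16, 32.
    let mut blocks = PeekableIterator::new((0u64..40).step_by(16));
    assert_eq!(blocks.peek(), Some(&0)); // peek through a shared reference
    assert_eq!(blocks.next(), Some(0));
    assert_eq!(blocks.next(), Some(16));
    assert_eq!(blocks.peek(), Some(&32));
    assert_eq!(blocks.next(), Some(32));
    assert_eq!(blocks.next(), None);
}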