From 558fbd529a93932f6e493c55f7d123a33045e96a Mon Sep 17 00:00:00 2001 From: Tiago Castro Date: Wed, 21 Feb 2024 18:22:22 +0000 Subject: [PATCH] refactor(rebuild): use new rebuild rangers Make use of the rebuild rangers to configure rebuild types. This allows us to remove the setting of the rebuild map being done after the rebuild job is created for the nexus and removing it from the shared rebuild descriptor. The nexus still uses the partial but sequential rebuild to reduce the scope of changes. Once the fully partial rebuild is validated we can switch the nexus to it. Signed-off-by: Tiago Castro --- .../src/bdev/nexus/nexus_bdev_rebuild.rs | 14 +- io-engine/src/rebuild/bdev_rebuild.rs | 68 +++-- io-engine/src/rebuild/mod.rs | 2 +- io-engine/src/rebuild/nexus_rebuild.rs | 266 ++++++++++++------ io-engine/src/rebuild/rebuild_descriptor.rs | 25 +- io-engine/src/rebuild/rebuild_job.rs | 37 +-- io-engine/src/rebuild/rebuild_job_backend.rs | 101 ++++--- io-engine/src/rebuild/rebuilders.rs | 2 - io-engine/tests/nexus_rebuild.rs | 2 +- 9 files changed, 298 insertions(+), 219 deletions(-) diff --git a/io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs b/io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs index 44e3131938..e4e66b3331 100644 --- a/io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs +++ b/io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs @@ -19,6 +19,7 @@ use crate::{ rebuild::{ HistoryRecord, NexusRebuildJob, + NexusRebuildJobStarter, RebuildError, RebuildJobOptions, RebuildState, @@ -119,7 +120,8 @@ impl<'n> Nexus<'n> { }?; // Create a rebuild job for the child. - self.create_rebuild_job(&src_child_uri, &dst_child_uri) + let starter = self + .create_rebuild_job(&src_child_uri, &dst_child_uri) .await?; self.event( @@ -146,8 +148,8 @@ impl<'n> Nexus<'n> { .lookup_child(&dst_child_uri) .and_then(|c| c.stop_io_log()); - self.rebuild_job_mut(&dst_child_uri)? - .start(map) + starter + .start(self.rebuild_job_mut(&dst_child_uri)?, map) .await .context(nexus_err::RebuildOperation { job: child_uri.to_owned(), @@ -160,7 +162,7 @@ impl<'n> Nexus<'n> { &self, src_child_uri: &str, dst_child_uri: &str, - ) -> Result<(), Error> { + ) -> Result { let verify_mode = match std::env::var("NEXUS_REBUILD_VERIFY") .unwrap_or_default() .as_str() @@ -186,7 +188,7 @@ impl<'n> Nexus<'n> { verify_mode, }; - NexusRebuildJob::new( + NexusRebuildJob::new_starter( &self.name, src_child_uri, dst_child_uri, @@ -202,7 +204,7 @@ impl<'n> Nexus<'n> { }, ) .await - .and_then(NexusRebuildJob::store) + .and_then(NexusRebuildJobStarter::store) .context(nexus_err::CreateRebuild { child: dst_child_uri.to_owned(), name: self.name.clone(), diff --git a/io-engine/src/rebuild/bdev_rebuild.rs b/io-engine/src/rebuild/bdev_rebuild.rs index 9bdfe24597..f25590863b 100644 --- a/io-engine/src/rebuild/bdev_rebuild.rs +++ b/io-engine/src/rebuild/bdev_rebuild.rs @@ -1,7 +1,4 @@ -use std::{ - ops::{Deref, Range}, - rc::Rc, -}; +use std::ops::{Deref, Range}; use super::{ rebuild_descriptor::RebuildDescriptor, @@ -13,7 +10,10 @@ use super::{ SEGMENT_TASKS, }; -use crate::gen_rebuild_instances; +use crate::{ + gen_rebuild_instances, + rebuild::rebuilders::{FullRebuild, RangeRebuilder}, +}; /// A Bdev rebuild job is responsible for managing a rebuild (copy) which reads /// from source_hdl and writes into destination_hdl from specified start to end. @@ -59,12 +59,10 @@ gen_rebuild_instances!(BdevRebuildJob); /// A rebuild job which is responsible for rebuilding from /// source to target of the `RebuildDescriptor`. pub(super) struct BdevRebuildJobBackend { - /// The next block to be rebuilt. - next: u64, /// A pool of tasks which perform the actual data rebuild. task_pool: RebuildTasks, /// A generic rebuild descriptor. - descriptor: Rc, + copier: FullRebuild, /// Notification callback with src and dst uri's. notify_fn: fn(&str, &str) -> (), } @@ -72,11 +70,20 @@ pub(super) struct BdevRebuildJobBackend { #[async_trait::async_trait(?Send)] impl RebuildBackend for BdevRebuildJobBackend { fn on_state_change(&mut self) { - (self.notify_fn)(&self.descriptor.src_uri, &self.descriptor.dst_uri); + let desc = self.common_desc(); + (self.notify_fn)(&desc.src_uri, &desc.dst_uri); } fn common_desc(&self) -> &RebuildDescriptor { - &self.descriptor + self.copier.desc() + } + + fn blocks_remaining(&self) -> u64 { + self.copier.blocks_remaining() + } + + fn is_partial(&self) -> bool { + self.copier.is_partial() } fn task_pool(&self) -> &RebuildTasks { @@ -84,22 +91,18 @@ impl RebuildBackend for BdevRebuildJobBackend { } fn schedule_task_by_id(&mut self, id: usize) -> bool { - if self.next >= self.descriptor.range.end { - false - } else { - let next = std::cmp::min( - self.next + self.descriptor.segment_size_blks, - self.descriptor.range.end, - ); - self.task_pool.schedule_segment_rebuild( - id, - self.next, - self.descriptor.clone(), - ); - self.task_pool.active += 1; - self.next = next; - true - } + self.copier + .next() + .map(|blk| { + self.task_pool.schedule_segment_rebuild( + id, + blk, + self.copier.copier(), + ); + self.task_pool.active += 1; + true + }) + .unwrap_or_default() } async fn await_one_task(&mut self) -> Option { @@ -110,7 +113,7 @@ impl RebuildBackend for BdevRebuildJobBackend { impl std::fmt::Debug for BdevRebuildJobBackend { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("BdevRebuildJob") - .field("next", &self.next) + .field("next", &self.copier.peek_next()) .finish() } } @@ -130,15 +133,10 @@ impl BdevRebuildJobBackend { notify_fn: fn(&str, &str) -> (), descriptor: RebuildDescriptor, ) -> Result { - let be = Self { - next: descriptor.range.start, + Ok(Self { task_pool, - descriptor: Rc::new(descriptor), + copier: FullRebuild::new(descriptor), notify_fn, - }; - - info!("{be}: backend created"); - - Ok(be) + }) } } diff --git a/io-engine/src/rebuild/mod.rs b/io-engine/src/rebuild/mod.rs index df8b6ea426..6e8ed346a8 100644 --- a/io-engine/src/rebuild/mod.rs +++ b/io-engine/src/rebuild/mod.rs @@ -12,7 +12,7 @@ mod rebuild_task; mod rebuilders; pub use bdev_rebuild::BdevRebuildJob; -pub use nexus_rebuild::NexusRebuildJob; +pub use nexus_rebuild::{NexusRebuildJob, NexusRebuildJobStarter}; use rebuild_descriptor::RebuildDescriptor; pub(crate) use rebuild_error::RebuildError; use rebuild_job::RebuildOperation; diff --git a/io-engine/src/rebuild/nexus_rebuild.rs b/io-engine/src/rebuild/nexus_rebuild.rs index 75e8d48d89..ab677c9e0e 100644 --- a/io-engine/src/rebuild/nexus_rebuild.rs +++ b/io-engine/src/rebuild/nexus_rebuild.rs @@ -1,16 +1,23 @@ +use futures::channel::oneshot; use snafu::ResultExt; use spdk_rs::LbaRange; -use std::{ - ops::{Deref, Range}, - rc::Rc, -}; +use std::ops::{Deref, Range}; use crate::{ core::{DescriptorGuard, UntypedBdev}, gen_rebuild_instances, rebuild::{ rebuild_error::{RangeLockFailed, RangeUnlockFailed}, + rebuild_job_backend::RebuildJobManager, rebuild_task::{RebuildTask, RebuildTaskCopier}, + rebuilders::{ + FullRebuild, + PartialSeqCopier, + PartialSeqRebuild, + RangeRebuilder, + }, + RebuildMap, + RebuildState, }, }; @@ -30,18 +37,26 @@ use super::{ /// that there is no concurrent between the nexus and the rebuild. /// This is a frontend interface that communicates with a backend runner which /// is the one responsible for the read/writing of the data. -pub struct NexusRebuildJob(RebuildJob); +pub struct NexusRebuildJob { + job: RebuildJob, +} + +pub struct NexusRebuildJobStarter { + job: Option, + manager: RebuildJobManager, + backend: NexusRebuildJobBackendStarter, +} impl std::fmt::Debug for NexusRebuildJob { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.0.fmt(f) + self.job.fmt(f) } } impl Deref for NexusRebuildJob { type Target = RebuildJob; fn deref(&self) -> &Self::Target { - &self.0 + &self.job } } @@ -55,25 +70,64 @@ impl NexusRebuildJob { /// Builder::new(src, srd).with_range().with_options().with_nexus().build() /// GenericRebuild: /// Builder::new(src, srd).with_range().with_options().build() - pub async fn new( + pub async fn new_starter( nexus_name: &str, src_uri: &str, dst_uri: &str, range: Range, options: RebuildJobOptions, notify_fn: fn(String, String) -> (), - ) -> Result { + ) -> Result { let descriptor = RebuildDescriptor::new(src_uri, dst_uri, Some(range), options) .await?; let tasks = RebuildTasks::new(SEGMENT_TASKS, &descriptor)?; - let backend = NexusRebuildJobBackend::new( + let backend = NexusRebuildJobBackendStarter::new( nexus_name, tasks, notify_fn, descriptor, ) .await?; - RebuildJob::from_backend(backend).await.map(Self) + let manager = RebuildJobManager::new(); + + Ok(NexusRebuildJobStarter { + job: Some(Self { + job: RebuildJob::from_manager(&manager, &backend.descriptor), + }), + manager, + backend, + }) + } +} +impl NexusRebuildJobStarter { + pub fn store(mut self) -> Result { + if let Some(job) = self.job.take() { + job.store()?; + } + Ok(self) + } + /// Schedules the job to start in a future and returns a complete channel + /// which can be waited on. + pub async fn start( + self, + job: std::sync::Arc, + map: Option, + ) -> Result, RebuildError> { + match map { + None => { + self.manager + .into_backend(self.backend.into_full()) + .schedule() + .await; + } + Some(map) => { + self.manager + .into_backend(self.backend.into_partial_seq(map)) + .schedule() + .await; + } + } + job.start().await } } @@ -102,29 +156,110 @@ impl Deref for NexusRebuildDescriptor { /// as a means of locking the range which is being rebuilt ensuring /// there are no concurrent writes to the same range between the /// user IO (through the nexus) and the rebuild itself. -pub(super) struct NexusRebuildJobBackend { - /// The next block to be rebuilt. - next: u64, +pub(super) struct NexusRebuildJobBackend< + T: RebuildTaskCopier, + R: RangeRebuilder, +> { + /// A pool of tasks which perform the actual data rebuild. + task_pool: RebuildTasks, + /// The range rebuilder which walks and copies the segments. + copier: R, + /// Notification callback which existing nexus uses to sync + /// with rebuild updates. + notify_fn: fn(String, String) -> (), + /// The name of the nexus this pertains to. + nexus_name: String, + _p: std::marker::PhantomData, +} + +pub(super) struct NexusRebuildJobBackendStarter { /// A pool of tasks which perform the actual data rebuild. task_pool: RebuildTasks, /// A nexus rebuild specific descriptor. - descriptor: Rc, + descriptor: NexusRebuildDescriptor, /// Notification callback which existing nexus uses to sync /// with rebuild updates. notify_fn: fn(String, String) -> (), } +impl NexusRebuildJobBackendStarter { + /// Creates a new RebuildJob which rebuilds from source URI to target URI + /// from start to end (of the data partition); notify_fn callback is called + /// when the rebuild state is updated - with the nexus and destination + /// URI as arguments. + pub async fn new( + nexus_name: &str, + task_pool: RebuildTasks, + notify_fn: fn(String, String) -> (), + descriptor: RebuildDescriptor, + ) -> Result { + let nexus_descriptor = UntypedBdev::open_by_name(nexus_name, false) + .context(BdevNotFound { + bdev: nexus_name.to_string(), + })?; + + let descriptor = NexusRebuildDescriptor { + nexus: nexus_descriptor, + nexus_name: nexus_name.to_string(), + common: descriptor, + }; + Ok(Self { + descriptor, + task_pool, + notify_fn, + }) + } + + fn into_partial_seq( + self, + map: RebuildMap, + ) -> NexusRebuildJobBackend< + PartialSeqCopier, + PartialSeqRebuild, + > { + NexusRebuildJobBackend { + task_pool: self.task_pool, + notify_fn: self.notify_fn, + nexus_name: self.descriptor.nexus_name.clone(), + copier: PartialSeqRebuild::new(map, self.descriptor), + _p: Default::default(), + } + } + fn into_full( + self, + ) -> NexusRebuildJobBackend< + NexusRebuildDescriptor, + FullRebuild, + > { + NexusRebuildJobBackend { + task_pool: self.task_pool, + notify_fn: self.notify_fn, + nexus_name: self.descriptor.nexus_name.clone(), + copier: FullRebuild::new(self.descriptor), + _p: Default::default(), + } + } +} #[async_trait::async_trait(?Send)] -impl RebuildBackend for NexusRebuildJobBackend { +impl> RebuildBackend + for NexusRebuildJobBackend +{ fn on_state_change(&mut self) { (self.notify_fn)( - self.descriptor.nexus_name.clone(), - self.descriptor.dst_uri.clone(), + self.nexus_name.clone(), + self.common_desc().dst_uri.clone(), ); } fn common_desc(&self) -> &RebuildDescriptor { - &self.descriptor + self.copier.desc() + } + + fn blocks_remaining(&self) -> u64 { + self.copier.blocks_remaining() + } + fn is_partial(&self) -> bool { + self.copier.is_partial() } fn task_pool(&self) -> &RebuildTasks { @@ -132,84 +267,39 @@ impl RebuildBackend for NexusRebuildJobBackend { } fn schedule_task_by_id(&mut self, id: usize) -> bool { - match self.send_segment_task(id) { - Some(next) => { + self.copier + .next() + .map(|blk| { + self.task_pool.schedule_segment_rebuild( + id, + blk, + self.copier.copier(), + ); self.task_pool.active += 1; - self.next = next; true - } - // we've already got enough tasks to rebuild the destination - None => false, - } + }) + .unwrap_or_default() } async fn await_one_task(&mut self) -> Option { self.task_pool.await_one_task().await } } -impl std::fmt::Debug for NexusRebuildJobBackend { +impl> std::fmt::Debug + for NexusRebuildJobBackend +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("NexusRebuildJob") - .field("nexus", &self.descriptor.nexus_name) - .field("next", &self.next) + .field("nexus", &self.nexus_name) + .field("next", &self.copier.peek_next()) .finish() } } -impl std::fmt::Display for NexusRebuildJobBackend { +impl> std::fmt::Display + for NexusRebuildJobBackend +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "on nexus '{nex}'", nex = self.descriptor.nexus_name) - } -} - -impl NexusRebuildJobBackend { - /// Creates a new RebuildJob which rebuilds from source URI to target URI - /// from start to end (of the data partition); notify_fn callback is called - /// when the rebuild state is updated - with the nexus and destination - /// URI as arguments. - pub async fn new( - nexus_name: &str, - task_pool: RebuildTasks, - notify_fn: fn(String, String) -> (), - descriptor: RebuildDescriptor, - ) -> Result { - let nexus_descriptor = UntypedBdev::open_by_name(nexus_name, false) - .context(BdevNotFound { - bdev: nexus_name.to_string(), - })?; - - let be = Self { - next: descriptor.range.start, - task_pool, - descriptor: Rc::new(NexusRebuildDescriptor { - nexus: nexus_descriptor, - nexus_name: nexus_name.to_string(), - common: descriptor, - }), - notify_fn, - }; - - info!("{be}: backend created"); - - Ok(be) - } - - /// Sends one segment worth of data in a reactor future and notifies the - /// management channel. Returns the next segment offset to rebuild, if any. - fn send_segment_task(&mut self, id: usize) -> Option { - if self.next >= self.descriptor.range.end { - None - } else { - let next = std::cmp::min( - self.next + self.descriptor.segment_size_blks, - self.descriptor.range.end, - ); - self.task_pool.schedule_segment_rebuild( - id, - self.next, - self.descriptor.clone(), - ); - Some(next) - } + write!(f, "on nexus '{nex}'", nex = self.nexus_name) } } @@ -232,15 +322,12 @@ impl RebuildTaskCopier for NexusRebuildDescriptor { /// /// The use of RangeContext here is safe because it is stored on the stack /// for the duration of the calls to lock and unlock. + #[inline] async fn copy_segment( &self, blk: u64, task: &mut RebuildTask, ) -> Result { - if self.is_blk_sync(blk) { - return Ok(false); - } - let len = self.get_segment_size_blks(blk); // The nexus children have metadata and data partitions, whereas the // nexus has a data partition only. Because we are locking the range on @@ -273,11 +360,6 @@ impl RebuildTaskCopier for NexusRebuildDescriptor { len, })?; - // In the case of success, mark the segment as already transferred. - if result.is_ok() { - self.blk_synced(blk); - } - result } } diff --git a/io-engine/src/rebuild/rebuild_descriptor.rs b/io-engine/src/rebuild/rebuild_descriptor.rs index ebd031998a..d4c65ae224 100644 --- a/io-engine/src/rebuild/rebuild_descriptor.rs +++ b/io-engine/src/rebuild/rebuild_descriptor.rs @@ -6,7 +6,6 @@ use spdk_rs::{ IoVec, NvmeStatus, }; -use std::sync::Arc; use crate::{ bdev::device_open, @@ -26,7 +25,7 @@ use crate::{ }, }; -use super::{RebuildError, RebuildJobOptions, RebuildMap, RebuildVerifyMode}; +use super::{RebuildError, RebuildJobOptions, RebuildVerifyMode}; /// Contains all descriptors and their associated information which allows the /// tasks to copy/rebuild data from source to destination. @@ -53,8 +52,6 @@ pub(super) struct RebuildDescriptor { pub(super) dst_descriptor: Box, /// Start time of this rebuild. pub(super) start_time: DateTime, - /// Rebuild map. - pub(super) rebuild_map: Arc>>, } impl RebuildDescriptor { @@ -127,7 +124,6 @@ impl RebuildDescriptor { src_descriptor, dst_descriptor, start_time: Utc::now(), - rebuild_map: Arc::new(parking_lot::Mutex::new(None)), }) } @@ -196,25 +192,6 @@ impl RebuildDescriptor { }) } - /// Checks if the block has to be transferred. - /// If no rebuild map is present, all blocks are considered unsynced. - #[inline(always)] - pub(super) fn is_blk_sync(&self, blk: u64) -> bool { - self.rebuild_map - .lock() - .as_ref() - .map_or(false, |m| m.is_blk_clean(blk)) - } - - /// Marks the rebuild segment starting from the given logical block as - /// already transferred. - #[inline(always)] - pub(super) fn blk_synced(&self, blk: u64) { - if let Some(map) = self.rebuild_map.lock().as_mut() { - map.blk_clean(blk); - } - } - /// Returns `IoVec` for the givem `DmaBuf`, with length adjusted to the copy /// size for the given offset. Given `DmaBuf` must be large enough. #[inline(always)] diff --git a/io-engine/src/rebuild/rebuild_job.rs b/io-engine/src/rebuild/rebuild_job.rs index c59edbff52..a8ffdf4e60 100644 --- a/io-engine/src/rebuild/rebuild_job.rs +++ b/io-engine/src/rebuild/rebuild_job.rs @@ -8,14 +8,16 @@ use super::{ RebuildError, RebuildJobBackendManager, RebuildJobRequest, - RebuildMap, RebuildState, RebuildStates, RebuildStats, }; use crate::{ core::{Reactors, VerboseError}, - rebuild::rebuild_job_backend::RebuildBackend, + rebuild::{ + rebuild_descriptor::RebuildDescriptor, + rebuild_job_backend::{RebuildBackend, RebuildJobManager}, + }, }; /// Rebuild I/O verification mode. @@ -109,26 +111,27 @@ impl RebuildJob { Ok(frontend) } + /// Creates a new RebuildJob taking a specific backend implementation and + /// running the generic backend manager. + pub(super) fn from_manager( + manager: &RebuildJobManager, + desc: &RebuildDescriptor, + ) -> Self { + Self { + src_uri: desc.src_uri.to_string(), + dst_uri: desc.dst_uri.to_string(), + states: manager.states.clone(), + comms: RebuildFBendChan::from(&manager.info_chan), + complete_chan: Arc::downgrade(&manager.complete_chan), + notify_chan: manager.notify_chan.1.clone(), + } + } + /// Schedules the job to start in a future and returns a complete channel /// which can be waited on. pub async fn start( &self, - map: Option, ) -> Result, RebuildError> { - if let Some(map) = map { - let (s, r) = oneshot::channel(); - self.comms - .send(RebuildJobRequest::SetRebuildMap((map, s))) - .await - .ok(); - if let Err(e) = r.await { - error!( - "{uri}: failed to set rebuild map: {e}", - uri = self.dst_uri - ); - } - } - self.exec_client_op(RebuildOperation::Start)?; self.add_completion_listener() } diff --git a/io-engine/src/rebuild/rebuild_job_backend.rs b/io-engine/src/rebuild/rebuild_job_backend.rs index 63a4271846..87973c5c0c 100644 --- a/io-engine/src/rebuild/rebuild_job_backend.rs +++ b/io-engine/src/rebuild/rebuild_job_backend.rs @@ -1,4 +1,10 @@ -use std::sync::Arc; +use std::{ + ops::{Deref, DerefMut}, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, +}; use crossbeam::channel::{unbounded, Receiver, Sender}; use futures::{channel::oneshot, FutureExt, StreamExt}; @@ -6,7 +12,6 @@ use futures::{channel::oneshot, FutureExt, StreamExt}; use super::{ RebuildDescriptor, RebuildError, - RebuildMap, RebuildState, RebuildStates, RebuildStats, @@ -23,8 +28,6 @@ pub(super) enum RebuildJobRequest { WakeUp, /// Get the rebuild stats from the backend. GetStats(oneshot::Sender), - /// Set rebuild map for this job. - SetRebuildMap((RebuildMap, oneshot::Sender<()>)), } /// Channel to share information between frontend and backend. @@ -72,6 +75,11 @@ pub(super) trait RebuildBackend: /// Get a reference to the common rebuild descriptor. fn common_desc(&self) -> &RebuildDescriptor; + /// Get the remaining blocks we have yet to be rebuilt. + fn blocks_remaining(&self) -> u64; + /// Check if this is a partial rebuild. + fn is_partial(&self) -> bool; + /// Get a reference to the tasks pool. fn task_pool(&self) -> &RebuildTasks; /// Schedule new work on the given task by its id. @@ -85,7 +93,7 @@ pub(super) trait RebuildBackend: /// A rebuild job is responsible for managing a rebuild (copy) which reads /// from source_hdl and writes into destination_hdl from specified start to end. -pub(super) struct RebuildJobBackendManager { +pub(super) struct RebuildJobManager { /// Channel used to signal rebuild update. pub notify_chan: (Sender, Receiver), /// Current state of the rebuild job. @@ -97,11 +105,30 @@ pub(super) struct RebuildJobBackendManager { pub(super) info_chan: RebuildFBendChan, /// Job serial number. serial: u64, +} + +/// A rebuild job is responsible for managing a rebuild (copy) which reads +/// from source_hdl and writes into destination_hdl from specified start to end. +pub(super) struct RebuildJobBackendManager { + manager: RebuildJobManager, /// The rebuild backend runner which implements the `RebuildBackend` and /// performs a specific type of rebuild copy. backend: Box, } +impl Deref for RebuildJobBackendManager { + type Target = RebuildJobManager; + + fn deref(&self) -> &Self::Target { + &self.manager + } +} +impl DerefMut for RebuildJobBackendManager { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.manager + } +} + impl std::fmt::Debug for RebuildJobBackendManager { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("RebuildJob") @@ -126,6 +153,31 @@ impl std::fmt::Display for RebuildJobBackendManager { } } +impl RebuildJobManager { + pub fn new() -> Self { + // Job serial numbers. + static SERIAL: AtomicU64 = AtomicU64::new(1); + + let serial = SERIAL.fetch_add(1, Ordering::SeqCst); + Self { + notify_chan: unbounded::(), + states: Default::default(), + complete_chan: Default::default(), + info_chan: RebuildFBendChan::new(), + serial, + } + } + pub fn into_backend( + self, + backend: impl RebuildBackend + 'static, + ) -> RebuildJobBackendManager { + RebuildJobBackendManager { + manager: self, + backend: Box::new(backend), + } + } +} + impl RebuildJobBackendManager { /// Creates a new RebuildJob which rebuilds from source URI to target URI /// from start to end (of the data partition); notify_fn callback is called @@ -133,11 +185,7 @@ impl RebuildJobBackendManager { /// URI as arguments. pub fn new(backend: impl RebuildBackend + 'static) -> Self { let be = Self { - notify_chan: unbounded::(), - states: Default::default(), - complete_chan: Default::default(), - info_chan: RebuildFBendChan::new(), - serial: 0, + manager: RebuildJobManager::new(), backend: Box::new(backend), }; info!("{be}: backend created"); @@ -262,20 +310,14 @@ impl RebuildJobBackendManager { blocks_total, ); - let blocks_remaining = descriptor - .rebuild_map - .lock() - .as_ref() - .map_or(blocks_total - blocks_recovered, |log| { - log.count_dirty_blks() - }); + let blocks_remaining = self.backend.blocks_remaining(); let progress = (blocks_recovered * 100) / blocks_total; assert!(progress < 100 || blocks_remaining == 0); RebuildStats { start_time: descriptor.start_time, - is_partial: descriptor.rebuild_map.lock().is_some(), + is_partial: self.backend.is_partial(), blocks_total, blocks_recovered, blocks_transferred, @@ -430,9 +472,6 @@ impl RebuildJobBackendManager { Some(RebuildJobRequest::GetStats(reply)) => { self.reply_stats(reply).await.ok(); } - Some(RebuildJobRequest::SetRebuildMap((map, s))) => { - self.set_rebuild_map(map, s).await.ok(); - } None => { self.fail_with(RebuildError::FrontendGone); return false; @@ -440,26 +479,6 @@ impl RebuildJobBackendManager { } true } - - /// Sets rebuild map for this job. - async fn set_rebuild_map( - &mut self, - map: RebuildMap, - s: oneshot::Sender<()>, - ) -> Result<(), RebuildError> { - { - let mut g = self.backend.common_desc().rebuild_map.lock(); - if g.is_some() { - error!("{self}: rebuild map is already set"); - } else { - *g = Some(map); - debug!("{self}: set rebuild map"); - } - } - - s.send(()).ok(); - Ok(()) - } } impl Drop for RebuildJobBackendManager { diff --git a/io-engine/src/rebuild/rebuilders.rs b/io-engine/src/rebuild/rebuilders.rs index 8b45a655a2..d96b629bb0 100644 --- a/io-engine/src/rebuild/rebuilders.rs +++ b/io-engine/src/rebuild/rebuilders.rs @@ -34,7 +34,6 @@ pub(super) struct FullRebuild { } impl FullRebuild { /// Create a full rebuild with the given copier. - #[allow(dead_code)] pub(super) fn new(copier: T) -> Self { let desc = copier.descriptor(); let range = desc.range.clone(); @@ -130,7 +129,6 @@ pub(super) struct PartialSeqRebuild { impl PartialSeqRebuild { /// Create a partial sequential rebuild with the given copier and segment /// map. - #[allow(dead_code)] pub(super) fn new(map: RebuildMap, copier: T) -> Self { let desc = copier.descriptor(); let range = desc.range.clone(); diff --git a/io-engine/tests/nexus_rebuild.rs b/io-engine/tests/nexus_rebuild.rs index 1e289487a4..30b7ef73a1 100644 --- a/io-engine/tests/nexus_rebuild.rs +++ b/io-engine/tests/nexus_rebuild.rs @@ -346,7 +346,7 @@ async fn rebuild_bdev() { ) .await .unwrap(); - let chan = job.start(None).await.unwrap(); + let chan = job.start().await.unwrap(); let state = chan.await.unwrap(); assert_eq!(state, RebuildState::Completed, "Rebuild should succeed"); })