Skip to content

Commit

Permalink
Try #1581:
Browse files Browse the repository at this point in the history
  • Loading branch information
mayastor-bors committed Jan 19, 2024
2 parents 3623158 + 5098cf1 commit e648cf9
Show file tree
Hide file tree
Showing 15 changed files with 1,027 additions and 600 deletions.
6 changes: 3 additions & 3 deletions io-engine-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use io_engine::{
core::{MayastorEnvironment, Mthread},
logger,
logger::LogFormat,
rebuild::{RebuildJob, RebuildState},
rebuild::{NexusRebuildJob, RebuildState},
};

pub mod bdev;
Expand Down Expand Up @@ -457,7 +457,7 @@ pub async fn wait_for_rebuild(
timeout: Duration,
) {
let (s, r) = unbounded::<()>();
let job = match RebuildJob::lookup(&dst_uri) {
let job = match NexusRebuildJob::lookup(&dst_uri) {
Ok(job) => job,
Err(_) => return,
};
Expand Down Expand Up @@ -490,7 +490,7 @@ pub async fn wait_for_rebuild(
error
});
reactor_poll!(r);
if let Ok(job) = RebuildJob::lookup(&dst_uri) {
if let Ok(job) = NexusRebuildJob::lookup(&dst_uri) {
job.stats().await;
}
t.join().unwrap().unwrap();
Expand Down
22 changes: 11 additions & 11 deletions io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use crate::{
eventing::{EventMetaGen, EventWithMeta},
rebuild::{
HistoryRecord,
NexusRebuildJob,
RebuildError,
RebuildJob,
RebuildJobOptions,
RebuildState,
RebuildStats,
Expand Down Expand Up @@ -139,8 +139,8 @@ impl<'n> Nexus<'n> {
self.reconfigure(DrEvent::ChildRebuild).await;

// Stop the I/O log and create a rebuild map from it.
// As this is done after the reconfiguraion, any new write I/Os will
// now reach the destionation child, and no rebuild will be required
// As this is done after the reconfiguration, any new write I/Os will
// now reach the destination child, and no rebuild will be required
// for them.
let map = self
.lookup_child(&dst_child_uri)
Expand Down Expand Up @@ -186,7 +186,7 @@ impl<'n> Nexus<'n> {
verify_mode,
};

RebuildJob::new(
NexusRebuildJob::new(
&self.name,
src_child_uri,
dst_child_uri,
Expand All @@ -202,7 +202,7 @@ impl<'n> Nexus<'n> {
},
)
.await
.and_then(RebuildJob::store)
.and_then(NexusRebuildJob::store)
.context(nexus_err::CreateRebuild {
child: dst_child_uri.to_owned(),
name: self.name.clone(),
Expand All @@ -211,7 +211,7 @@ impl<'n> Nexus<'n> {

/// Translates the job into a new history record and pushes into
/// the history.
fn create_history_record(&self, job: Arc<RebuildJob>) {
fn create_history_record(&self, job: Arc<NexusRebuildJob>) {
let Some(rec) = job.history_record() else {
error!("{self:?}: try to get history record on unfinished job");
return;
Expand Down Expand Up @@ -330,7 +330,7 @@ impl<'n> Nexus<'n> {
pub async fn cancel_rebuild_jobs(&self, src_uri: &str) -> Vec<String> {
info!("{:?}: cancel rebuild jobs from '{}'...", self, src_uri);

let src_jobs = RebuildJob::lookup_src(src_uri);
let src_jobs = NexusRebuildJob::lookup_src(src_uri);
let mut terminated_jobs = Vec::new();
let mut rebuilding_children = Vec::new();

Expand Down Expand Up @@ -375,8 +375,8 @@ impl<'n> Nexus<'n> {
pub(crate) fn rebuild_job(
&self,
dst_child_uri: &str,
) -> Result<std::sync::Arc<RebuildJob>, Error> {
RebuildJob::lookup(dst_child_uri).map_err(|_| {
) -> Result<std::sync::Arc<NexusRebuildJob>, Error> {
NexusRebuildJob::lookup(dst_child_uri).map_err(|_| {
Error::RebuildJobNotFound {
child: dst_child_uri.to_owned(),
name: self.name.to_owned(),
Expand All @@ -389,9 +389,9 @@ impl<'n> Nexus<'n> {
pub(crate) fn rebuild_job_mut(
&self,
dst_child_uri: &str,
) -> Result<Arc<RebuildJob>, Error> {
) -> Result<Arc<NexusRebuildJob>, Error> {
let name = self.name.clone();
RebuildJob::lookup(dst_child_uri).map_err(|_| {
NexusRebuildJob::lookup(dst_child_uri).map_err(|_| {
Error::RebuildJobNotFound {
child: dst_child_uri.to_owned(),
name,
Expand Down
12 changes: 7 additions & 5 deletions io-engine/src/bdev/nexus/nexus_child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::{
VerboseError,
},
persistent_store::PersistentStore,
rebuild::{RebuildJob, RebuildMap},
rebuild::{NexusRebuildJob, RebuildMap},
};

use crate::{
Expand Down Expand Up @@ -1199,13 +1199,15 @@ impl<'c> NexusChild<'c> {
/// TODO
pub(super) fn remove_rebuild_job(
&self,
) -> Option<std::sync::Arc<RebuildJob>> {
RebuildJob::remove(&self.name).ok()
) -> Option<std::sync::Arc<NexusRebuildJob>> {
NexusRebuildJob::remove(&self.name).ok()
}

/// Return the rebuild job which is rebuilding this child, if rebuilding.
pub(crate) fn rebuild_job(&self) -> Option<std::sync::Arc<RebuildJob>> {
RebuildJob::lookup(&self.name).ok()
pub(crate) fn rebuild_job(
&self,
) -> Option<std::sync::Arc<NexusRebuildJob>> {
NexusRebuildJob::lookup(&self.name).ok()
}

/// Return the rebuild progress on this child, if rebuilding.
Expand Down
4 changes: 2 additions & 2 deletions io-engine/src/eventing/nexus_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ use crate::{
bdev::{nexus, nexus::NexusChild},
core::{MayastorEnvironment, VerboseError},
eventing::{Event, EventMetaGen, EventWithMeta},
rebuild::{RebuildJob, RebuildState},
rebuild::{NexusRebuildJob, RebuildState},
};

impl EventMetaGen for RebuildJob {
impl EventMetaGen for NexusRebuildJob {
fn meta(&self) -> EventMeta {
let rebuild_status = match self.state() {
RebuildState::Init | RebuildState::Running => {
Expand Down
6 changes: 3 additions & 3 deletions io-engine/src/grpc/v0/nexus_grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
PtplFileOps,
},
core::{Protocol, Share},
rebuild::RebuildJob,
rebuild::NexusRebuildJob,
};

fn map_fault_reason(r: FaultReason) -> ChildStateReason {
Expand Down Expand Up @@ -137,7 +137,7 @@ impl<'n> Nexus<'n> {
}
children
},
rebuilds: RebuildJob::count() as u32,
rebuilds: NexusRebuildJob::count() as u32,
allowed_hosts: self.allowed_hosts(),
}
}
Expand Down Expand Up @@ -165,7 +165,7 @@ impl<'n> Nexus<'n> {
}
children
},
rebuilds: RebuildJob::count() as u32,
rebuilds: NexusRebuildJob::count() as u32,
ana_state: ana_state as i32,
allowed_hosts: self.allowed_hosts(),
}
Expand Down
144 changes: 144 additions & 0 deletions io-engine/src/rebuild/bdev_rebuild.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
use std::{
ops::{Deref, Range},
rc::Rc,
};

use super::{
rebuild_descriptor::RebuildDescriptor,
rebuild_error::RebuildError,
rebuild_job_backend::RebuildBackend,
rebuild_task::{RebuildTasks, TaskResult},
RebuildJob,
RebuildJobOptions,
SEGMENT_TASKS,
};

use crate::gen_rebuild_instances;

/// A Bdev rebuild job is responsible for managing a rebuild (copy) which reads
/// from source_hdl and writes into destination_hdl from specified start to end.
pub struct BdevRebuildJob(RebuildJob);

impl std::fmt::Debug for BdevRebuildJob {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl Deref for BdevRebuildJob {
type Target = RebuildJob;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl BdevRebuildJob {
/// Creates a new RebuildJob which rebuilds from source URI to target URI
/// from start to end (of the data partition); notify_fn callback is called
/// when the rebuild state is updated - with the nexus and destination
/// URI as arguments.
pub async fn new(
src_uri: &str,
dst_uri: &str,
range: Option<Range<u64>>,
options: RebuildJobOptions,
notify_fn: fn(&str, &str) -> (),
) -> Result<Self, RebuildError> {
let descriptor =
RebuildDescriptor::new(src_uri, dst_uri, range, options).await?;
let tasks = RebuildTasks::new(SEGMENT_TASKS, &descriptor)?;
let backend =
BdevRebuildJobBackend::new(tasks, notify_fn, descriptor).await?;

RebuildJob::from_backend(backend).await.map(Self)
}
}

gen_rebuild_instances!(BdevRebuildJob);

/// A rebuild job which is responsible for rebuilding from
/// source to target of the `RebuildDescriptor`.
pub(super) struct BdevRebuildJobBackend {
/// The next block to be rebuilt.
next: u64,
/// A pool of tasks which perform the actual data rebuild.
task_pool: RebuildTasks,
/// A generic rebuild descriptor.
descriptor: Rc<RebuildDescriptor>,
/// Notification callback with src and dst uri's.
notify_fn: fn(&str, &str) -> (),
}

#[async_trait::async_trait(?Send)]
impl RebuildBackend for BdevRebuildJobBackend {
fn on_state_change(&mut self) {
(self.notify_fn)(&self.descriptor.src_uri, &self.descriptor.dst_uri);
}

fn cmn_descriptor(&self) -> &RebuildDescriptor {
&self.descriptor
}

fn task_pool(&self) -> &RebuildTasks {
&self.task_pool
}

fn schedule_task_by_id(&mut self, id: usize) -> bool {
if self.next >= self.descriptor.range.end {
false
} else {
let next = std::cmp::min(
self.next + self.descriptor.segment_size_blks,
self.descriptor.range.end,
);
self.task_pool.schedule_segment_rebuild(
id,
self.next,
self.descriptor.clone(),
);
self.task_pool.active += 1;
self.next = next;
true
}
}

async fn await_one_task(&mut self) -> Option<TaskResult> {
self.task_pool.await_one_task().await
}
}

impl std::fmt::Debug for BdevRebuildJobBackend {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BdevRebuildJob")
.field("next", &self.next)
.finish()
}
}
impl std::fmt::Display for BdevRebuildJobBackend {
fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Ok(())
}
}

impl BdevRebuildJobBackend {
/// Creates a new RebuildJob which rebuilds from source URI to target URI
/// from start to end (of the data partition); notify_fn callback is called
/// when the rebuild state is updated - with the source and destination
/// URI as arguments.
pub async fn new(
task_pool: RebuildTasks,
notify_fn: fn(&str, &str) -> (),
descriptor: RebuildDescriptor,
) -> Result<Self, RebuildError> {
let be = Self {
next: descriptor.range.start,
task_pool,
descriptor: Rc::new(descriptor),
notify_fn,
};

info!("{be}: backend created");

Ok(be)
}
}
15 changes: 10 additions & 5 deletions io-engine/src/rebuild/mod.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,32 @@
mod bdev_rebuild;
mod nexus_rebuild;
mod rebuild_descriptor;
mod rebuild_error;
mod rebuild_instances;
mod rebuild_job;
mod rebuild_job_backend;
mod rebuild_map;
mod rebuild_state;
mod rebuild_stats;
mod rebuild_task;

pub use bdev_rebuild::BdevRebuildJob;
pub use nexus_rebuild::NexusRebuildJob;
use rebuild_descriptor::RebuildDescriptor;
pub(crate) use rebuild_error::RebuildError;
use rebuild_job::RebuildOperation;
pub use rebuild_job::{RebuildJob, RebuildJobOptions, RebuildVerifyMode};
use rebuild_job_backend::{
RebuildFBendChan,
RebuildJobBackend,
RebuildJobBackendManager,
RebuildJobRequest,
};
pub(crate) use rebuild_map::RebuildMap;
pub use rebuild_map::RebuildMap;
pub use rebuild_state::RebuildState;
use rebuild_state::RebuildStates;
pub(crate) use rebuild_stats::HistoryRecord;
pub use rebuild_stats::RebuildStats;
use rebuild_task::{RebuildTask, RebuildTasks, TaskResult};
use rebuild_task::{RebuildTasks, TaskResult};

/// Number of concurrent copy tasks per rebuild job
const SEGMENT_TASKS: usize = 16;
Expand All @@ -31,12 +36,12 @@ pub(crate) const SEGMENT_SIZE: u64 =
spdk_rs::libspdk::SPDK_BDEV_LARGE_BUF_MAX_SIZE as u64;

/// Checks whether a range is contained within another range
trait Within<T> {
trait WithinRange<T> {
/// True if `self` is contained within `right`, otherwise false
fn within(&self, right: std::ops::Range<T>) -> bool;
}

impl Within<u64> for std::ops::Range<u64> {
impl WithinRange<u64> for std::ops::Range<u64> {
fn within(&self, right: std::ops::Range<u64>) -> bool {
// also make sure ranges don't overflow
self.start < self.end
Expand Down
Loading

0 comments on commit e648cf9

Please sign in to comment.