Skip to content

Commit

Permalink
chore(bors): merge pull request #705
Browse files Browse the repository at this point in the history
705: feat(volume/resize): implement resource resize for volume r=dsharma-dc a=dsharma-dc



Co-authored-by: Diwakar Sharma <diwakar.sharma@datacore.com>
  • Loading branch information
mayastor-bors and dsharma-dc committed Jan 12, 2024
2 parents d28c0ea + 1fdfcef commit 5c91e76
Show file tree
Hide file tree
Showing 24 changed files with 653 additions and 57 deletions.
10 changes: 9 additions & 1 deletion control-plane/agents/src/bin/core/controller/scheduling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod volume_policy;
use crate::controller::scheduling::{
nexus::{GetPersistedNexusChildrenCtx, GetSuitableNodesContext},
resources::{ChildItem, NodeItem, PoolItem, ReplicaItem},
volume::{GetSuitablePoolsContext, VolumeReplicasForNexusCtx},
volume::{GetSuitablePoolsContext, ReplicaResizePoolsContext, VolumeReplicasForNexusCtx},
};
use std::{cmp::Ordering, collections::HashMap, future::Future};
use weighted_scoring::{Criteria, Value, ValueGrading, WeightedScore};
Expand Down Expand Up @@ -319,6 +319,14 @@ impl ReplicaFilters {
item.state().online()
}

/// Should only try to resize online replicas.
///
/// Returns whatever `item.state().online()` reports for the candidate child.
/// The resize request context is part of the filter signature but is not consulted here.
pub(crate) fn online_for_resize(
_request: &ReplicaResizePoolsContext,
item: &ChildItem,
) -> bool {
item.state().online()
}

/// Should only allow children with corresponding replicas with enough size
pub(crate) fn size(request: &GetPersistedNexusChildrenCtx, item: &ChildItem) -> bool {
match request.vol_spec() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use stor_port::types::v0::{
nexus_persistence::{ChildInfo, NexusInfo},
replica::ReplicaSpec,
snapshots::replica::ReplicaSnapshot,
volume::VolumeSpec,
},
transport::{Child, ChildUri, NodeId, PoolId, Replica},
};
Expand Down Expand Up @@ -101,6 +102,38 @@ impl PoolItemLister {
None => vec![],
}
}
/// Collect the replicas of the given volume as `ChildItem`s, for use by the resize path.
///
/// Every replica spec of the volume is paired with its runtime state (as fetched from the
/// registry) and with the wrapper of the pool named in the spec. A replica is left out of the
/// result when its state could not be fetched, or when no pool wrapper matches its pool name.
pub(crate) async fn list_for_resize(registry: &Registry, spec: &VolumeSpec) -> Vec<ChildItem> {
    let replica_specs = registry.specs().volume_replicas(&spec.uuid);

    // Gather the runtime state of each replica; lookups that fail are simply ignored.
    let mut known_states = Vec::with_capacity(replica_specs.len());
    for replica in &replica_specs {
        match registry.replica(replica.uuid()).await {
            Ok(state) => known_states.push(state),
            Err(_) => continue,
        }
    }
    let pools = registry.pool_wrappers().await;

    let mut items = Vec::new();
    for spec_handle in replica_specs.iter() {
        // Snapshot the spec so the lock is released before doing any lookups.
        let replica_spec = spec_handle.lock().clone();
        let state = known_states
            .iter()
            .find(|candidate| candidate.uuid == replica_spec.uuid);

        let pool_id = replica_spec.pool.pool_name();
        let pool = pools.iter().find(|wrapper| &wrapper.id == pool_id);

        // Both the hosting pool and the replica state are needed to build an item.
        if let (Some(pool), Some(state)) = (pool, state) {
            items.push(ChildItem::new(&replica_spec, state, None, pool, None));
        }
    }
    items
}

/// Get a list of pool items to create a snapshot clone on.
/// todo: support multi-replica snapshot and clone.
pub(crate) async fn list_for_clones(
Expand Down
101 changes: 101 additions & 0 deletions control-plane/agents/src/bin/core/controller/scheduling/volume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::controller::{
volume_policy::{SimplePolicy, ThickPolicy},
AddReplicaFilters, AddReplicaSorters, ChildSorters, ResourceData, ResourceFilter,
},
wrapper::PoolWrapper,
};
use agents::errors::SvcError;
use std::{collections::HashMap, ops::Deref};
Expand Down Expand Up @@ -95,6 +96,11 @@ impl GetSuitablePoolsContext {
pub fn as_thin(&self) -> bool {
// Treated as thin when the volume spec itself is thin, or when `snap_repl()` holds.
// NOTE(review): `snap_repl` is presumably "replica of a snapshot/clone" — confirm.
self.spec.as_thin() || self.snap_repl()
}
/// Helper util for overcommit checks.
///
/// Returns `true` when adding this request to `pool` keeps the pool within the allowed
/// commitment, i.e. when
/// `(self.size + pool.commitment()) * 100 < allowed_commit_percent * pool.capacity`.
/// Note that, despite the name, `true` means the pool stays *below* the limit.
///
/// The arithmetic is done in `u128`: with `u64` the sum and the products can overflow for very
/// large sizes or capacities, which wraps in release builds (possibly letting an oversized
/// request pass the check) and panics in debug builds.
pub(crate) fn overcommit(&self, allowed_commit_percent: u64, pool: &PoolWrapper) -> bool {
    let max_cap_allowed = u128::from(allowed_commit_percent) * u128::from(pool.capacity);
    let projected_commitment = u128::from(self.size) + u128::from(pool.commitment());
    projected_commitment * 100 < max_cap_allowed
}
}

impl Deref for GetSuitablePoolsContext {
Expand Down Expand Up @@ -738,3 +744,98 @@ impl ResourceFilter for CloneVolumeSnapshot {
self.data.list
}
}

/// The context to check pool capacity for volume replica resize feasibility.
#[derive(Clone)]
pub(crate) struct ReplicaResizePoolsContext {
/// Registry handle kept alongside the request.
registry: Registry,
/// Spec of the volume undergoing resize.
spec: VolumeSpec,
/// Currently allocated bytes (per replica), when known.
allocated_bytes: Option<u64>,
/// The additional capacity that the resize needs.
required_capacity: u64,
}

impl ReplicaResizePoolsContext {
    /// The additional capacity that we need.
    pub(crate) fn required_capacity(&self) -> u64 {
        self.required_capacity
    }

    /// Spec for the volume undergoing resize.
    pub(crate) fn spec(&self) -> &VolumeSpec {
        &self.spec
    }

    /// Get the currently allocated bytes (per replica).
    pub(crate) fn allocated_bytes(&self) -> &Option<u64> {
        &self.allocated_bytes
    }

    /// Helper util for overcommit checks.
    ///
    /// Returns `true` when growing by `required_capacity` keeps `pool` within the allowed
    /// commitment, i.e. when
    /// `(required_capacity + pool.commitment()) * 100 < allowed_commit_percent * pool.capacity`.
    /// Note that, despite the name, `true` means the pool stays *below* the limit.
    ///
    /// The arithmetic is done in `u128`: with `u64` the sum and the products can overflow for
    /// very large sizes or capacities, which wraps in release builds (possibly letting an
    /// oversized request pass the check) and panics in debug builds.
    pub(crate) fn overcommit(&self, allowed_commit_percent: u64, pool: &PoolWrapper) -> bool {
        let max_cap_allowed = u128::from(allowed_commit_percent) * u128::from(pool.capacity);
        let projected_commitment =
            u128::from(self.required_capacity) + u128::from(pool.commitment());
        projected_commitment * 100 < max_cap_allowed
    }
}

/// Resize the replicas of a volume.
///
/// Holds the resize request context together with the candidate `ChildItem`s.
pub(crate) struct ResizeVolumeReplicas {
/// Request context (`ReplicaResizePoolsContext`) plus the list of candidate replicas.
data: ResourceData<ReplicaResizePoolsContext, ChildItem>,
}

impl ResizeVolumeReplicas {
    /// Assemble the resize request: the context (registry, volume spec, allocated bytes and the
    /// required capacity) plus the volume's replicas listed for resize. No policy is applied.
    async fn builder(registry: &Registry, spec: &VolumeSpec, required_capacity: u64) -> Self {
        // `AddVolumeReplica::allocated_bytes` is reused here, even though its name does not
        // reflect this exact purpose.
        let allocated_bytes = AddVolumeReplica::allocated_bytes(registry, spec).await;
        let context = ReplicaResizePoolsContext {
            registry: registry.clone(),
            spec: spec.clone(),
            allocated_bytes,
            required_capacity,
        };
        let candidates = PoolItemLister::list_for_resize(registry, spec).await;
        Self {
            data: ResourceData::new(context, candidates),
        }
    }

    /// Pick the policy from the volume's provisioning: simple policy for thin volumes, thick
    /// policy otherwise.
    fn with_default_policy(self) -> Self {
        if self.data.context.spec.as_thin() {
            self.with_simple_policy()
        } else {
            self.with_thick_policy()
        }
    }

    /// Apply the thick-provisioning policy.
    fn with_thick_policy(self) -> Self {
        let thick = ThickPolicy::new();
        self.policy(thick)
    }

    /// Apply the simple policy, built from the registry held in the request context.
    fn with_simple_policy(self) -> Self {
        // The policy has to be built before `self` is consumed by `policy`.
        let policy = SimplePolicy::new(&self.data.context().registry);
        self.policy(policy)
    }

    /// Default rules for replica filtering when resizing replicas for a volume.
    pub(crate) async fn builder_with_defaults(
        registry: &Registry,
        spec: &VolumeSpec,
        req_capacity: u64,
    ) -> Self {
        let request = Self::builder(registry, spec, req_capacity).await;
        request.with_default_policy()
    }
}

#[async_trait::async_trait(?Send)]
impl ResourceFilter for ResizeVolumeReplicas {
    type Request = ReplicaResizePoolsContext;
    type Item = ChildItem;

    /// Mutable access to the request context and candidate list.
    fn data(&mut self) -> &mut ResourceData<Self::Request, Self::Item> {
        let Self { data } = self;
        data
    }

    /// Consume the request and hand back the candidate list as it currently stands.
    fn collect(self) -> Vec<Self::Item> {
        let Self { data } = self;
        data.list
    }
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::ResourceFilter;
use super::{volume::ResizeVolumeReplicas, ReplicaFilters, ResourceFilter};
use crate::controller::scheduling::{
volume::{AddVolumeReplica, CloneVolumeSnapshot, SnapshotVolumeReplica},
NodeFilters,
Expand Down Expand Up @@ -59,4 +59,9 @@ impl DefaultBasePolicy {
.filter(pool::PoolBaseFilters::capacity)
.filter(pool::PoolBaseFilters::min_free_space)
}
/// Base filters for a replica resize request: keep only online replicas, then only those whose
/// pool passes the `min_free_space_repl_resize` check.
fn filter_resize(request: ResizeVolumeReplicas) -> ResizeVolumeReplicas {
    let online_only = request.filter(ReplicaFilters::online_for_resize);
    online_only.filter(pool::PoolBaseFilters::min_free_space_repl_resize)
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::controller::scheduling::{resources::PoolItem, volume::GetSuitablePoolsContext};
use crate::controller::scheduling::{
resources::{ChildItem, PoolItem},
volume::{GetSuitablePoolsContext, ReplicaResizePoolsContext},
};
use std::collections::HashMap;
use stor_port::types::v0::transport::{PoolStatus, PoolTopology};

Expand All @@ -20,10 +23,19 @@ impl PoolBaseFilters {
allowed_commit_percent: u64,
) -> bool {
match request.as_thin() {
true => {
let max_cap_allowed = allowed_commit_percent * item.pool().capacity;
(request.size + item.pool().commitment()) * 100 < max_cap_allowed
}
true => request.overcommit(allowed_commit_percent, item.pool()),
false => true,
}
}
/// Overcommit filter for replica expand.
///
/// Thick (non-thin) volumes always pass. For thin volumes the replica's pool must pass the
/// request's `overcommit` check against `allowed_commit_percent`.
pub(crate) fn overcommit_repl_resize(
    request: &ReplicaResizePoolsContext,
    item: &ChildItem,
    allowed_commit_percent: u64,
) -> bool {
    if !request.spec().as_thin() {
        return true;
    }
    request.overcommit(allowed_commit_percent, item.pool())
}
Expand All @@ -34,6 +46,17 @@ impl PoolBaseFilters {
false => item.pool.free_space() > request.size,
}
}
/// Return true if the replica's pool has enough free space for the requested resize.
///
/// For thin volumes the pool's free space must exceed the free-space watermark; for thick
/// volumes it must exceed the additional capacity being requested.
pub(crate) fn min_free_space_repl_resize(
    request: &ReplicaResizePoolsContext,
    item: &ChildItem,
) -> bool {
    let threshold = if request.spec().as_thin() {
        Self::free_space_watermark()
    } else {
        request.required_capacity()
    };
    item.pool().free_space() > threshold
}
/// Should only attempt to use pools with sufficient free space for a full rebuild.
/// Currently the data-plane fully rebuilds a volume, meaning a thin provisioned volume
/// becomes fully allocated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use crate::{
controller::{
registry::Registry,
scheduling::{
resources::PoolItem,
resources::{ChildItem, PoolItem},
volume::{
AddVolumeReplica, CloneVolumeSnapshot, GetSuitablePoolsContext,
SnapshotVolumeReplica,
ReplicaResizePoolsContext, ResizeVolumeReplicas, SnapshotVolumeReplica,
},
volume_policy::{affinity_group, pool::PoolBaseFilters, DefaultBasePolicy},
ResourceFilter, ResourcePolicy, SortBuilder, SortCriteria,
Expand Down Expand Up @@ -69,6 +69,15 @@ impl ResourcePolicy<CloneVolumeSnapshot> for SimplePolicy {
}
}

#[async_trait::async_trait(?Send)]
impl ResourcePolicy<ResizeVolumeReplicas> for SimplePolicy {
    /// Apply the base resize filters, then this policy's own free-space and pool-overcommit
    /// checks for replica resize.
    fn apply(self, to: ResizeVolumeReplicas) -> ResizeVolumeReplicas {
        let base_filtered = DefaultBasePolicy::filter_resize(to);
        let space_checked =
            base_filtered.filter_param(&self, SimplePolicy::min_free_space_repl_resize);
        space_checked.filter_param(&self, SimplePolicy::pool_overcommit_repl_resize)
    }
}

// Weights built with `Ranged::new_const`; the three declared here add up to 100.
// NOTE(review): presumably the relative weights of the pool sorting criteria — confirm at the
// use sites.
const TOTAL_REPLICA_COUNT_WEIGHT: Ranged = Ranged::new_const(25);
const FREE_SPACE_WEIGHT: Ranged = Ranged::new_const(40);
const OVER_COMMIT_WEIGHT: Ranged = Ranged::new_const(35);
Expand Down Expand Up @@ -149,6 +158,26 @@ impl SimplePolicy {
}
}

/// Helper to decide whether `free` bytes of pool space are enough, given the volume's
/// currently allocated bytes and the `required` size.
///
/// Returns `true` when `free` is strictly greater than a threshold computed as follows:
/// - `allocated` is `Some(bytes)`: `bytes` plus a slack of `required` scaled by the
///   `volume_commitment_initial` percentage (when `bytes` is 0) or by the `volume_commitment`
///   percentage (otherwise), capped at `required`.
/// - `allocated` is `None`: `required` scaled by `no_state_min_free_space_percent`.
///
/// The threshold is computed in `u128`, because `percent * required` and `bytes + slack` can
/// overflow `u64` for very large sizes (wrapping in release builds, panicking in debug builds).
fn min_free_space_util(&self, free: u64, allocated: &Option<u64>, required: u64) -> bool {
    let required_wide = u128::from(required);
    let threshold = match allocated {
        Some(bytes) => {
            let commitment_percent = if *bytes == 0 {
                self.cli_args.volume_commitment_initial
            } else {
                self.cli_args.volume_commitment
            };
            let slack = u128::from(commitment_percent) * required_wide / 100;
            (u128::from(*bytes) + slack).min(required_wide)
        }
        None => {
            // We really have no clue for some reason.. should not happen but just in case
            // let's be conservative?
            u128::from(self.no_state_min_free_space_percent) * required_wide / 100
        }
    };
    u128::from(free) > threshold
}

/// Minimum free space is the currently allocated usage plus some percentage of volume size
/// slack.
fn min_free_space(&self, request: &GetSuitablePoolsContext, item: &PoolItem) -> bool {
Expand All @@ -160,26 +189,45 @@ impl SimplePolicy {
(self.cli_args.snapshot_commitment * request.size) / 100
};
}
item.pool.free_space()
> match request.allocated_bytes() {
Some(bytes) => {
let size = if bytes == &0 {
self.cli_args.volume_commitment_initial * request.size
} else {
self.cli_args.volume_commitment * request.size
} / 100;
(bytes + size).min(request.size)
}
None => {
// We really have no clue for some reason.. should not happen but just in case
// let's be conservative?
(self.no_state_min_free_space_percent * request.size) / 100
}
}

self.min_free_space_util(
item.pool.free_space(),
request.allocated_bytes(),
request.size,
)
}

/// Free-space check applied during replica resize.
///
/// Thick volumes need the pool's free space to exceed the required capacity. Thin volumes
/// defer to `min_free_space_util`, fed with the pool's free space, the allocated bytes and the
/// required capacity.
/// TODO: Combine min_free_space_repl_resize and min_free_space for code reuse using generic
/// types.
fn min_free_space_repl_resize(
    &self,
    request: &ReplicaResizePoolsContext,
    item: &ChildItem,
) -> bool {
    if request.spec().as_thin() {
        let pool_free = item.pool().free_space();
        self.min_free_space_util(
            pool_free,
            request.allocated_bytes(),
            request.required_capacity(),
        )
    } else {
        item.pool().free_space() > request.required_capacity()
    }
}

/// Run the base overcommit filter with this policy's configured `pool_commitment` percentage.
fn pool_overcommit(&self, request: &GetSuitablePoolsContext, item: &PoolItem) -> bool {
    let allowed_commit_percent = self.cli_args.pool_commitment;
    PoolBaseFilters::overcommit(request, item, allowed_commit_percent)
}

/// Run the base replica-resize overcommit filter with this policy's configured
/// `pool_commitment` percentage.
fn pool_overcommit_repl_resize(
    &self,
    request: &ReplicaResizePoolsContext,
    item: &ChildItem,
) -> bool {
    let allowed_commit_percent = self.cli_args.pool_commitment;
    PoolBaseFilters::overcommit_repl_resize(request, item, allowed_commit_percent)
}
}

#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::controller::scheduling::{
resources::PoolItem,
volume::{AddVolumeReplica, GetSuitablePoolsContext, SnapshotVolumeReplica},
volume::{
AddVolumeReplica, GetSuitablePoolsContext, ResizeVolumeReplicas, SnapshotVolumeReplica,
},
volume_policy::{affinity_group, pool::PoolBaseFilters, DefaultBasePolicy},
ResourceFilter, ResourcePolicy, SortBuilder, SortCriteria,
};
Expand Down Expand Up @@ -34,6 +36,13 @@ impl ResourcePolicy<SnapshotVolumeReplica> for ThickPolicy {
}
}

#[async_trait::async_trait(?Send)]
impl ResourcePolicy<ResizeVolumeReplicas> for ThickPolicy {
/// The thick policy adds nothing of its own for resize: only the base resize filters from
/// `DefaultBasePolicy::filter_resize` are applied.
fn apply(self, to: ResizeVolumeReplicas) -> ResizeVolumeReplicas {
DefaultBasePolicy::filter_resize(to)
}
}

impl ThickPolicy {
/// Create a new thick policy.
pub(crate) fn new() -> Self {
Expand Down
Loading

0 comments on commit 5c91e76

Please sign in to comment.