Skip to content

Commit

Permalink
Try #737:
Browse files Browse the repository at this point in the history
  • Loading branch information
mayastor-bors committed Feb 12, 2024
2 parents 7a7d1ce + b57f549 commit e6b65a0
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 31 deletions.
12 changes: 10 additions & 2 deletions control-plane/agents/src/bin/core/controller/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ pub(crate) struct RegistryInner<S: Store> {
thin_args: ThinArgs,
/// Check if the HA feature is enabled.
ha_disabled: bool,
/// Etcd max page size.
etcd_max_page_size: i64,
}

impl Registry {
Expand All @@ -121,6 +123,7 @@ impl Registry {
host_acl: Vec<HostAccessControl>,
thin_args: ThinArgs,
ha_enabled: bool,
etcd_max_page_size: i64,
) -> Result<Self, SvcError> {
let store_endpoint = Self::format_store_endpoint(&store_url);
tracing::info!("Connecting to persistent store at {}", store_endpoint);
Expand Down Expand Up @@ -171,14 +174,15 @@ impl Registry {
legacy_prefix_present,
thin_args,
ha_disabled: ha_enabled,
etcd_max_page_size,
}),
};
registry.init().await?;

// Disable v1 compat if nexus_info keys are migrated.
if registry.config().mayastor_compat_v1() && registry.nexus_info_v1_migrated().await? {
// Delete the v1 nexus_info keys by brute force.
delete_all_v1_nexus_info(&mut store)
delete_all_v1_nexus_info(&mut store, etcd_max_page_size)
.await
.map_err(|error| StoreError::Generic {
source: Box::new(error),
Expand Down Expand Up @@ -406,7 +410,11 @@ impl Registry {
async fn init(&self) -> Result<(), SvcError> {
let mut store = self.store.lock().await;
self.specs
.init(store.deref_mut(), self.legacy_prefix_present)
.init(
store.deref_mut(),
self.legacy_prefix_present,
self.etcd_max_page_size,
)
.await?;
Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ pub const MSP_OPERATOR: &str = "msp-operator";
pub(crate) async fn migrate_product_v1_to_v2<S: Store>(
store: &mut S,
spec_type: StorableObjectType,
etcd_max_page_size: i64,
) -> Result<(), StoreError> {
info!("Migrating {spec_type:?} from v1 to v2 key space");
let prefix = &product_v1_key_prefix_obj(spec_type);
let store_entries = store.get_values_prefix(prefix).await?;
let store_entries = store
.get_values_paged_all(prefix, etcd_max_page_size)
.await?;
for (k, v) in store_entries {
let id = k
.split('/')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use crate::controller::{
};
use agents::errors::SvcError;
use stor_port::{
transport_api::ResourceKind,
pstor::{product_v1_key_prefix, API_VERSION},
transport_api::{ErrorChain, ResourceKind},
types::v0::{
openapi::apis::Uuid,
store::{
Expand All @@ -18,10 +19,11 @@ use stor_port::{
node::NodeSpec,
pool::PoolSpec,
replica::ReplicaSpec,
volume::{AffinityGroupSpec, VolumeSpec},
snapshots::volume::VolumeSnapshot,
volume::{AffinityGroupSpec, VolumeContentSource, VolumeSpec},
AsOperationSequencer, OperationMode, OperationSequence, SpecStatus, SpecTransaction,
},
transport::{NexusId, NodeId, PoolId, ReplicaId, VolumeId},
transport::{NexusId, NodeId, PoolId, ReplicaId, SnapshotId, VolumeId},
},
};

Expand All @@ -30,14 +32,6 @@ use parking_lot::RwLock;
use serde::de::DeserializeOwned;
use snafu::{ResultExt, Snafu};
use std::{fmt::Debug, ops::Deref, sync::Arc};
use stor_port::{
pstor::{product_v1_key_prefix, API_VERSION},
transport_api::ErrorChain,
types::v0::{
store::{snapshots::volume::VolumeSnapshot, volume::VolumeContentSource},
transport::SnapshotId,
},
};

#[derive(Debug, Snafu)]
#[snafu(context(suffix(false)))]
Expand Down Expand Up @@ -909,6 +903,7 @@ impl ResourceSpecsLocked {
&self,
store: &mut S,
legacy_prefix_present: bool,
etcd_max_page_size: i64,
) -> Result<(), SvcError> {
let spec_types = [
StorableObjectType::VolumeSpec,
Expand All @@ -919,7 +914,7 @@ impl ResourceSpecsLocked {
StorableObjectType::VolumeSnapshot,
];
for spec in &spec_types {
self.populate_specs(store, *spec, legacy_prefix_present)
self.populate_specs(store, *spec, legacy_prefix_present, etcd_max_page_size)
.await
.map_err(|error| SvcError::Internal {
details: error.full_string(),
Expand Down Expand Up @@ -996,22 +991,22 @@ impl ResourceSpecsLocked {
store: &mut S,
spec_type: StorableObjectType,
legacy_prefix_present: bool,
etcd_max_page_size: i64,
) -> Result<(), SpecError> {
if legacy_prefix_present {
migrate_product_v1_to_v2(store, spec_type)
migrate_product_v1_to_v2(store, spec_type, etcd_max_page_size)
.await
.map_err(|e| SpecError::StoreMigrate {
source: Box::new(e),
})?;
}
let prefix = key_prefix_obj(spec_type, API_VERSION);
let store_entries =
store
.get_values_prefix(&prefix)
.await
.map_err(|e| SpecError::StoreGet {
source: Box::new(e),
})?;
let store_entries = store
.get_values_paged_all(&prefix, etcd_max_page_size)
.await
.map_err(|e| SpecError::StoreGet {
source: Box::new(e),
})?;
let store_values = store_entries.iter().map(|e| e.1.clone()).collect();

let mut resource_specs = self.0.write();
Expand Down
7 changes: 6 additions & 1 deletion control-plane/agents/src/bin/core/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub(crate) mod watch;
use clap::Parser;
use controller::registry::NumRebuilds;
use std::{net::SocketAddr, num::ParseIntError};
use utils::{version_info_str, DEFAULT_GRPC_SERVER_ADDR};
use utils::{version_info_str, DEFAULT_GRPC_SERVER_ADDR, ETCD_MAX_PAGE_LIMIT};

use stor_port::HostAccessControl;
use utils::tracing_telemetry::{trace::TracerProvider, KeyValue};
Expand Down Expand Up @@ -108,6 +108,10 @@ pub(crate) struct CliArgs {
/// This is useful when the frontend nodes do not support the NVMe ANA feature.
#[clap(long, env = "HA_DISABLED")]
pub(crate) disable_ha: bool,

/// Etcd Pagination Limit.
#[clap(long, default_value = ETCD_MAX_PAGE_LIMIT)]
pub(crate) etcd_page_limit: u32,
}
impl CliArgs {
fn args() -> Self {
Expand Down Expand Up @@ -178,6 +182,7 @@ async fn server(cli_args: CliArgs) -> anyhow::Result<()> {
},
cli_args.thin_args,
cli_args.disable_ha,
cli_args.etcd_page_limit as i64,
)
.await?;

Expand Down
10 changes: 5 additions & 5 deletions control-plane/stor-port/src/types/v0/store/nexus_persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ use std::fmt::Debug;
use tracing::info;
use uuid::Uuid;

/// ETCD Pagination limit.
const ETCD_PAGED_LIMIT: i64 = 1000;

/// Definition of the nexus information that gets saved in the persistent
/// store.
#[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)]
Expand Down Expand Up @@ -139,12 +136,15 @@ impl StorableObject for NexusInfo {

/// Deletes all v1 nexus_info by fetching all keys and parsing the key to UUID and deletes on
/// success.
pub async fn delete_all_v1_nexus_info<S: Store>(store: &mut S) -> Result<(), StoreError> {
pub async fn delete_all_v1_nexus_info<S: Store>(
store: &mut S,
etcd_page_limit: i64,
) -> Result<(), StoreError> {
let mut prefix: &str = "";
let mut first = true;
let mut kvs;
loop {
kvs = store.get_values_paged(prefix, ETCD_PAGED_LIMIT).await?;
kvs = store.get_values_paged(prefix, etcd_page_limit).await?;
if !first && kvs.get(0).is_some() {
kvs.remove(0);
}
Expand Down
7 changes: 7 additions & 0 deletions utils/pstor/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ pub trait StoreKv: Sync + Send + Clone {
key_prefix: &str,
limit: i64,
) -> Result<Vec<(String, Value)>, Error>;
/// Returns a vector of tuples. Each tuple represents a key-value pair. It paginates through all
/// the values for the prefix with limit.
async fn get_values_paged_all(
&mut self,
key_prefix: &str,
limit: i64,
) -> Result<Vec<(String, Value)>, Error>;
/// Deletes all key values from a given prefix.
async fn delete_values_prefix(&mut self, key_prefix: &str) -> Result<(), Error>;
}
Expand Down
32 changes: 31 additions & 1 deletion utils/pstor/src/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,6 @@ impl StoreKv for Etcd {
Some(
GetOptions::new()
.with_prefix()
.with_from_key()
.with_sort(SortTarget::Key, SortOrder::Ascend)
.with_limit(limit),
),
Expand All @@ -256,6 +255,37 @@ impl StoreKv for Etcd {
Ok(result)
}

/// Returns a vector of tuples. Each tuple represents a key-value pair. It paginates through all
/// the values for the prefix with limit.
async fn get_values_paged_all(
&mut self,
key_prefix: &str,
limit: i64,
) -> Result<Vec<(String, Value)>, Error> {
let mut prefix = key_prefix;
let mut first = true;
let mut all_values = vec![];
let mut values;
loop {
values = self.get_values_paged(prefix, limit).await?;

if !first && values.get(0).is_some() {
values.remove(0);
}

first = false;

all_values.extend(values.clone());

if let Some((key, _)) = values.last() {
prefix = key;
} else {
break;
}
}
Ok(all_values)
}

/// Deletes objects with the given key prefix.
async fn delete_values_prefix(&mut self, key_prefix: &str) -> Result<(), Error> {
if let Some((lease_id, lock_key)) = self.lease_lock()? {
Expand Down
2 changes: 1 addition & 1 deletion utils/pstor/src/products/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ pub fn key_prefix_obj<K: AsRef<str>>(key_type: K) -> String {

/// Fetches the product v1 key prefix and returns true if entry is present.
pub async fn detect_product_v1_prefix<S: Store>(store: &mut S) -> Result<bool, crate::Error> {
let prefix = store.get_values_prefix(&key_prefix()).await?;
let prefix = store.get_values_paged(&key_prefix(), 3).await?;
Ok(!prefix.is_empty())
}
3 changes: 3 additions & 0 deletions utils/utils-lib/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,6 @@ pub const SNAPSHOT_MAX_TRANSACTION_LIMIT: usize = 5;

/// Label for the csi-node nvme ana multi-path.
pub const CSI_NODE_NVME_ANA: &str = "openebs.io/csi-node.nvme-ana";

/// Max limit for etcd pagination.
pub const ETCD_MAX_PAGE_LIMIT: &str = "500";

0 comments on commit e6b65a0

Please sign in to comment.