Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use pagination to load values from pstor #737

Merged
merged 1 commit into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions control-plane/agents/src/bin/core/controller/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ pub(crate) struct RegistryInner<S: Store> {
thin_args: ThinArgs,
/// Check if the HA feature is enabled.
ha_disabled: bool,
/// Etcd max page size.
etcd_max_page_size: i64,
}

impl Registry {
Expand All @@ -121,6 +123,7 @@ impl Registry {
host_acl: Vec<HostAccessControl>,
thin_args: ThinArgs,
ha_enabled: bool,
etcd_max_page_size: i64,
) -> Result<Self, SvcError> {
let store_endpoint = Self::format_store_endpoint(&store_url);
tracing::info!("Connecting to persistent store at {}", store_endpoint);
Expand Down Expand Up @@ -171,14 +174,15 @@ impl Registry {
legacy_prefix_present,
thin_args,
ha_disabled: ha_enabled,
etcd_max_page_size,
}),
};
registry.init().await?;

// Disable v1 compat if nexus_info keys are migrated.
if registry.config().mayastor_compat_v1() && registry.nexus_info_v1_migrated().await? {
// Delete the v1 nexus_info keys by brute force.
delete_all_v1_nexus_info(&mut store)
delete_all_v1_nexus_info(&mut store, etcd_max_page_size)
.await
.map_err(|error| StoreError::Generic {
source: Box::new(error),
Expand Down Expand Up @@ -406,7 +410,11 @@ impl Registry {
async fn init(&self) -> Result<(), SvcError> {
let mut store = self.store.lock().await;
self.specs
.init(store.deref_mut(), self.legacy_prefix_present)
.init(
store.deref_mut(),
self.legacy_prefix_present,
self.etcd_max_page_size,
)
.await?;
Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ pub const MSP_OPERATOR: &str = "msp-operator";
pub(crate) async fn migrate_product_v1_to_v2<S: Store>(
store: &mut S,
spec_type: StorableObjectType,
etcd_max_page_size: i64,
) -> Result<(), StoreError> {
info!("Migrating {spec_type:?} from v1 to v2 key space");
let prefix = &product_v1_key_prefix_obj(spec_type);
let store_entries = store.get_values_prefix(prefix).await?;
let store_entries = store
.get_values_paged_all(prefix, etcd_max_page_size)
.await?;
for (k, v) in store_entries {
let id = k
.split('/')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use crate::controller::{
};
use agents::errors::SvcError;
use stor_port::{
transport_api::ResourceKind,
pstor::{product_v1_key_prefix, API_VERSION},
transport_api::{ErrorChain, ResourceKind},
types::v0::{
openapi::apis::Uuid,
store::{
Expand All @@ -18,10 +19,11 @@ use stor_port::{
node::NodeSpec,
pool::PoolSpec,
replica::ReplicaSpec,
volume::{AffinityGroupSpec, VolumeSpec},
snapshots::volume::VolumeSnapshot,
volume::{AffinityGroupSpec, VolumeContentSource, VolumeSpec},
AsOperationSequencer, OperationMode, OperationSequence, SpecStatus, SpecTransaction,
},
transport::{NexusId, NodeId, PoolId, ReplicaId, VolumeId},
transport::{NexusId, NodeId, PoolId, ReplicaId, SnapshotId, VolumeId},
},
};

Expand All @@ -30,14 +32,6 @@ use parking_lot::RwLock;
use serde::de::DeserializeOwned;
use snafu::{ResultExt, Snafu};
use std::{fmt::Debug, ops::Deref, sync::Arc};
use stor_port::{
pstor::{product_v1_key_prefix, API_VERSION},
transport_api::ErrorChain,
types::v0::{
store::{snapshots::volume::VolumeSnapshot, volume::VolumeContentSource},
transport::SnapshotId,
},
};

#[derive(Debug, Snafu)]
#[snafu(context(suffix(false)))]
Expand Down Expand Up @@ -909,6 +903,7 @@ impl ResourceSpecsLocked {
&self,
store: &mut S,
legacy_prefix_present: bool,
etcd_max_page_size: i64,
) -> Result<(), SvcError> {
let spec_types = [
StorableObjectType::VolumeSpec,
Expand All @@ -919,7 +914,7 @@ impl ResourceSpecsLocked {
StorableObjectType::VolumeSnapshot,
];
for spec in &spec_types {
self.populate_specs(store, *spec, legacy_prefix_present)
self.populate_specs(store, *spec, legacy_prefix_present, etcd_max_page_size)
.await
.map_err(|error| SvcError::Internal {
details: error.full_string(),
Expand Down Expand Up @@ -996,22 +991,22 @@ impl ResourceSpecsLocked {
store: &mut S,
spec_type: StorableObjectType,
legacy_prefix_present: bool,
etcd_max_page_size: i64,
) -> Result<(), SpecError> {
if legacy_prefix_present {
migrate_product_v1_to_v2(store, spec_type)
migrate_product_v1_to_v2(store, spec_type, etcd_max_page_size)
.await
.map_err(|e| SpecError::StoreMigrate {
source: Box::new(e),
})?;
}
let prefix = key_prefix_obj(spec_type, API_VERSION);
let store_entries =
store
.get_values_prefix(&prefix)
.await
.map_err(|e| SpecError::StoreGet {
source: Box::new(e),
})?;
let store_entries = store
.get_values_paged_all(&prefix, etcd_max_page_size)
.await
.map_err(|e| SpecError::StoreGet {
source: Box::new(e),
})?;
let store_values = store_entries.iter().map(|e| e.1.clone()).collect();

let mut resource_specs = self.0.write();
Expand Down
7 changes: 6 additions & 1 deletion control-plane/agents/src/bin/core/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub(crate) mod watch;
use clap::Parser;
use controller::registry::NumRebuilds;
use std::{net::SocketAddr, num::ParseIntError};
use utils::{version_info_str, DEFAULT_GRPC_SERVER_ADDR};
use utils::{version_info_str, DEFAULT_GRPC_SERVER_ADDR, ETCD_MAX_PAGE_LIMIT};

use stor_port::HostAccessControl;
use utils::tracing_telemetry::{trace::TracerProvider, KeyValue};
Expand Down Expand Up @@ -108,6 +108,10 @@ pub(crate) struct CliArgs {
/// This is useful when the frontend nodes do not support the NVMe ANA feature.
#[clap(long, env = "HA_DISABLED")]
pub(crate) disable_ha: bool,

/// Etcd Pagination Limit.
#[clap(long, default_value = ETCD_MAX_PAGE_LIMIT)]
pub(crate) etcd_page_limit: u32,
}
impl CliArgs {
fn args() -> Self {
Expand Down Expand Up @@ -178,6 +182,7 @@ async fn server(cli_args: CliArgs) -> anyhow::Result<()> {
},
cli_args.thin_args,
cli_args.disable_ha,
cli_args.etcd_page_limit as i64,
)
.await?;

Expand Down
92 changes: 91 additions & 1 deletion control-plane/agents/src/bin/core/tests/controller/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use deployer_cluster::{etcd_client::Client, *};
use stor_port::{
pstor::{etcd::Etcd, StoreObj},
pstor::{etcd::Etcd, key_prefix_obj, ApiVersion, StorableObjectType, StoreKv, StoreObj},
types::v0::{
openapi::models,
store::registry::{ControlPlaneService, StoreLeaseOwner, StoreLeaseOwnerKey},
transport,
},
};

use serde_json::Value;
use std::str::FromStr;
use uuid::Uuid;

/// Test that the content of the registry is correctly loaded from the persistent store on start up.
#[tokio::test]
async fn bootstrap_registry() {
Expand Down Expand Up @@ -212,3 +216,89 @@ async fn core_agent_lease_lock() {
tracing::info!("core: {:?}", core.state);
assert_eq!(Some(false), core.state.unwrap().running);
}

const OLD_VOLUME_PREFIX: &str = "/namespace/default/control-plane/VolumeSpec";

/// Verify that `get_values_paged_all` returns every key under a prefix even when the
/// page limit is smaller than the number of stored entries, and that after a core-agent
/// restart the v1 volume key space is migrated into the v2 key space via the paged reads.
#[tokio::test]
async fn etcd_pagination() {
    let lease_ttl = std::time::Duration::from_secs(2);
    let cluster = ClusterBuilder::builder()
        .with_io_engines(0)
        .with_rest(false)
        .with_jaeger(false)
        .with_store_lease_ttl(lease_ttl)
        .build()
        .await
        .unwrap();

    let mut etcd = Etcd::new("0.0.0.0:2379").await.unwrap();

    let node_prefix = key_prefix_obj(StorableObjectType::NodeSpec, ApiVersion::V0);
    let volume_prefix = key_prefix_obj(StorableObjectType::VolumeSpec, ApiVersion::V0);

    // Persist 10 node specs in etcd.
    for i in 1 .. 11 {
        let key = format!("{}/node{}", node_prefix, i);
        let json_str = format!(
            r#"{{"id":"mayastor-node{}","endpoint":"136.144.51.107:10124","labels":{{}}}}"#,
            i
        );
        let value = Value::from_str(&json_str).unwrap();
        etcd.put_kv(&key, &value).await.unwrap();
    }

    // Persist 3 volumes in the new (v2) key space in etcd.
    for _i in 1 .. 4 {
        let uuid = Uuid::new_v4();
        let key = format!("{}/{}", volume_prefix, uuid);
        let json_str = r#"{"uuid":"456122b1-7e19-4148-a890-579ca785a119","size":2147483648,"labels":{"local":"true"},"num_replicas":3,"status":{"Created":"Online"},"target":{"node":"mayastor-node4","nexus":"d6ccbb97-d13e-4ffb-91a0-c7607bb01f8f","protocol":"nvmf"},"policy":{"self_heal":true},"topology":{"node":{"Explicit":{"allowed_nodes":["mayastor-node2","mayastor-master","mayastor-node3","mayastor-node1","mayastor-node4"],"preferred_nodes":["mayastor-node2","mayastor-node3","mayastor-node4","mayastor-master","mayastor-node1"]}},"pool":{"Labelled":{"exclusion":{},"inclusion":{"openebs.io/created-by":"msp-operator"}}}},"last_nexus_id":"d6ccbb97-d13e-4ffb-91a0-c7607bb01f8f","operation":null}"#;
        let value = Value::from_str(json_str).unwrap();
        etcd.put_kv(&key, &value).await.unwrap();
    }

    // Persist 5 volumes in the old (v1) key space in etcd; these should be migrated
    // into the v2 key space when the core agent restarts.
    for _i in 1 .. 6 {
        let uuid = Uuid::new_v4();
        let key = format!("{}/{}", OLD_VOLUME_PREFIX, uuid);
        let json_str = r#"{"uuid":"456122b1-7e19-4148-a890-579ca785a119","size":2147483648,"labels":{"local":"true"},"num_replicas":3,"status":{"Created":"Online"},"target":{"node":"mayastor-node4","nexus":"d6ccbb97-d13e-4ffb-91a0-c7607bb01f8f","protocol":"nvmf"},"policy":{"self_heal":true},"topology":{"node":{"Explicit":{"allowed_nodes":["mayastor-node2","mayastor-master","mayastor-node3","mayastor-node1","mayastor-node4"],"preferred_nodes":["mayastor-node2","mayastor-node3","mayastor-node4","mayastor-master","mayastor-node1"]}},"pool":{"Labelled":{"exclusion":{},"inclusion":{"openebs.io/created-by":"msp-operator"}}}},"last_nexus_id":"d6ccbb97-d13e-4ffb-91a0-c7607bb01f8f","operation":null}"#;
        let value = Value::from_str(json_str).unwrap();
        etcd.put_kv(&key, &value).await.unwrap();
    }

    // Persist 5 nexus-info entries in the old key space (bare uuid keys) in etcd.
    for _i in 1 .. 6 {
        let uuid = Uuid::new_v4();
        let key = uuid.to_string();
        let json_str = r#"{"children":[{"healthy":true,"uuid":"82779efa-a0c7-4652-a37b-83eefd894714"},{"healthy":true,"uuid":"2d98fa96-ac12-40be-acdc-e3559c0b1530"},{"healthy":true,"uuid":"620ff519-419a-48d6-97a8-c1ba3260d87e"}],"clean_shutdown":false}"#;
        let value = Value::from_str(json_str).unwrap();
        etcd.put_kv(&key, &value).await.unwrap();
    }

    // All 10 nodes must be returned even though the page limit (3) is smaller.
    let node_kvs = etcd.get_values_paged_all(&node_prefix, 3).await.unwrap();
    assert_eq!(node_kvs.len(), 10);

    // There should be exactly 3 volumes in the new key space before the migration.
    let volume_kvs = etcd.get_values_paged_all(&volume_prefix, 3).await.unwrap();
    assert_eq!(volume_kvs.len(), 3);

    // There should be exactly 5 volumes in the old key space before the migration.
    let volume_kvs = etcd
        .get_values_paged_all(OLD_VOLUME_PREFIX, 3)
        .await
        .unwrap();
    assert_eq!(volume_kvs.len(), 5);

    // Restarting the core agent triggers the paged v1 -> v2 migration on init.
    cluster.restart_core().await;
    cluster
        .volume_service_liveness(None)
        .await
        .expect("Should have restarted by now");

    // There should be exactly 8 volumes in the new key space after the migration
    // (3 pre-existing v2 volumes + 5 migrated from the v1 key space).
    let volume_kvs_all = etcd.get_values_paged_all(&volume_prefix, 3).await.unwrap();
    assert_eq!(volume_kvs_all.len(), 8);

    // Full-store paged scan: 10 nodes + 8 volumes + the remaining nexus-info and
    // registry bookkeeping entries.
    // NOTE(review): the breakdown of the 26 total beyond nodes+volumes is not visible
    // here — confirm against the registry/migration key handling.
    let all = etcd.get_values_paged_all("", 3).await.unwrap();
    assert_eq!(all.len(), 26);
}
23 changes: 10 additions & 13 deletions control-plane/stor-port/src/types/v0/store/nexus_persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ use std::fmt::Debug;
use tracing::info;
use uuid::Uuid;

/// ETCD Pagination limit.
const ETCD_PAGED_LIMIT: i64 = 1000;

/// Definition of the nexus information that gets saved in the persistent
/// store.
#[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)]
Expand Down Expand Up @@ -139,17 +136,23 @@ impl StorableObject for NexusInfo {

/// Deletes all v1 nexus_info by fetching all keys and parsing the key to UUID and deletes on
/// success.
pub async fn delete_all_v1_nexus_info<S: Store>(store: &mut S) -> Result<(), StoreError> {
let mut prefix: &str = "";
pub async fn delete_all_v1_nexus_info<S: Store>(
store: &mut S,
etcd_page_limit: i64,
) -> Result<(), StoreError> {
let mut first = true;
let mut kvs;
loop {
kvs = store.get_values_paged(prefix, ETCD_PAGED_LIMIT).await?;
let mut last = Some("".to_string());

while let Some(prefix) = &last {
kvs = store.get_values_paged(prefix, etcd_page_limit, "").await?;

if !first && kvs.get(0).is_some() {
kvs.remove(0);
}
first = false;

last = kvs.last().map(|(key, _)| key.to_string());
// If the key is a uuid, i.e. nexus_info v1 key, and the value is a valid nexus_info then we
// delete it.
for (key, value) in &kvs {
Expand All @@ -159,12 +162,6 @@ pub async fn delete_all_v1_nexus_info<S: Store>(store: &mut S) -> Result<(), Sto
store.delete_kv(&key).await?;
}
}

if let Some((key, _)) = kvs.last() {
prefix = key;
} else {
break;
}
}
info!("v1.0.x nexus_info cleaned up successfully");
Ok(())
Expand Down
8 changes: 8 additions & 0 deletions utils/pstor/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ pub trait StoreKv: Sync + Send + Clone {
&mut self,
key_prefix: &str,
limit: i64,
range_end: &str,
) -> Result<Vec<(String, Value)>, Error>;
/// Returns a vector of tuples. Each tuple represents a key-value pair. It paginates through all
/// the values for the prefix with limit.
async fn get_values_paged_all(
&mut self,
key_prefix: &str,
limit: i64,
) -> Result<Vec<(String, Value)>, Error>;
/// Deletes all key values from a given prefix.
async fn delete_values_prefix(&mut self, key_prefix: &str) -> Result<(), Error>;
Expand Down
2 changes: 2 additions & 0 deletions utils/pstor/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,6 @@ pub enum Error {
source: Box<Error>,
description: String,
},
#[snafu(display("Failed to parse range end for start key: '{}'", start_key))]
RangeEnd { start_key: String },
}
Loading
Loading