Skip to content

Commit

Permalink
refactor: make registration configurable, use only grpc endpoint, use tokio join instead of select
Browse files Browse the repository at this point in the history

Signed-off-by: Abhinandan Purkait <purkaitabhinandan@gmail.com>
  • Loading branch information
Abhinandan-Purkait committed Feb 7, 2024
1 parent e6a04a0 commit 521ad39
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 66 deletions.
12 changes: 9 additions & 3 deletions control-plane/csi-driver/src/bin/node/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,13 @@ pub(crate) struct AppNodesClientWrapper {

impl AppNodesClientWrapper {
/// Initialize AppNodes API client instance.
pub(crate) fn initialize(endpoint: &String) -> anyhow::Result<AppNodesClientWrapper> {
pub(crate) fn initialize(
endpoint: Option<&String>,
) -> anyhow::Result<Option<AppNodesClientWrapper>> {
let Some(endpoint) = endpoint else {
return Ok(None);
};

let url = clients::tower::Url::parse(endpoint)
.map_err(|error| anyhow!("Invalid API endpoint URL {}: {:?}", endpoint, error))?;

Expand All @@ -121,9 +127,9 @@ impl AppNodesClientWrapper {
endpoint, DEFAULT_TIMEOUT_FOR_REST_REQUESTS,
);

Ok(Self {
Ok(Some(Self {
client: AppNodesClient::new(Arc::new(tower)),
})
}))
}

/// Register an app node.
Expand Down
94 changes: 36 additions & 58 deletions control-plane/csi-driver/src/bin/node/main_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use stor_port::platform;
use utils::tracing_telemetry::{FmtLayer, FmtStyle};

use crate::client::AppNodesClientWrapper;
use anyhow::anyhow;
use clap::Arg;
use futures::TryFutureExt;
use serde_json::json;
Expand Down Expand Up @@ -103,16 +102,15 @@ pub(super) async fn main() -> anyhow::Result<()> {
Arg::new("rest-endpoint")
.long("rest-endpoint")
.env("ENDPOINT")
.default_value("http://ksnode-1:30011")
.help("A URL endpoint to the control plane's rest endpoint")
.required(true),
)
.arg(
Arg::new("instance-endpoint")
.long("instance-endpoint")
.env("MY_POD_IP")
.help("Endpoint of current instance")
.required(true)
Arg::new("enable-registration")
.long("enable-registration")
.action(clap::ArgAction::SetTrue)
.value_name("BOOLEAN")
.requires("rest-endpoint")
.help("Enable registration of the csi node with the control plane")
)
.arg(
Arg::new("csi-socket")
Expand All @@ -133,10 +131,9 @@ pub(super) async fn main() -> anyhow::Result<()> {
Arg::new("grpc-endpoint")
.short('g')
.long("grpc-endpoint")
.value_name("NAME")
.value_name("ENDPOINT")
.help("ip address where this instance runs, and optionally the gRPC port")
.default_value("0.0.0.0")
.required(false)
.required(true)
)
.arg(
Arg::new("v")
Expand Down Expand Up @@ -299,32 +296,31 @@ pub(super) async fn main() -> anyhow::Result<()> {
}

// Initialize the rest api client.
let client =
AppNodesClientWrapper::initialize(matches.get_one::<String>("rest-endpoint").unwrap())?;
let client = AppNodesClientWrapper::initialize(matches.get_one::<String>("rest-endpoint"))?;

let registration_enabled = matches.get_flag("enable-registration");

// Parse instance and grpc endpoints from the command line arguments and validate.
let (instance_sock_addr, grpc_sock_addr) = validate_endpoints(
matches.get_one::<String>("instance-endpoint").unwrap(),
let grpc_sock_addr = validate_endpoints(
matches.get_one::<String>("grpc-endpoint").unwrap(),
registration_enabled,
)?;

// Start the CSI server, node plugin grpc server and registration loop.
// Start the CSI server, node plugin grpc server and registration loop if registration is
// enabled.
*crate::config::config().nvme_as_mut() = TryFrom::try_from(&matches)?;
tokio::select! {
result = CsiServer::run(csi_socket, &matches)? => {
result?;
}
result = NodePluginGrpcServer::run(grpc_sock_addr) => {
result?;
}
_ = run_registration_loop(node_name.clone(), instance_sock_addr.to_string(), Some(csi_labels), &client) => {}
}

// Deregister the node from the control plane on termination.
if let Err(error) = client.deregister_app_node(node_name).await {
error!("Failed to deregister node, {:?}", error);
}
Ok(())
let (csi, grpc, registration) = tokio::join!(
CsiServer::run(csi_socket, &matches)?,
NodePluginGrpcServer::run(grpc_sock_addr),
run_registration_loop(
node_name.clone(),
grpc_sock_addr.to_string(),
Some(csi_labels),
&client,
registration_enabled
)
);
vec![csi, grpc, registration].into_iter().collect()
}

struct CsiServer {}
Expand Down Expand Up @@ -400,41 +396,23 @@ async fn check_ana_and_label_node(
Ok(())
}

/// Validate that the instance endpoint and grpc endpoint are valid, and returns the instance
/// endpoint.
/// Validate that the grpc endpoint is valid.
fn validate_endpoints(
instance_endpoint: &str,
grpc_endpoint: &str,
) -> anyhow::Result<(SocketAddr, SocketAddr)> {
// Append the port to the grpc endpoint if it is not specified.
registration_enabled: bool,
) -> anyhow::Result<SocketAddr> {
let grpc_endpoint = if grpc_endpoint.contains(':') {
grpc_endpoint.to_string()
} else {
format!("{grpc_endpoint}:{GRPC_PORT}")
};

// Append the port to the instance endpoint with the grpc endpoint's port if it is not
// specified.
let instance_endpoint = if instance_endpoint.contains(':') {
instance_endpoint.to_string()
} else {
format!(
"{instance_endpoint}:{}",
grpc_endpoint
.split(':')
.last()
.ok_or(anyhow!("gRPC endpoint must have a port"))?
)
};

let instance_url = SocketAddr::from_str(&instance_endpoint)?;
let grpc_endpoint_url = SocketAddr::from_str(&grpc_endpoint)?;

if instance_url.port() != grpc_endpoint_url.port() {
return Err(anyhow!(
"instance endpoint and gRPC endpoint must have the same port"
// Should not allow using an unspecified ip if registration is enabled as grpc endpoint gets
// sent in registration request.
if registration_enabled && grpc_endpoint_url.ip().is_unspecified() {
return Err(anyhow::format_err!(
"gRPC endpoint: `0.0.0.0` is not allowed if registration is enabled"
));
}

Ok((instance_url, grpc_endpoint_url))
Ok(grpc_endpoint_url)
}
37 changes: 32 additions & 5 deletions control-plane/csi-driver/src/bin/node/registration.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::client::AppNodesClientWrapper;
use crate::{client::AppNodesClientWrapper, shutdown_event::Shutdown};
use snafu::Snafu;
use std::{collections::HashMap, time::Duration};
use tokio::task::JoinError;
Expand All @@ -18,12 +18,29 @@ pub(crate) async fn run_registration_loop(
id: String,
endpoint: String,
labels: Option<HashMap<String, String>>,
client: &AppNodesClientWrapper,
) {
client: &Option<AppNodesClientWrapper>,
registration_enabled: bool,
) -> anyhow::Result<()> {
if !registration_enabled {
return Ok(());
}

let Some(client) = client else {
return Err(anyhow::anyhow!(
"Rest API Client should have been initialized if registration is enabled"
));
};

let mut logged_error = false;
loop {
let interval_duration = match client.register_app_node(&id, &endpoint, &labels).await {
Ok(_) => REGISTRATION_INTERVAL_ON_SUCCESS,
Ok(_) => {
if logged_error {
tracing::info!("Successfully re-registered the app node");
logged_error = false;
}
REGISTRATION_INTERVAL_ON_SUCCESS
}
Err(e) => {
if !logged_error {
error!("Failed to register app node: {:?}", e);
Expand All @@ -32,6 +49,16 @@ pub(crate) async fn run_registration_loop(
REGISTRATION_INTERVAL_ON_ERROR
}
};
tokio::time::sleep(interval_duration).await;
tokio::select! {
_ = tokio::time::sleep(interval_duration) => {}
_ = Shutdown::wait() => {
break;
}
}
}
// Deregister the node from the control plane on termination.
if let Err(error) = client.deregister_app_node(&id).await {
error!("Failed to deregister node, {:?}", error);
}
Ok(())
}

0 comments on commit 521ad39

Please sign in to comment.