diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index c64682729..7f2a753d7 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -126,8 +126,14 @@ impl ActionsBridge { Ok(()) } - pub fn tx(&self) -> ActionsBridgeTx { - ActionsBridgeTx { status_tx: self.status_tx.clone(), shutdown_handle: self.ctrl_tx.clone() } + /// Handle to send action status messages from connected application + pub fn status_tx(&self) -> StatusTx { + StatusTx { inner: self.status_tx.clone() } + } + + /// Handle to send action lane control messages + pub fn ctrl_tx(&self) -> CtrlTx { + CtrlTx { inner: self.ctrl_tx.clone() } } fn clear_current_action(&mut self) { @@ -425,20 +431,28 @@ impl ActionRouter { } } +/// Handle for apps to send action status to bridge #[derive(Debug, Clone)] -pub struct ActionsBridgeTx { - // Handle for apps to send action status to bridge - pub(crate) status_tx: Sender, - pub(crate) shutdown_handle: Sender, +pub struct StatusTx { + pub(crate) inner: Sender, } -impl ActionsBridgeTx { +impl StatusTx { pub async fn send_action_response(&self, response: ActionResponse) { - self.status_tx.send_async(response).await.unwrap() + self.inner.send_async(response).await.unwrap() } +} + +/// Handle to send control messages to action lane +#[derive(Debug, Clone)] +pub struct CtrlTx { + pub(crate) inner: Sender, +} +impl CtrlTx { + /// Triggers shutdown of `bridge::actions_lane` pub async fn trigger_shutdown(&self) { - self.shutdown_handle.send_async(ActionBridgeShutdown).await.unwrap() + self.inner.send_async(ActionBridgeShutdown).await.unwrap() } } @@ -654,7 +668,7 @@ mod tests { let (route_tx, action_rx) = bounded(1); bridge.register_action_route(test_route, route_tx).unwrap(); - let bridge_tx = bridge.tx(); + let bridge_tx = bridge.status_tx(); spawn_bridge(bridge); @@ -697,8 +711,8 @@ mod tests { let mut config = default_config(); config.action_redirections.insert("test".to_string(), "redirect".to_string()); let (mut bridge, actions_tx, data_rx) = create_bridge(Arc::new(config)); - let bridge_tx_1 = bridge.tx(); - let bridge_tx_2 = bridge.tx(); + let bridge_tx_1 = bridge.status_tx(); + let bridge_tx_2 = bridge.status_tx(); let (route_tx, action_rx_1) = bounded(1); let test_route = ActionRoute { name: "test".to_string(), timeout: Duration::from_secs(30) }; @@ -765,8 +779,8 @@ mod tests { std::env::set_current_dir(&tmpdir).unwrap(); let config = default_config(); let (mut bridge, actions_tx, data_rx) = create_bridge(Arc::new(config)); - let bridge_tx_1 = bridge.tx(); - let bridge_tx_2 = bridge.tx(); + let bridge_tx_1 = bridge.status_tx(); + let bridge_tx_2 = bridge.status_tx(); let (route_tx, action_rx_1) = bounded(1); let tunshell_route = @@ -856,8 +870,8 @@ mod tests { std::env::set_current_dir(&tmpdir).unwrap(); let config = default_config(); let (mut bridge, actions_tx, data_rx) = create_bridge(Arc::new(config)); - let bridge_tx_1 = bridge.tx(); - let bridge_tx_2 = bridge.tx(); + let bridge_tx_1 = bridge.status_tx(); + let bridge_tx_2 = bridge.status_tx(); let (route_tx, action_rx_1) = bounded(1); let test_route = ActionRoute { name: "test".to_string(), timeout: Duration::from_secs(30) }; diff --git a/uplink/src/base/bridge/data_lane.rs b/uplink/src/base/bridge/data_lane.rs index 6e100fc18..3101d897b 100644 --- a/uplink/src/base/bridge/data_lane.rs +++ b/uplink/src/base/bridge/data_lane.rs @@ -42,8 +42,14 @@ impl DataBridge { Self { data_tx, data_rx, config, streams, ctrl_rx, ctrl_tx } } - pub fn tx(&self) -> DataBridgeTx { - DataBridgeTx { data_tx: self.data_tx.clone(), shutdown_handle: self.ctrl_tx.clone() } + /// Handle to send data points from source application + pub fn data_tx(&self) -> DataTx { + DataTx { inner: self.data_tx.clone() } + } + + /// Handle to send data lane control message + pub fn ctrl_tx(&self) -> CtrlTx { + CtrlTx { inner: self.ctrl_tx.clone() } } pub async fn start(&mut self) -> Result<(), Error> { @@ -79,23 +85,31 @@ impl DataBridge { } } +/// Handle for apps to send action status to bridge #[derive(Debug, Clone)] -pub struct DataBridgeTx { - // Handle for apps to send action status to bridge - pub(crate) data_tx: Sender, - pub(crate) shutdown_handle: Sender, +pub struct DataTx { + pub(crate) inner: Sender, } -impl DataBridgeTx { +impl DataTx { pub async fn send_payload(&self, payload: Payload) { - self.data_tx.send_async(payload).await.unwrap() + self.inner.send_async(payload).await.unwrap() } pub fn send_payload_sync(&self, payload: Payload) { - self.data_tx.send(payload).unwrap() + self.inner.send(payload).unwrap() } +} + +/// Handle to send control messages to data lane +#[derive(Debug, Clone)] +pub struct CtrlTx { + pub(crate) inner: Sender, +} +impl CtrlTx { + /// Triggers shutdown of `bridge::data_lane` pub async fn trigger_shutdown(&self) { - self.shutdown_handle.send_async(DataBridgeShutdown).await.unwrap() + self.inner.send_async(DataBridgeShutdown).await.unwrap() } } diff --git a/uplink/src/base/bridge/mod.rs b/uplink/src/base/bridge/mod.rs index f96e217e9..d6e261e97 100644 --- a/uplink/src/base/bridge/mod.rs +++ b/uplink/src/base/bridge/mod.rs @@ -1,7 +1,6 @@ use flume::{Receiver, Sender}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use tokio::join; use std::{fmt::Debug, sync::Arc}; @@ -12,10 +11,10 @@ mod metrics; pub(crate) mod stream; mod streams; -pub use actions_lane::ActionsBridgeTx; use actions_lane::{ActionsBridge, Error}; +pub use actions_lane::{CtrlTx as ActionsLaneCtrlTx, StatusTx}; use data_lane::DataBridge; -pub use data_lane::DataBridgeTx; +pub use data_lane::{CtrlTx as DataLaneCtrlTx, DataTx}; use super::StreamConfig; use crate::base::ActionRoute; @@ -93,8 +92,13 @@ impl Bridge { Self { data, actions } } - pub fn tx(&self) -> BridgeTx { - BridgeTx { data: self.data.tx(), actions: self.actions.tx() } + /// Handle to send data/action status messages + pub fn bridge_tx(&self) -> BridgeTx { + BridgeTx { data_tx: self.data.data_tx(), status_tx: self.actions.status_tx() } + } + + pub(crate) fn ctrl_tx(&self) -> (actions_lane::CtrlTx, data_lane::CtrlTx) { + (self.actions.ctrl_tx(), self.data.ctrl_tx()) } pub fn register_action_route( @@ -116,24 +120,20 @@ impl Bridge { #[derive(Debug, Clone)] pub struct BridgeTx { - pub data: DataBridgeTx, - pub actions: ActionsBridgeTx, + pub data_tx: DataTx, + pub status_tx: StatusTx, } impl BridgeTx { pub async fn send_payload(&self, payload: Payload) { - self.data.send_payload(payload).await + self.data_tx.send_payload(payload).await } pub fn send_payload_sync(&self, payload: Payload) { - self.data.send_payload_sync(payload) + self.data_tx.send_payload_sync(payload) } pub async fn send_action_response(&self, response: ActionResponse) { - self.actions.send_action_response(response).await - } - - pub async fn trigger_shutdown(&self) { - join!(self.actions.trigger_shutdown(), self.data.trigger_shutdown()); + self.status_tx.send_action_response(response).await } } diff --git a/uplink/src/base/mod.rs b/uplink/src/base/mod.rs index 1b012dc0c..2cbe1e64a 100644 --- a/uplink/src/base/mod.rs +++ b/uplink/src/base/mod.rs @@ -6,6 +6,7 @@ use std::{collections::HashMap, fmt::Debug}; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DurationSeconds}; +use tokio::join; #[cfg(target_os = "linux")] use crate::collector::journalctl::JournalCtlConfig; @@ -13,6 +14,7 @@ use crate::collector::journalctl::JournalCtlConfig; use crate::collector::logcat::LogcatConfig; use self::bridge::stream::MAX_BUFFER_SIZE; +use self::bridge::{ActionsLaneCtrlTx, DataLaneCtrlTx}; pub mod actions; pub mod bridge; @@ -279,3 +281,18 @@ pub struct Config { #[cfg(target_os = "android")] pub logging: Option, } + +/// Send control messages to the various components in uplink. Currently this is +/// used only to trigger uplink shutdown. Shutdown signals are sent to all +/// components simultaneously with a join. +#[derive(Debug, Clone)] +pub struct CtrlTx { + pub actions_lane: ActionsLaneCtrlTx, + pub data_lane: DataLaneCtrlTx, +} + +impl CtrlTx { + pub async fn trigger_shutdown(&self) { + join!(self.actions_lane.trigger_shutdown(), self.data_lane.trigger_shutdown()); + } +} diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index 95b2c78da..f44ed1001 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -414,7 +414,7 @@ mod test { use super::*; use crate::base::{ - bridge::{ActionsBridgeTx, DataBridgeTx}, + bridge::{DataTx, StatusTx}, ActionRoute, DownloaderConfig, MqttConfig, }; @@ -433,14 +433,12 @@ mod test { } fn create_bridge() -> (BridgeTx, Receiver) { - let (data_tx, _) = flume::bounded(2); - let (status_tx, status_rx) = flume::bounded(2); - let (shutdown_handle, _) = bounded(1); - let data = DataBridgeTx { data_tx, shutdown_handle }; - let (shutdown_handle, _) = bounded(1); - let actions = ActionsBridgeTx { status_tx, shutdown_handle }; - - (BridgeTx { data, actions }, status_rx) + let (inner, _) = bounded(2); + let data_tx = DataTx { inner }; + let (inner, status_rx) = bounded(2); + let status_tx = StatusTx { inner }; + + (BridgeTx { data_tx, status_tx }, status_rx) } #[test] diff --git a/uplink/src/collector/script_runner.rs b/uplink/src/collector/script_runner.rs index dd921f041..3539796fd 100644 --- a/uplink/src/collector/script_runner.rs +++ b/uplink/src/collector/script_runner.rs @@ -150,21 +150,19 @@ mod tests { use super::*; use crate::{ - base::bridge::{ActionsBridgeTx, DataBridgeTx}, + base::bridge::{DataTx, StatusTx}, Action, }; use flume::bounded; fn create_bridge() -> (BridgeTx, Receiver) { - let (data_tx, _) = flume::bounded(2); - let (status_tx, status_rx) = flume::bounded(2); - let (shutdown_handle, _) = bounded(1); - let data = DataBridgeTx { data_tx, shutdown_handle }; - let (shutdown_handle, _) = bounded(1); - let actions = ActionsBridgeTx { status_tx, shutdown_handle }; - - (BridgeTx { data, actions }, status_rx) + let (inner, _) = flume::bounded(2); + let data_tx = DataTx { inner }; + let (inner, status_rx) = flume::bounded(2); + let status_tx = StatusTx { inner }; + + (BridgeTx { data_tx, status_tx }, status_rx) } #[test] diff --git a/uplink/src/console.rs b/uplink/src/console.rs index 8b3b48255..187bbda34 100644 --- a/uplink/src/console.rs +++ b/uplink/src/console.rs @@ -1,20 +1,20 @@ use axum::{extract::State, http::StatusCode, response::IntoResponse, routing::post, Router}; use log::info; -use uplink::base::bridge::BridgeTx; +use uplink::base::CtrlTx; use crate::ReloadHandle; #[derive(Debug, Clone)] struct StateHandle { reload_handle: ReloadHandle, - bridge_handle: BridgeTx, + ctrl_tx: CtrlTx, } #[tokio::main] -pub async fn start(port: u16, reload_handle: ReloadHandle, bridge_handle: BridgeTx) { +pub async fn start(port: u16, reload_handle: ReloadHandle, ctrl_tx: CtrlTx) { let address = format!("0.0.0.0:{port}"); info!("Starting uplink console server: {address}"); - let state = StateHandle { reload_handle, bridge_handle }; + let state = StateHandle { reload_handle, ctrl_tx }; let app = Router::new() .route("/logs", post(reload_loglevel)) .route("/shutdown", post(shutdown)) @@ -34,7 +34,7 @@ async fn reload_loglevel(State(state): State, filter: String) -> im async fn shutdown(State(state): State) -> impl IntoResponse { info!("Shutting down uplink"); - state.bridge_handle.trigger_shutdown().await; + state.ctrl_tx.trigger_shutdown().await; StatusCode::OK } diff --git a/uplink/src/lib.rs b/uplink/src/lib.rs index 52f4acd01..5ab2a5453 100644 --- a/uplink/src/lib.rs +++ b/uplink/src/lib.rs @@ -49,6 +49,7 @@ use anyhow::Error; use base::bridge::stream::Stream; use base::monitor::Monitor; +use base::CtrlTx; use collector::device_shadow::DeviceShadow; use collector::downloader::FileDownloader; use collector::installer::OTAInstaller; @@ -322,8 +323,9 @@ impl Uplink { ) } - pub fn spawn(&mut self, bridge: Bridge) -> Result<(), Error> { + pub fn spawn(&mut self, bridge: Bridge) -> Result { let (mqtt_metrics_tx, mqtt_metrics_rx) = bounded(10); + let (ctrl_actions_lane, ctrl_data_lane) = bridge.ctrl_tx(); let mut mqtt = Mqtt::new(self.config.clone(), self.action_tx.clone(), mqtt_metrics_tx); let mqtt_client = mqtt.client(); @@ -403,11 +405,11 @@ impl Uplink { }) }); - Ok(()) + Ok(CtrlTx { actions_lane: ctrl_actions_lane, data_lane: ctrl_data_lane }) } pub fn spawn_builtins(&mut self, bridge: &mut Bridge) -> Result<(), Error> { - let bridge_tx = bridge.tx(); + let bridge_tx = bridge.bridge_tx(); let route = ActionRoute { name: "launch_shell".to_owned(), timeout: Duration::from_secs(10) }; diff --git a/uplink/src/main.rs b/uplink/src/main.rs index 3cb23166b..a09ece674 100644 --- a/uplink/src/main.rs +++ b/uplink/src/main.rs @@ -127,7 +127,7 @@ fn main() -> Result<(), Error> { let mut bridge = uplink.configure_bridge(); uplink.spawn_builtins(&mut bridge)?; - let bridge_tx = bridge.tx(); + let bridge_tx = bridge.bridge_tx(); let mut tcpapps = vec![]; for (app, cfg) in config.tcpapps.clone() { @@ -137,7 +137,7 @@ fn main() -> Result<(), Error> { bridge.register_action_routes(&cfg.actions, actions_tx)?; route_rx = Some(actions_rx) } - tcpapps.push(TcpJson::new(app, cfg, route_rx, bridge.tx())); + tcpapps.push(TcpJson::new(app, cfg, route_rx, bridge.bridge_tx())); } let simulator_actions = config.simulator.as_ref().and_then(|cfg| { @@ -151,10 +151,9 @@ fn main() -> Result<(), Error> { route_rx }); - uplink.spawn(bridge)?; + let ctrl_tx = uplink.spawn(bridge)?; if let Some(config) = config.simulator.clone() { - let bridge_tx = bridge_tx.clone(); spawn_named_thread("Simulator", || { simulator::start(config, bridge_tx, simulator_actions).unwrap(); }); @@ -162,10 +161,8 @@ fn main() -> Result<(), Error> { if config.console.enabled { let port = config.console.port; - let bridge_tx = bridge_tx.clone(); - spawn_named_thread("Uplink Console", move || { - console::start(port, reload_handle, bridge_tx) - }); + let ctrl_tx = ctrl_tx.clone(); + spawn_named_thread("Uplink Console", move || console::start(port, reload_handle, ctrl_tx)); } let rt = tokio::runtime::Builder::new_current_thread() @@ -194,7 +191,7 @@ fn main() -> Result<(), Error> { // Handle a shutdown signal from POSIX while let Some(signal) = signals.next().await { match signal { - SIGTERM | SIGINT | SIGQUIT => bridge_tx.trigger_shutdown().await, + SIGTERM | SIGINT | SIGQUIT => ctrl_tx.trigger_shutdown().await, s => error!("Couldn't handle signal: {s}"), } }