Skip to content

Commit

Permalink
refactor: CtrlTx to send control messages to various uplink compone…
Browse files Browse the repository at this point in the history
…nts (#316)

* refactor: separate out `CtrlTx`

* test: fix compilation

* refactor: single `CtrlTx` for whole of uplink

* chore: rm unused dir
  • Loading branch information
de-sh authored Dec 11, 2023
1 parent 6e67c49 commit 383b7ce
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 75 deletions.
46 changes: 30 additions & 16 deletions uplink/src/base/bridge/actions_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,14 @@ impl ActionsBridge {
Ok(())
}

pub fn tx(&self) -> ActionsBridgeTx {
ActionsBridgeTx { status_tx: self.status_tx.clone(), shutdown_handle: self.ctrl_tx.clone() }
/// Handle to send action status messages from connected application
pub fn status_tx(&self) -> StatusTx {
StatusTx { inner: self.status_tx.clone() }
}

/// Handle to send action lane control messages
pub fn ctrl_tx(&self) -> CtrlTx {
CtrlTx { inner: self.ctrl_tx.clone() }
}

fn clear_current_action(&mut self) {
Expand Down Expand Up @@ -425,20 +431,28 @@ impl ActionRouter {
}
}

/// Handle for apps to send action status to bridge
#[derive(Debug, Clone)]
pub struct ActionsBridgeTx {
// Handle for apps to send action status to bridge
pub(crate) status_tx: Sender<ActionResponse>,
pub(crate) shutdown_handle: Sender<ActionBridgeShutdown>,
pub struct StatusTx {
pub(crate) inner: Sender<ActionResponse>,
}

impl ActionsBridgeTx {
impl StatusTx {
pub async fn send_action_response(&self, response: ActionResponse) {
self.status_tx.send_async(response).await.unwrap()
self.inner.send_async(response).await.unwrap()
}
}

/// Handle to send control messages to action lane
#[derive(Debug, Clone)]
pub struct CtrlTx {
pub(crate) inner: Sender<ActionBridgeShutdown>,
}

impl CtrlTx {
/// Triggers shutdown of `bridge::actions_lane`
pub async fn trigger_shutdown(&self) {
self.shutdown_handle.send_async(ActionBridgeShutdown).await.unwrap()
self.inner.send_async(ActionBridgeShutdown).await.unwrap()
}
}

Expand Down Expand Up @@ -654,7 +668,7 @@ mod tests {

let (route_tx, action_rx) = bounded(1);
bridge.register_action_route(test_route, route_tx).unwrap();
let bridge_tx = bridge.tx();
let bridge_tx = bridge.status_tx();

spawn_bridge(bridge);

Expand Down Expand Up @@ -697,8 +711,8 @@ mod tests {
let mut config = default_config();
config.action_redirections.insert("test".to_string(), "redirect".to_string());
let (mut bridge, actions_tx, data_rx) = create_bridge(Arc::new(config));
let bridge_tx_1 = bridge.tx();
let bridge_tx_2 = bridge.tx();
let bridge_tx_1 = bridge.status_tx();
let bridge_tx_2 = bridge.status_tx();

let (route_tx, action_rx_1) = bounded(1);
let test_route = ActionRoute { name: "test".to_string(), timeout: Duration::from_secs(30) };
Expand Down Expand Up @@ -765,8 +779,8 @@ mod tests {
std::env::set_current_dir(&tmpdir).unwrap();
let config = default_config();
let (mut bridge, actions_tx, data_rx) = create_bridge(Arc::new(config));
let bridge_tx_1 = bridge.tx();
let bridge_tx_2 = bridge.tx();
let bridge_tx_1 = bridge.status_tx();
let bridge_tx_2 = bridge.status_tx();

let (route_tx, action_rx_1) = bounded(1);
let tunshell_route =
Expand Down Expand Up @@ -856,8 +870,8 @@ mod tests {
std::env::set_current_dir(&tmpdir).unwrap();
let config = default_config();
let (mut bridge, actions_tx, data_rx) = create_bridge(Arc::new(config));
let bridge_tx_1 = bridge.tx();
let bridge_tx_2 = bridge.tx();
let bridge_tx_1 = bridge.status_tx();
let bridge_tx_2 = bridge.status_tx();

let (route_tx, action_rx_1) = bounded(1);
let test_route = ActionRoute { name: "test".to_string(), timeout: Duration::from_secs(30) };
Expand Down
34 changes: 24 additions & 10 deletions uplink/src/base/bridge/data_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,14 @@ impl DataBridge {
Self { data_tx, data_rx, config, streams, ctrl_rx, ctrl_tx }
}

pub fn tx(&self) -> DataBridgeTx {
DataBridgeTx { data_tx: self.data_tx.clone(), shutdown_handle: self.ctrl_tx.clone() }
/// Handle to send data points from source application
pub fn data_tx(&self) -> DataTx {
DataTx { inner: self.data_tx.clone() }
}

/// Handle to send data lane control message
pub fn ctrl_tx(&self) -> CtrlTx {
CtrlTx { inner: self.ctrl_tx.clone() }
}

pub async fn start(&mut self) -> Result<(), Error> {
Expand Down Expand Up @@ -79,23 +85,31 @@ impl DataBridge {
}
}

/// Handle for apps to send action status to bridge
#[derive(Debug, Clone)]
pub struct DataBridgeTx {
// Handle for apps to send action status to bridge
pub(crate) data_tx: Sender<Payload>,
pub(crate) shutdown_handle: Sender<DataBridgeShutdown>,
pub struct DataTx {
pub(crate) inner: Sender<Payload>,
}

impl DataBridgeTx {
impl DataTx {
pub async fn send_payload(&self, payload: Payload) {
self.data_tx.send_async(payload).await.unwrap()
self.inner.send_async(payload).await.unwrap()
}

pub fn send_payload_sync(&self, payload: Payload) {
self.data_tx.send(payload).unwrap()
self.inner.send(payload).unwrap()
}
}

/// Handle to send control messages to data lane
#[derive(Debug, Clone)]
pub struct CtrlTx {
pub(crate) inner: Sender<DataBridgeShutdown>,
}

impl CtrlTx {
/// Triggers shutdown of `bridge::data_lane`
pub async fn trigger_shutdown(&self) {
self.shutdown_handle.send_async(DataBridgeShutdown).await.unwrap()
self.inner.send_async(DataBridgeShutdown).await.unwrap()
}
}
28 changes: 14 additions & 14 deletions uplink/src/base/bridge/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use flume::{Receiver, Sender};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::join;

use std::{fmt::Debug, sync::Arc};

Expand All @@ -12,10 +11,10 @@ mod metrics;
pub(crate) mod stream;
mod streams;

pub use actions_lane::ActionsBridgeTx;
use actions_lane::{ActionsBridge, Error};
pub use actions_lane::{CtrlTx as ActionsLaneCtrlTx, StatusTx};
use data_lane::DataBridge;
pub use data_lane::DataBridgeTx;
pub use data_lane::{CtrlTx as DataLaneCtrlTx, DataTx};

use super::StreamConfig;
use crate::base::ActionRoute;
Expand Down Expand Up @@ -93,8 +92,13 @@ impl Bridge {
Self { data, actions }
}

pub fn tx(&self) -> BridgeTx {
BridgeTx { data: self.data.tx(), actions: self.actions.tx() }
/// Handle to send data/action status messages
pub fn bridge_tx(&self) -> BridgeTx {
BridgeTx { data_tx: self.data.data_tx(), status_tx: self.actions.status_tx() }
}

pub(crate) fn ctrl_tx(&self) -> (actions_lane::CtrlTx, data_lane::CtrlTx) {
(self.actions.ctrl_tx(), self.data.ctrl_tx())
}

pub fn register_action_route(
Expand All @@ -116,24 +120,20 @@ impl Bridge {

#[derive(Debug, Clone)]
pub struct BridgeTx {
pub data: DataBridgeTx,
pub actions: ActionsBridgeTx,
pub data_tx: DataTx,
pub status_tx: StatusTx,
}

impl BridgeTx {
pub async fn send_payload(&self, payload: Payload) {
self.data.send_payload(payload).await
self.data_tx.send_payload(payload).await
}

pub fn send_payload_sync(&self, payload: Payload) {
self.data.send_payload_sync(payload)
self.data_tx.send_payload_sync(payload)
}

pub async fn send_action_response(&self, response: ActionResponse) {
self.actions.send_action_response(response).await
}

pub async fn trigger_shutdown(&self) {
join!(self.actions.trigger_shutdown(), self.data.trigger_shutdown());
self.status_tx.send_action_response(response).await
}
}
17 changes: 17 additions & 0 deletions uplink/src/base/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ use std::{collections::HashMap, fmt::Debug};

use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DurationSeconds};
use tokio::join;

#[cfg(target_os = "linux")]
use crate::collector::journalctl::JournalCtlConfig;
#[cfg(target_os = "android")]
use crate::collector::logcat::LogcatConfig;

use self::bridge::stream::MAX_BUFFER_SIZE;
use self::bridge::{ActionsLaneCtrlTx, DataLaneCtrlTx};

pub mod actions;
pub mod bridge;
Expand Down Expand Up @@ -279,3 +281,18 @@ pub struct Config {
#[cfg(target_os = "android")]
pub logging: Option<LogcatConfig>,
}

/// Send control messages to the various components in uplink. Currently this is
/// used only to trigger uplink shutdown. Shutdown signals are sent to all
/// components simultaneously with a join.
#[derive(Debug, Clone)]
pub struct CtrlTx {
pub actions_lane: ActionsLaneCtrlTx,
pub data_lane: DataLaneCtrlTx,
}

impl CtrlTx {
pub async fn trigger_shutdown(&self) {
join!(self.actions_lane.trigger_shutdown(), self.data_lane.trigger_shutdown());
}
}
16 changes: 7 additions & 9 deletions uplink/src/collector/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ mod test {

use super::*;
use crate::base::{
bridge::{ActionsBridgeTx, DataBridgeTx},
bridge::{DataTx, StatusTx},
ActionRoute, DownloaderConfig, MqttConfig,
};

Expand All @@ -433,14 +433,12 @@ mod test {
}

fn create_bridge() -> (BridgeTx, Receiver<ActionResponse>) {
let (data_tx, _) = flume::bounded(2);
let (status_tx, status_rx) = flume::bounded(2);
let (shutdown_handle, _) = bounded(1);
let data = DataBridgeTx { data_tx, shutdown_handle };
let (shutdown_handle, _) = bounded(1);
let actions = ActionsBridgeTx { status_tx, shutdown_handle };

(BridgeTx { data, actions }, status_rx)
let (inner, _) = bounded(2);
let data_tx = DataTx { inner };
let (inner, status_rx) = bounded(2);
let status_tx = StatusTx { inner };

(BridgeTx { data_tx, status_tx }, status_rx)
}

#[test]
Expand Down
16 changes: 7 additions & 9 deletions uplink/src/collector/script_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,21 +150,19 @@ mod tests {

use super::*;
use crate::{
base::bridge::{ActionsBridgeTx, DataBridgeTx},
base::bridge::{DataTx, StatusTx},
Action,
};

use flume::bounded;

fn create_bridge() -> (BridgeTx, Receiver<ActionResponse>) {
let (data_tx, _) = flume::bounded(2);
let (status_tx, status_rx) = flume::bounded(2);
let (shutdown_handle, _) = bounded(1);
let data = DataBridgeTx { data_tx, shutdown_handle };
let (shutdown_handle, _) = bounded(1);
let actions = ActionsBridgeTx { status_tx, shutdown_handle };

(BridgeTx { data, actions }, status_rx)
let (inner, _) = flume::bounded(2);
let data_tx = DataTx { inner };
let (inner, status_rx) = flume::bounded(2);
let status_tx = StatusTx { inner };

(BridgeTx { data_tx, status_tx }, status_rx)
}

#[test]
Expand Down
10 changes: 5 additions & 5 deletions uplink/src/console.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
use axum::{extract::State, http::StatusCode, response::IntoResponse, routing::post, Router};
use log::info;
use uplink::base::bridge::BridgeTx;
use uplink::base::CtrlTx;

use crate::ReloadHandle;

#[derive(Debug, Clone)]
struct StateHandle {
reload_handle: ReloadHandle,
bridge_handle: BridgeTx,
ctrl_tx: CtrlTx,
}

#[tokio::main]
pub async fn start(port: u16, reload_handle: ReloadHandle, bridge_handle: BridgeTx) {
pub async fn start(port: u16, reload_handle: ReloadHandle, ctrl_tx: CtrlTx) {
let address = format!("0.0.0.0:{port}");
info!("Starting uplink console server: {address}");
let state = StateHandle { reload_handle, bridge_handle };
let state = StateHandle { reload_handle, ctrl_tx };
let app = Router::new()
.route("/logs", post(reload_loglevel))
.route("/shutdown", post(shutdown))
Expand All @@ -34,7 +34,7 @@ async fn reload_loglevel(State(state): State<StateHandle>, filter: String) -> im

async fn shutdown(State(state): State<StateHandle>) -> impl IntoResponse {
info!("Shutting down uplink");
state.bridge_handle.trigger_shutdown().await;
state.ctrl_tx.trigger_shutdown().await;

StatusCode::OK
}
8 changes: 5 additions & 3 deletions uplink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use anyhow::Error;

use base::bridge::stream::Stream;
use base::monitor::Monitor;
use base::CtrlTx;
use collector::device_shadow::DeviceShadow;
use collector::downloader::FileDownloader;
use collector::installer::OTAInstaller;
Expand Down Expand Up @@ -322,8 +323,9 @@ impl Uplink {
)
}

pub fn spawn(&mut self, bridge: Bridge) -> Result<(), Error> {
pub fn spawn(&mut self, bridge: Bridge) -> Result<CtrlTx, Error> {
let (mqtt_metrics_tx, mqtt_metrics_rx) = bounded(10);
let (ctrl_actions_lane, ctrl_data_lane) = bridge.ctrl_tx();

let mut mqtt = Mqtt::new(self.config.clone(), self.action_tx.clone(), mqtt_metrics_tx);
let mqtt_client = mqtt.client();
Expand Down Expand Up @@ -403,11 +405,11 @@ impl Uplink {
})
});

Ok(())
Ok(CtrlTx { actions_lane: ctrl_actions_lane, data_lane: ctrl_data_lane })
}

pub fn spawn_builtins(&mut self, bridge: &mut Bridge) -> Result<(), Error> {
let bridge_tx = bridge.tx();
let bridge_tx = bridge.bridge_tx();

let route =
ActionRoute { name: "launch_shell".to_owned(), timeout: Duration::from_secs(10) };
Expand Down
Loading

0 comments on commit 383b7ce

Please sign in to comment.