From c3cf4fe2fb07281b7f4bda3b0ad866c5b5de86eb Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 5 Feb 2024 22:59:34 +0530 Subject: [PATCH 1/2] refactor: action registration --- uplink/src/base/bridge/mod.rs | 21 +++++++++++---------- uplink/src/lib.rs | 21 +++++++-------------- uplink/src/main.rs | 19 +++++++------------ 3 files changed, 25 insertions(+), 36 deletions(-) diff --git a/uplink/src/base/bridge/mod.rs b/uplink/src/base/bridge/mod.rs index d6e261e9..9f3a2a91 100644 --- a/uplink/src/base/bridge/mod.rs +++ b/uplink/src/base/bridge/mod.rs @@ -1,4 +1,4 @@ -use flume::{Receiver, Sender}; +use flume::{bounded, Receiver, Sender}; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -101,20 +101,21 @@ impl Bridge { (self.actions.ctrl_tx(), self.data.ctrl_tx()) } - pub fn register_action_route( - &mut self, - route: ActionRoute, - actions_tx: Sender, - ) -> Result<(), Error> { - self.actions.register_action_route(route, actions_tx) + pub fn register_action_route(&mut self, route: ActionRoute) -> Result, Error> { + let (actions_tx, actions_rx) = bounded(1); + self.actions.register_action_route(route, actions_tx)?; + + Ok(actions_rx) } pub fn register_action_routes, V: IntoIterator>( &mut self, routes: V, - actions_tx: Sender, - ) -> Result<(), Error> { - self.actions.register_action_routes(routes, actions_tx) + ) -> Result, Error> { + let (actions_tx, actions_rx) = bounded(1); + self.actions.register_action_routes(routes, actions_tx)?; + + Ok(actions_rx) } } diff --git a/uplink/src/lib.rs b/uplink/src/lib.rs index a72727a8..6bfff3f5 100644 --- a/uplink/src/lib.rs +++ b/uplink/src/lib.rs @@ -427,14 +427,12 @@ impl Uplink { let route = ActionRoute { name: "launch_shell".to_owned(), timeout: Duration::from_secs(10) }; - let (actions_tx, actions_rx) = bounded(1); - bridge.register_action_route(route, actions_tx)?; + let actions_rx = bridge.register_action_route(route)?; let tunshell_client = TunshellClient::new(actions_rx, bridge_tx.clone()); spawn_named_thread("Tunshell Client", move || tunshell_client.start()); if !self.config.downloader.actions.is_empty() { - let (actions_tx, actions_rx) = bounded(1); - bridge.register_action_routes(&self.config.downloader.actions, actions_tx)?; + let actions_rx = bridge.register_action_routes(&self.config.downloader.actions)?; let file_downloader = FileDownloader::new(self.config.clone(), actions_rx, bridge_tx.clone())?; spawn_named_thread("File Downloader", || file_downloader.start()); @@ -444,8 +442,7 @@ impl Uplink { spawn_named_thread("Device Shadow Generator", move || device_shadow.start()); if !self.config.ota_installer.actions.is_empty() { - let (actions_tx, actions_rx) = bounded(1); - bridge.register_action_routes(&self.config.ota_installer.actions, actions_tx)?; + let actions_rx = bridge.register_action_routes(&self.config.ota_installer.actions)?; let ota_installer = OTAInstaller::new(self.config.ota_installer.clone(), actions_rx, bridge_tx.clone()); spawn_named_thread("OTA Installer", move || ota_installer.start()); @@ -457,8 +454,7 @@ impl Uplink { name: "journalctl_config".to_string(), timeout: Duration::from_secs(10), }; - let (actions_tx, actions_rx) = bounded(1); - bridge.register_action_route(route, actions_tx)?; + let actions_rx = bridge.register_action_route(route)?; let logger = JournalCtl::new(config, actions_rx, bridge_tx.clone()); spawn_named_thread("Logger", || { if let Err(e) = logger.start() { @@ -473,8 +469,7 @@ impl Uplink { name: "journalctl_config".to_string(), timeout: Duration::from_secs(10), }; - let (actions_tx, actions_rx) = bounded(1); - bridge.register_action_route(route, actions_tx)?; + let actions_rx = bridge.register_action_route(route)?; let logger = Logcat::new(config, actions_rx, bridge_tx.clone()); spawn_named_thread("Logger", || { if let Err(e) = logger.start() { @@ -489,8 +484,7 @@ impl Uplink { }; if !self.config.processes.is_empty() { - let (actions_tx, actions_rx) = bounded(1); - bridge.register_action_routes(&self.config.processes, actions_tx)?; + let actions_rx = bridge.register_action_routes(&self.config.processes)?; let process_handler = ProcessHandler::new(actions_rx, bridge_tx.clone()); spawn_named_thread("Process Handler", || { if let Err(e) = process_handler.start() { @@ -500,8 +494,7 @@ impl Uplink { } if !self.config.script_runner.is_empty() { - let (actions_tx, actions_rx) = bounded(1); - bridge.register_action_routes(&self.config.script_runner, actions_tx)?; + let actions_rx = bridge.register_action_routes(&self.config.script_runner)?; let script_runner = ScriptRunner::new(actions_rx, bridge_tx); spawn_named_thread("Script Runner", || { if let Err(e) = script_runner.start() { diff --git a/uplink/src/main.rs b/uplink/src/main.rs index 4b42ae21..5dc24071 100644 --- a/uplink/src/main.rs +++ b/uplink/src/main.rs @@ -4,7 +4,6 @@ use std::sync::Arc; use std::time::Duration; use anyhow::Error; -use flume::bounded; use log::info; use structopt::StructOpt; use tokio::time::sleep; @@ -133,23 +132,19 @@ fn main() -> Result<(), Error> { for (app, cfg) in config.tcpapps.clone() { let mut route_rx = None; if !cfg.actions.is_empty() { - let (actions_tx, actions_rx) = bounded(1); - bridge.register_action_routes(&cfg.actions, actions_tx)?; + let actions_rx = bridge.register_action_routes(&cfg.actions)?; route_rx = Some(actions_rx) } tcpapps.push(TcpJson::new(app, cfg, route_rx, bridge.bridge_tx())); } - let simulator_actions = config.simulator.as_ref().and_then(|cfg| { - let mut route_rx = None; - if !cfg.actions.is_empty() { - let (actions_tx, actions_rx) = bounded(1); - bridge.register_action_routes(&cfg.actions, actions_tx).unwrap(); - route_rx = Some(actions_rx) + let simulator_actions = match &config.simulator { + Some(cfg) if !cfg.actions.is_empty() => { + let actions_rx = bridge.register_action_routes(&cfg.actions)?; + Some(actions_rx) } - - route_rx - }); + _ => None, + }; let ctrl_tx = uplink.spawn(bridge)?; From e25c73a65823fc39a8b2810d22938f39c6a190d1 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 6 Feb 2024 22:07:52 +0530 Subject: [PATCH 2/2] refactor: make it readable, rm `mut` --- uplink/src/main.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/uplink/src/main.rs b/uplink/src/main.rs index 5dc24071..e06aff13 100644 --- a/uplink/src/main.rs +++ b/uplink/src/main.rs @@ -130,11 +130,12 @@ fn main() -> Result<(), Error> { let mut tcpapps = vec![]; for (app, cfg) in config.tcpapps.clone() { - let mut route_rx = None; - if !cfg.actions.is_empty() { + let route_rx = if !cfg.actions.is_empty() { let actions_rx = bridge.register_action_routes(&cfg.actions)?; - route_rx = Some(actions_rx) - } + Some(actions_rx) + } else { + None + }; tcpapps.push(TcpJson::new(app, cfg, route_rx, bridge.bridge_tx())); }