From 6a806012d54590df96a532f8e91fcec995045a5a Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Tue, 23 Jan 2024 11:04:39 +0530
Subject: [PATCH] feat: preferential ordering of streams

---
 configs/config.toml               |  9 +++-
 uplink/src/base/mod.rs            | 26 ++++++-----
 uplink/src/base/serializer/mod.rs | 73 ++++++++++++++++++++++++++-----
 uplink/src/lib.rs                 |  1 +
 4 files changed, 88 insertions(+), 21 deletions(-)

diff --git a/configs/config.toml b/configs/config.toml
index f3c81e9fd..6df7206a6 100644
--- a/configs/config.toml
+++ b/configs/config.toml
@@ -75,19 +75,23 @@ blacklist = ["cancollector_metrics", "candump_metrics", "pinger"]
 # should perform on the data transiting through the stream. Currently supported
 # compression schemes are Lz4 and Disabled. Defaults to Disabled.
 # - persistence(optional): helps persist relevant information for data recovery purposes,
-# used when there is a network/system failure.
+# used when there is a network/system failure.
+# - priority(optional, u8): Higher priority streams get to push their data
+# onto the network first.
 #
 # In the following config for the device_shadow stream we set buf_size to 1 and mark
 # it as non-persistent. streams are internally constructed as a map of Name -> Config
 [streams.device_shadow]
 topic = "/tenants/{tenant_id}/devices/{device_id}/events/device_shadow/jsonarray"
 flush_period = 5
+priority = 75

 # Example using compression
 [streams.imu]
 topic = "/tenants/{tenant_id}/devices/{device_id}/events/imu/jsonarray/lz4"
 buf_size = 100
 compression = "Lz4"
+priority = 50

 # Configuration details associated with uplink's persistent storage module which writes publish
 # packets to disk in case of slow or crashed network, for recovery purposes.
@@ -121,9 +125,12 @@ persistence = { max_file_count = 3 }
 #
 # NOTE: Action statuses are expected on a specifc topic as configured in example below.
 # This also means that we require a topic to be configured or uplink will error out.
+# Given the importance of conveying action status to the platform at the earliest,
+# it has the highest priority of 255 by default.
 [action_status]
 topic = "/tenants/{tenant_id}/devices/{device_id}/action/status"
 flush_period = 2
+priority = 255

 # Configurations for uplink's built-in file downloader, including the actions that can trigger
 # a download, the location in file system where uplink will download and store files from the
diff --git a/uplink/src/base/mod.rs b/uplink/src/base/mod.rs
index 5508478c0..73b9422fe 100644
--- a/uplink/src/base/mod.rs
+++ b/uplink/src/base/mod.rs
@@ -1,5 +1,5 @@
+use std::cmp::Ordering;
 use std::env::current_dir;
-use std::hash::Hash;
 use std::path::PathBuf;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use std::{collections::HashMap, fmt::Debug};
@@ -64,7 +64,7 @@ pub fn clock() -> u128 {
     SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis()
 }

-#[derive(Debug, Clone, Copy, Deserialize, Serialize, Default, Hash, PartialEq, Eq)]
+#[derive(Debug, Clone, Copy, Deserialize, Serialize, Default, PartialEq, Eq, PartialOrd)]
 pub enum Compression {
     #[default]
     Disabled,
@@ -72,7 +72,7 @@ pub enum Compression {
 }

 #[serde_as]
-#[derive(Debug, Clone, Deserialize, Eq)]
+#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
 pub struct StreamConfig {
     pub topic: String,
     #[serde(default = "max_buf_size")]
@@ -86,6 +86,8 @@ pub struct StreamConfig {
     pub compression: Compression,
     #[serde(default)]
     pub persistence: Persistence,
+    #[serde(default)]
+    pub priority: u8,
 }

 impl Default for StreamConfig {
@@ -96,23 +98,27 @@ impl Default for StreamConfig {
             flush_period: default_timeout(),
             compression: Compression::Disabled,
             persistence: Persistence::default(),
+            priority: 0,
         }
     }
 }

-impl Hash for StreamConfig {
-    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        self.topic.hash(state)
+impl Ord for StreamConfig {
+    fn cmp(&self, other: &Self) -> Ordering {
+        match (self.priority.cmp(&other.priority), self.topic.cmp(&other.topic)) {
+            (Ordering::Equal, o) => o,
+            (o, _) => o.reverse(),
+        }
     }
 }

-impl PartialEq for StreamConfig {
-    fn eq(&self, other: &Self) -> bool {
-        self.topic == other.topic
+impl PartialOrd for StreamConfig {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
     }
 }

-#[derive(Debug, Clone, Deserialize, Hash, PartialEq, Eq)]
+#[derive(Debug, Clone, Deserialize, PartialEq, Eq, PartialOrd)]
 pub struct Persistence {
     #[serde(default = "default_file_size")]
     pub max_file_size: usize,
diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs
index 35ac7c8fc..c236672f5 100644
--- a/uplink/src/base/serializer/mod.rs
+++ b/uplink/src/base/serializer/mod.rs
@@ -1,6 +1,6 @@
 mod metrics;

-use std::collections::{HashMap, VecDeque};
+use std::collections::{BTreeMap, HashMap, VecDeque};
 use std::io::{self, Write};
 use std::time::Instant;
 use std::{sync::Arc, time::Duration};
@@ -134,14 +134,14 @@ impl MqttClient for AsyncClient {
 }

 struct StorageHandler {
-    map: HashMap<Arc<StreamConfig>, Storage>,
+    map: BTreeMap<Arc<StreamConfig>, Storage>,
     // Stream being read from
     read_stream: Option<Arc<StreamConfig>>,
 }

 impl StorageHandler {
     fn new(config: Arc<Config>) -> Result<Self, Error> {
-        let mut map = HashMap::with_capacity(2 * config.streams.len());
+        let mut map = BTreeMap::new();
         for (stream_name, stream_config) in config.streams.iter() {
             let mut storage =
                 Storage::new(&stream_config.topic, stream_config.persistence.max_file_size);
@@ -870,6 +870,7 @@ impl CtrlTx {
 #[cfg(test)]
 mod test {
     use serde_json::Value;
+    use tokio::time::sleep;

     use std::collections::HashMap;
     use std::time::Duration;
@@ -985,9 +986,9 @@ mod test {
             }
         }

-        fn send(&mut self, i: u32) -> Result<(), Error> {
+        fn send(&mut self, stream: String, i: u32) -> Result<(), Error> {
             let payload = Payload {
-                stream: "hello".to_owned(),
+                stream,
                 sequence: i,
                 timestamp: 0,
                 payload: serde_json::from_str("{\"msg\": \"Hello, World!\"}")?,
@@ -1013,7 +1014,7 @@ mod test {
         let mut collector = MockCollector::new(data_tx);
         std::thread::spawn(move || {
             for i in 1..3 {
-                collector.send(i).unwrap();
+                collector.send("hello".to_owned(), i).unwrap();
             }
         });
@@ -1068,7 +1069,7 @@ mod test {
         // Faster collector, send data every 5s
         std::thread::spawn(move || {
             for i in 1..10 {
-                collector.send(i).unwrap();
+                collector.send("hello".to_owned(), i).unwrap();
                 std::thread::sleep(Duration::from_secs(3));
             }
         });
@@ -1096,7 +1097,7 @@ mod test {
         // Faster collector, send data every 5s
         std::thread::spawn(move || {
             for i in 1..10 {
-                collector.send(i).unwrap();
+                collector.send("hello".to_owned(), i).unwrap();
                 std::thread::sleep(Duration::from_secs(3));
             }
         });
@@ -1153,7 +1154,7 @@ mod test {
         // Run a collector practically once
         std::thread::spawn(move || {
             for i in 2..6 {
-                collector.send(i).unwrap();
+                collector.send("hello".to_owned(), i).unwrap();
                 std::thread::sleep(Duration::from_secs(100));
             }
         });
@@ -1208,7 +1209,7 @@ mod test {
         // Run a collector
         std::thread::spawn(move || {
             for i in 2..6 {
-                collector.send(i).unwrap();
+                collector.send("hello".to_owned(), i).unwrap();
                 std::thread::sleep(Duration::from_secs(10));
             }
         });
@@ -1230,4 +1231,56 @@ mod test {
             s => unreachable!("Unexpected status: {:?}", s),
         }
     }
+
+    #[tokio::test]
+    // Ensures that stream data is pushed onto the network in order of configured priority
+    async fn preferential_send_on_network() {
+        let mut config = default_config();
+        config.streams.extend([
+            (
+                "one".to_owned(),
+                StreamConfig { topic: "topic/one".to_string(), priority: 1, ..Default::default() },
+            ),
+            (
+                "two".to_owned(),
+                StreamConfig { topic: "topic/two".to_string(), priority: 1, ..Default::default() },
+            ),
+            (
+                "top".to_owned(),
+                StreamConfig {
+                    topic: "topic/top".to_string(),
+                    priority: u8::MAX,
+                    ..Default::default()
+                },
+            ),
+        ]);
+        let config = Arc::new(config);

+        let (serializer, data_tx, req_rx) = defaults(config);

+        let mut collector = MockCollector::new(data_tx);
+        // Run a collector
+        std::thread::spawn(move || {
+            collector.send("one".to_owned(), 1).unwrap();
+            collector.send("default".to_owned(), 0).unwrap();
+            collector.send("top".to_owned(), 100).unwrap();
+            collector.send("default".to_owned(), 2).unwrap();
+            collector.send("two".to_owned(), 3).unwrap();
+            collector.send("top".to_owned(), 1000).unwrap();
+            collector.send("one".to_owned(), 10).unwrap();
+        });

+        // Run the serializer in the background
+        tokio::spawn(serializer.start());

+        sleep(Duration::from_secs(10)).await;

+        match req_rx.recv().unwrap() {
+            Request::Publish(Publish { topic, payload, .. }) => {
+                assert_eq!(topic, "topic/top");
+                assert_eq!(payload, "100");
+            }
+            _ => unreachable!(),
+        }
+    }
 }
diff --git a/uplink/src/lib.rs b/uplink/src/lib.rs
index f87cbe76a..a72727a80 100644
--- a/uplink/src/lib.rs
+++ b/uplink/src/lib.rs
@@ -130,6 +130,7 @@ pub mod config {
     topic = "/tenants/{tenant_id}/devices/{device_id}/action/status"
     buf_size = 1
     flush_period = 2
+    priority = 255 # highest priority for quick delivery of action status info to platform

     [streams.device_shadow]
     topic = "/tenants/{tenant_id}/devices/{device_id}/events/device_shadow/jsonarray"
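
Editorial aside, not part of the patch: the snippet below is a minimal, self-contained sketch of the ordering scheme introduced by the Ord implementation in uplink/src/base/mod.rs above. The StreamConfig here is a stripped-down stand-in that keeps only the topic and priority fields, and the topic names are invented for illustration; uplink's real type carries more fields, but the comparison logic is copied from the diff. Running it shows that a BTreeMap keyed on such configs iterates highest-priority entries first, with ties broken by topic name, which appears to be the behaviour the StorageHandler's switch from HashMap to BTreeMap relies on.

use std::cmp::Ordering;
use std::collections::BTreeMap;

// Stripped-down stand-in for uplink's StreamConfig: only the two fields
// that participate in ordering are kept here.
#[derive(Debug, PartialEq, Eq)]
struct StreamConfig {
    topic: String,
    priority: u8,
}

impl Ord for StreamConfig {
    // Same scheme as the patch: compare priorities first and reverse the
    // result so a higher priority sorts earlier; fall back to the topic
    // name so streams with equal priority still have a stable, distinct order.
    fn cmp(&self, other: &Self) -> Ordering {
        match (self.priority.cmp(&other.priority), self.topic.cmp(&other.topic)) {
            (Ordering::Equal, o) => o,
            (o, _) => o.reverse(),
        }
    }
}

impl PartialOrd for StreamConfig {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    // A BTreeMap keyed on StreamConfig iterates in the order defined by cmp,
    // so draining it front-to-back visits high-priority streams first.
    let mut map = BTreeMap::new();
    for (topic, priority) in [("topic/one", 1), ("topic/top", u8::MAX), ("topic/two", 1)] {
        map.insert(StreamConfig { topic: topic.to_string(), priority }, ());
    }

    let order: Vec<_> = map.keys().map(|k| k.topic.as_str()).collect();
    // Highest priority first, ties broken by ascending topic name.
    assert_eq!(order, ["topic/top", "topic/one", "topic/two"]);
    println!("{order:?}");
}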