From 3ce9b037c19b04a30545c66bdfdfb48aa30da8f5 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 23 Jan 2024 15:50:07 +0530 Subject: [PATCH] feat: preferential ordering of streams during read from disk (#289) * feat: preferential ordering of streams * test: remove incomplete test * test: reorganize for testing priority --- configs/config.toml | 9 +- uplink/src/base/mod.rs | 26 ++-- uplink/src/base/serializer/mod.rs | 200 ++++++++++++++++++++++++++---- uplink/src/lib.rs | 1 + vd-lib | 1 + 5 files changed, 205 insertions(+), 32 deletions(-) create mode 160000 vd-lib diff --git a/configs/config.toml b/configs/config.toml index f3c81e9fd..6df7206a6 100644 --- a/configs/config.toml +++ b/configs/config.toml @@ -75,19 +75,23 @@ blacklist = ["cancollector_metrics", "candump_metrics", "pinger"] # should perform on the data transiting through the stream. Currently supported # compression schemes are Lz4 and Disabled. Defaults to Disabled. # - persistence(optional): helps persist relevant information for data recovery purposes, -# used when there is a network/system failure. +# used when there is a network/system failure. +# - priority(optional, u8): Higher priority streams get to push their data +# onto the network first. # # In the following config for the device_shadow stream we set buf_size to 1 and mark # it as non-persistent. streams are internally constructed as a map of Name -> Config [streams.device_shadow] topic = "/tenants/{tenant_id}/devices/{device_id}/events/device_shadow/jsonarray" flush_period = 5 +priority = 75 # Example using compression [streams.imu] topic = "/tenants/{tenant_id}/devices/{device_id}/events/imu/jsonarray/lz4" buf_size = 100 compression = "Lz4" +priority = 50 # Configuration details associated with uplink's persistent storage module which writes publish # packets to disk in case of slow or crashed network, for recovery purposes. 
@@ -121,9 +125,12 @@ persistence = { max_file_count = 3 } # # NOTE: Action statuses are expected on a specifc topic as configured in example below. # This also means that we require a topic to be configured or uplink will error out. +# Given the importance of conveying action status at the earliest to platform, +# it has highest priority by default of 255. [action_status] topic = "/tenants/{tenant_id}/devices/{device_id}/action/status" flush_period = 2 +priority = 255 # Configurations for uplink's built-in file downloader, including the actions that can trigger # a download, the location in file system where uplink will download and store files from the diff --git a/uplink/src/base/mod.rs b/uplink/src/base/mod.rs index 5508478c0..73b9422fe 100644 --- a/uplink/src/base/mod.rs +++ b/uplink/src/base/mod.rs @@ -1,5 +1,5 @@ +use std::cmp::Ordering; use std::env::current_dir; -use std::hash::Hash; use std::path::PathBuf; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::{collections::HashMap, fmt::Debug}; @@ -64,7 +64,7 @@ pub fn clock() -> u128 { SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() } -#[derive(Debug, Clone, Copy, Deserialize, Serialize, Default, Hash, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, Deserialize, Serialize, Default, PartialEq, Eq, PartialOrd)] pub enum Compression { #[default] Disabled, @@ -72,7 +72,7 @@ pub enum Compression { } #[serde_as] -#[derive(Debug, Clone, Deserialize, Eq)] +#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] pub struct StreamConfig { pub topic: String, #[serde(default = "max_buf_size")] @@ -86,6 +86,8 @@ pub struct StreamConfig { pub compression: Compression, #[serde(default)] pub persistence: Persistence, + #[serde(default)] + pub priority: u8, } impl Default for StreamConfig { @@ -96,23 +98,27 @@ impl Default for StreamConfig { flush_period: default_timeout(), compression: Compression::Disabled, persistence: Persistence::default(), + priority: 0, } } } -impl Hash for StreamConfig { - fn 
hash(&self, state: &mut H) { - self.topic.hash(state) +impl Ord for StreamConfig { + fn cmp(&self, other: &Self) -> Ordering { + match (self.priority.cmp(&other.priority), self.topic.cmp(&other.topic)) { + (Ordering::Equal, o) => o, + (o, _) => o.reverse(), + } } } -impl PartialEq for StreamConfig { - fn eq(&self, other: &Self) -> bool { - self.topic == other.topic +impl PartialOrd for StreamConfig { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) } } -#[derive(Debug, Clone, Deserialize, Hash, PartialEq, Eq)] +#[derive(Debug, Clone, Deserialize, PartialEq, Eq, PartialOrd)] pub struct Persistence { #[serde(default = "default_file_size")] pub max_file_size: usize, diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index 35ac7c8fc..0866294a3 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -1,6 +1,6 @@ mod metrics; -use std::collections::{HashMap, VecDeque}; +use std::collections::{BTreeMap, HashMap, VecDeque}; use std::io::{self, Write}; use std::time::Instant; use std::{sync::Arc, time::Duration}; @@ -134,14 +134,14 @@ impl MqttClient for AsyncClient { } struct StorageHandler { - map: HashMap, Storage>, + map: BTreeMap, Storage>, // Stream being read from read_stream: Option>, } impl StorageHandler { fn new(config: Arc) -> Result { - let mut map = HashMap::with_capacity(2 * config.streams.len()); + let mut map = BTreeMap::new(); for (stream_name, stream_config) in config.streams.iter() { let mut storage = Storage::new(&stream_config.topic, stream_config.persistence.max_file_size); @@ -870,6 +870,7 @@ impl CtrlTx { #[cfg(test)] mod test { use serde_json::Value; + use tokio::spawn; use std::collections::HashMap; use std::time::Duration; @@ -971,23 +972,17 @@ mod test { } impl MockCollector { - fn new(data_tx: flume::Sender>) -> MockCollector { - MockCollector { - stream: Stream::new( - "hello", - StreamConfig { - topic: "hello/world".to_string(), - buf_size: 1, - 
..Default::default() - }, - data_tx, - ), - } + fn new( + stream_name: &str, + stream_config: StreamConfig, + data_tx: flume::Sender>, + ) -> MockCollector { + MockCollector { stream: Stream::new(stream_name, stream_config, data_tx) } } fn send(&mut self, i: u32) -> Result<(), Error> { let payload = Payload { - stream: "hello".to_owned(), + stream: Default::default(), sequence: i, timestamp: 0, payload: serde_json::from_str("{\"msg\": \"Hello, World!\"}")?, @@ -1010,7 +1005,11 @@ mod test { net_rx.recv().unwrap(); }); - let mut collector = MockCollector::new(data_tx); + let (stream_name, stream_config) = ( + "hello", + StreamConfig { topic: "hello/world".to_string(), buf_size: 1, ..Default::default() }, + ); + let mut collector = MockCollector::new(stream_name, stream_config, data_tx); std::thread::spawn(move || { for i in 1..3 { collector.send(i).unwrap(); @@ -1064,7 +1063,11 @@ mod test { net_rx.recv().unwrap(); }); - let mut collector = MockCollector::new(data_tx); + let (stream_name, stream_config) = ( + "hello", + StreamConfig { topic: "hello/world".to_string(), buf_size: 1, ..Default::default() }, + ); + let mut collector = MockCollector::new(stream_name, stream_config, data_tx); // Faster collector, send data every 5s std::thread::spawn(move || { for i in 1..10 { @@ -1092,7 +1095,11 @@ mod test { let config = Arc::new(default_config()); let (mut serializer, data_tx, _) = defaults(config); - let mut collector = MockCollector::new(data_tx); + let (stream_name, stream_config) = ( + "hello", + StreamConfig { topic: "hello/world".to_string(), buf_size: 1, ..Default::default() }, + ); + let mut collector = MockCollector::new(stream_name, stream_config, data_tx); // Faster collector, send data every 5s std::thread::spawn(move || { for i in 1..10 { @@ -1149,7 +1156,11 @@ mod test { .entry(Arc::new(Default::default())) .or_insert(Storage::new("hello/world", 1024)); - let mut collector = MockCollector::new(data_tx); + let (stream_name, stream_config) = ( + "hello", + 
StreamConfig { topic: "hello/world".to_string(), buf_size: 1, ..Default::default() }, + ); + let mut collector = MockCollector::new(stream_name, stream_config, data_tx); // Run a collector practically once std::thread::spawn(move || { for i in 2..6 { @@ -1204,7 +1215,11 @@ mod test { })) .or_insert(Storage::new("hello/world", 1024)); - let mut collector = MockCollector::new(data_tx); + let (stream_name, stream_config) = ( + "hello", + StreamConfig { topic: "hello/world".to_string(), buf_size: 1, ..Default::default() }, + ); + let mut collector = MockCollector::new(stream_name, stream_config, data_tx); // Run a collector std::thread::spawn(move || { for i in 2..6 { @@ -1230,4 +1245,147 @@ mod test { s => unreachable!("Unexpected status: {:?}", s), } } + + #[tokio::test] + // Ensures that the data of streams are removed on the basis of preference + async fn preferential_send_on_network() { + let mut config = default_config(); + config.stream_metrics.timeout = Duration::from_secs(1000); + config.streams.extend([ + ( + "one".to_owned(), + StreamConfig { topic: "topic/one".to_string(), priority: 1, ..Default::default() }, + ), + ( + "two".to_owned(), + StreamConfig { topic: "topic/two".to_string(), priority: 2, ..Default::default() }, + ), + ( + "top".to_owned(), + StreamConfig { + topic: "topic/top".to_string(), + priority: u8::MAX, + ..Default::default() + }, + ), + ]); + let config = Arc::new(config); + + let (mut serializer, _data_tx, req_rx) = defaults(config.clone()); + + let publish = |topic: String, i: u32| Publish { + dup: false, + qos: QoS::AtMostOnce, + retain: false, + topic, + pkid: 0, + payload: Bytes::from(i.to_string()), + }; + + let mut one = serializer + .storage_handler + .map + .entry(Arc::new(StreamConfig { + topic: "topic/one".to_string(), + priority: 1, + ..Default::default() + })) + .or_insert_with(|| unreachable!()); + write_to_disk(publish("topic/one".to_string(), 1), &mut one).unwrap(); + write_to_disk(publish("topic/one".to_string(), 10), 
&mut one).unwrap(); + + let top = serializer + .storage_handler + .map + .entry(Arc::new(StreamConfig { + topic: "topic/top".to_string(), + priority: u8::MAX, + ..Default::default() + })) + .or_insert_with(|| unreachable!()); + write_to_disk(publish("topic/top".to_string(), 100), top).unwrap(); + write_to_disk(publish("topic/top".to_string(), 1000), top).unwrap(); + + let two = serializer + .storage_handler + .map + .entry(Arc::new(StreamConfig { + topic: "topic/two".to_string(), + priority: 2, + ..Default::default() + })) + .or_insert_with(|| unreachable!()); + write_to_disk(publish("topic/two".to_string(), 3), two).unwrap(); + + let mut default = serializer + .storage_handler + .map + .entry(Arc::new(StreamConfig { + topic: "topic/default".to_string(), + priority: 0, + ..Default::default() + })) + .or_insert(Storage::new("topic/default", 1024)); + write_to_disk(publish("topic/default".to_string(), 0), &mut default).unwrap(); + write_to_disk(publish("topic/default".to_string(), 2), &mut default).unwrap(); + + // run serializer in the background + spawn(async { serializer.start().await.unwrap() }); + + match req_rx.recv_async().await.unwrap() { + Request::Publish(Publish { topic, payload, .. }) => { + assert_eq!(topic, "topic/top"); + assert_eq!(payload, "100"); + } + _ => unreachable!(), + } + + match req_rx.recv_async().await.unwrap() { + Request::Publish(Publish { topic, payload, .. }) => { + assert_eq!(topic, "topic/top"); + assert_eq!(payload, "1000"); + } + _ => unreachable!(), + } + + match req_rx.recv_async().await.unwrap() { + Request::Publish(Publish { topic, payload, .. }) => { + assert_eq!(topic, "topic/two"); + assert_eq!(payload, "3"); + } + _ => unreachable!(), + } + + match req_rx.recv_async().await.unwrap() { + Request::Publish(Publish { topic, payload, .. }) => { + assert_eq!(topic, "topic/one"); + assert_eq!(payload, "1"); + } + _ => unreachable!(), + } + + match req_rx.recv_async().await.unwrap() { + Request::Publish(Publish { topic, payload, .. 
}) => { + assert_eq!(topic, "topic/one"); + assert_eq!(payload, "10"); + } + _ => unreachable!(), + } + + match req_rx.recv_async().await.unwrap() { + Request::Publish(Publish { topic, payload, .. }) => { + assert_eq!(topic, "topic/default"); + assert_eq!(payload, "0"); + } + _ => unreachable!(), + } + + match req_rx.recv_async().await.unwrap() { + Request::Publish(Publish { topic, payload, .. }) => { + assert_eq!(topic, "topic/default"); + assert_eq!(payload, "2"); + } + _ => unreachable!(), + } + } } diff --git a/uplink/src/lib.rs b/uplink/src/lib.rs index f87cbe76a..a72727a80 100644 --- a/uplink/src/lib.rs +++ b/uplink/src/lib.rs @@ -130,6 +130,7 @@ pub mod config { topic = "/tenants/{tenant_id}/devices/{device_id}/action/status" buf_size = 1 flush_period = 2 + priority = 255 # highest priority for quick delivery of action status info to platform [streams.device_shadow] topic = "/tenants/{tenant_id}/devices/{device_id}/events/device_shadow/jsonarray" diff --git a/vd-lib b/vd-lib new file mode 160000 index 000000000..1e71aa442 --- /dev/null +++ b/vd-lib @@ -0,0 +1 @@ +Subproject commit 1e71aa442b0e9c9deb6aba46bff1e535e93cebf7