feat: preferential ordering of streams
de-sh committed Jan 23, 2024
1 parent fe7a9c2 commit 6a80601
Showing 4 changed files with 88 additions and 21 deletions.
9 changes: 8 additions & 1 deletion configs/config.toml
@@ -75,19 +75,23 @@ blacklist = ["cancollector_metrics", "candump_metrics", "pinger"]
# should perform on the data transiting through the stream. Currently supported
# compression schemes are Lz4 and Disabled. Defaults to Disabled.
# - persistence(optional): helps persist relevant information for data recovery purposes,
# used when there is a network/system failure.
# used when there is a network/system failure.
# - priority(optional, u8): Higher priority streams get to push their data
# onto the network first.
#
# In the following config for the device_shadow stream we set buf_size to 1 and mark
# it as non-persistent. Streams are internally constructed as a map of Name -> Config
[streams.device_shadow]
topic = "/tenants/{tenant_id}/devices/{device_id}/events/device_shadow/jsonarray"
flush_period = 5
priority = 75

# Example using compression
[streams.imu]
topic = "/tenants/{tenant_id}/devices/{device_id}/events/imu/jsonarray/lz4"
buf_size = 100
compression = "Lz4"
priority = 50

# Configuration details associated with uplink's persistent storage module which writes publish
# packets to disk in case of slow or crashed network, for recovery purposes.
@@ -121,9 +125,12 @@ persistence = { max_file_count = 3 }
#
# NOTE: Action statuses are expected on a specific topic as configured in example below.
# This also means that we require a topic to be configured or uplink will error out.
# Given the importance of conveying action status to the platform as early as possible,
# it defaults to the highest priority, 255.
[action_status]
topic = "/tenants/{tenant_id}/devices/{device_id}/action/status"
flush_period = 2
priority = 255

# Configurations for uplink's built-in file downloader, including the actions that can trigger
# a download, the location in file system where uplink will download and store files from the
26 changes: 16 additions & 10 deletions uplink/src/base/mod.rs
@@ -1,5 +1,5 @@
use std::cmp::Ordering;
use std::env::current_dir;
use std::hash::Hash;
use std::path::PathBuf;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{collections::HashMap, fmt::Debug};
@@ -64,15 +64,15 @@ pub fn clock() -> u128 {
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis()
}

#[derive(Debug, Clone, Copy, Deserialize, Serialize, Default, Hash, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, Deserialize, Serialize, Default, PartialEq, Eq, PartialOrd)]
pub enum Compression {
#[default]
Disabled,
Lz4,
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, Eq)]
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct StreamConfig {
pub topic: String,
#[serde(default = "max_buf_size")]
@@ -86,6 +86,8 @@ pub struct StreamConfig {
pub compression: Compression,
#[serde(default)]
pub persistence: Persistence,
#[serde(default)]
pub priority: u8,
}

impl Default for StreamConfig {
@@ -96,23 +98,27 @@ impl Default for StreamConfig {
flush_period: default_timeout(),
compression: Compression::Disabled,
persistence: Persistence::default(),
priority: 0,
}
}
}

impl Hash for StreamConfig {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.topic.hash(state)
impl Ord for StreamConfig {
fn cmp(&self, other: &Self) -> Ordering {
match (self.priority.cmp(&other.priority), self.topic.cmp(&other.topic)) {
(Ordering::Equal, o) => o,
(o, _) => o.reverse(),
}
}
}

impl PartialEq for StreamConfig {
fn eq(&self, other: &Self) -> bool {
self.topic == other.topic
impl PartialOrd for StreamConfig {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
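
For illustration only (not part of this diff), a minimal sketch of how the ordering above behaves, assuming StreamConfig and its Default impl are in scope (Ordering is already imported at the top of this module): a higher priority compares as Ordering::Less, so it sorts first, and ties fall back to lexicographic topic order.

fn ordering_sketch() {
    // Higher priority compares as Less, so it sorts (and iterates) first.
    let top = StreamConfig { topic: "topic/top".to_string(), priority: 255, ..Default::default() };
    let low = StreamConfig { topic: "topic/one".to_string(), priority: 1, ..Default::default() };
    assert_eq!(top.cmp(&low), Ordering::Less);

    // Equal priorities fall back to topic order.
    let a = StreamConfig { topic: "topic/a".to_string(), priority: 1, ..Default::default() };
    let b = StreamConfig { topic: "topic/b".to_string(), priority: 1, ..Default::default() };
    assert_eq!(a.cmp(&b), Ordering::Less);
}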

#[derive(Debug, Clone, Deserialize, Hash, PartialEq, Eq)]
#[derive(Debug, Clone, Deserialize, PartialEq, Eq, PartialOrd)]
pub struct Persistence {
#[serde(default = "default_file_size")]
pub max_file_size: usize,
73 changes: 63 additions & 10 deletions uplink/src/base/serializer/mod.rs
@@ -1,6 +1,6 @@
mod metrics;

use std::collections::{HashMap, VecDeque};
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::io::{self, Write};
use std::time::Instant;
use std::{sync::Arc, time::Duration};
@@ -134,14 +134,14 @@ impl MqttClient for AsyncClient {
}

struct StorageHandler {
map: HashMap<Arc<StreamConfig>, Storage>,
map: BTreeMap<Arc<StreamConfig>, Storage>,
// Stream being read from
read_stream: Option<Arc<StreamConfig>>,
}
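
For illustration only (not part of this commit): since the map is now a BTreeMap keyed by StreamConfig, iterating it visits entries in the order defined by the new Ord impl in base/mod.rs, i.e. highest-priority streams first. A hypothetical sketch of the selection this enables, where has_pending is a stand-in predicate rather than a real Storage method:

// Returns the highest-priority stream with data waiting to be published,
// relying only on BTreeMap's ascending-key iteration and StreamConfig's Ord.
fn pick_stream<'a>(
    map: &'a BTreeMap<Arc<StreamConfig>, Storage>,
    has_pending: impl Fn(&Storage) -> bool,
) -> Option<&'a Arc<StreamConfig>> {
    map.iter().find(|(_, storage)| has_pending(storage)).map(|(config, _)| config)
}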

impl StorageHandler {
fn new(config: Arc<Config>) -> Result<Self, Error> {
let mut map = HashMap::with_capacity(2 * config.streams.len());
let mut map = BTreeMap::new();
for (stream_name, stream_config) in config.streams.iter() {
let mut storage =
Storage::new(&stream_config.topic, stream_config.persistence.max_file_size);
@@ -870,6 +870,7 @@ impl CtrlTx {
#[cfg(test)]
mod test {
use serde_json::Value;
use tokio::time::sleep;

use std::collections::HashMap;
use std::time::Duration;
@@ -985,9 +986,9 @@ mod test {
}
}

fn send(&mut self, i: u32) -> Result<(), Error> {
fn send(&mut self, stream: String, i: u32) -> Result<(), Error> {
let payload = Payload {
stream: "hello".to_owned(),
stream,
sequence: i,
timestamp: 0,
payload: serde_json::from_str("{\"msg\": \"Hello, World!\"}")?,
@@ -1013,7 +1014,7 @@ mod test {
let mut collector = MockCollector::new(data_tx);
std::thread::spawn(move || {
for i in 1..3 {
collector.send(i).unwrap();
collector.send("hello".to_owned(), i).unwrap();
}
});

@@ -1068,7 +1069,7 @@ mod test {
// Faster collector, sends data every 3s
std::thread::spawn(move || {
for i in 1..10 {
collector.send(i).unwrap();
collector.send("hello".to_owned(), i).unwrap();
std::thread::sleep(Duration::from_secs(3));
}
});
@@ -1096,7 +1097,7 @@ mod test {
// Faster collector, sends data every 3s
std::thread::spawn(move || {
for i in 1..10 {
collector.send(i).unwrap();
collector.send("hello".to_owned(), i).unwrap();
std::thread::sleep(Duration::from_secs(3));
}
});
@@ -1153,7 +1154,7 @@ mod test {
// Run a collector practically once
std::thread::spawn(move || {
for i in 2..6 {
collector.send(i).unwrap();
collector.send("hello".to_owned(), i).unwrap();
std::thread::sleep(Duration::from_secs(100));
}
});
@@ -1208,7 +1209,7 @@ mod test {
// Run a collector
std::thread::spawn(move || {
for i in 2..6 {
collector.send(i).unwrap();
collector.send("hello".to_owned(), i).unwrap();
std::thread::sleep(Duration::from_secs(10));
}
});
@@ -1230,4 +1231,56 @@ mod test {
s => unreachable!("Unexpected status: {:?}", s),
}
}

#[tokio::test]
// Ensures that data from streams is pushed onto the network in order of priority
async fn preferential_send_on_network() {
let mut config = default_config();
config.streams.extend([
(
"one".to_owned(),
StreamConfig { topic: "topic/one".to_string(), priority: 1, ..Default::default() },
),
(
"two".to_owned(),
StreamConfig { topic: "topic/two".to_string(), priority: 1, ..Default::default() },
),
(
"top".to_owned(),
StreamConfig {
topic: "topic/top".to_string(),
priority: u8::MAX,
..Default::default()
},
),
]);
let config = Arc::new(config);

let (serializer, data_tx, req_rx) = defaults(config);

let mut collector = MockCollector::new(data_tx);
// Run a collector
std::thread::spawn(move || {
collector.send("one".to_owned(), 1).unwrap();
collector.send("default".to_owned(), 0).unwrap();
collector.send("top".to_owned(), 100).unwrap();
collector.send("default".to_owned(), 2).unwrap();
collector.send("two".to_owned(), 3).unwrap();
collector.send("top".to_owned(), 1000).unwrap();
collector.send("one".to_owned(), 10).unwrap();
});

// run in the background
tokio::spawn(serializer.start());

sleep(Duration::from_secs(10)).await;

match req_rx.recv().unwrap() {
Request::Publish(Publish { topic, payload, .. }) => {
assert_eq!(topic, "topic/top");
assert_eq!(payload, "100");
}
_ => unreachable!(),
}
}
}
1 change: 1 addition & 0 deletions uplink/src/lib.rs
@@ -130,6 +130,7 @@ pub mod config {
topic = "/tenants/{tenant_id}/devices/{device_id}/action/status"
buf_size = 1
flush_period = 2
priority = 255 # highest priority for quick delivery of action status info to platform
[streams.device_shadow]
topic = "/tenants/{tenant_id}/devices/{device_id}/events/device_shadow/jsonarray"