diff --git a/configs/config.toml b/configs/config.toml index bd95388f..1e328f75 100644 --- a/configs/config.toml +++ b/configs/config.toml @@ -17,6 +17,9 @@ persistence_path = "/tmp/uplink/" # Size of in-memory buffer for dynamically created streams. Used for backlog management. default_buf_size = 1024 # 1KB +# Maximum number of data streams that can be accepted by uplink +max_stream_count = 10 + # MQTT client configuration # # Required Parameters diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index d55a26cd..a614b305 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -96,7 +96,7 @@ impl ActionsBridge { action_status.batch_size = 1; streams_config.insert("action_status".to_owned(), action_status); - let mut streams = Streams::new(config.clone(), device_config, package_tx, metrics_tx); + let mut streams = Streams::new(1, device_config, package_tx, metrics_tx); streams.config_streams(streams_config); Self { diff --git a/uplink/src/base/bridge/data_lane.rs b/uplink/src/base/bridge/data_lane.rs index f7de8440..34ffe68a 100644 --- a/uplink/src/base/bridge/data_lane.rs +++ b/uplink/src/base/bridge/data_lane.rs @@ -37,7 +37,8 @@ impl DataBridge { let (data_tx, data_rx) = bounded(10); let (ctrl_tx, ctrl_rx) = bounded(1); - let mut streams = Streams::new(config.clone(), device_config, package_tx, metrics_tx); + let mut streams = + Streams::new(config.max_stream_count, device_config, package_tx, metrics_tx); streams.config_streams(config.streams.clone()); Self { data_tx, data_rx, config, streams, ctrl_rx, ctrl_tx } diff --git a/uplink/src/base/bridge/streams.rs b/uplink/src/base/bridge/streams.rs index 6ae15a6d..ba384a3c 100644 --- a/uplink/src/base/bridge/streams.rs +++ b/uplink/src/base/bridge/streams.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use flume::Sender; use log::{error, info, trace}; -use crate::config::{Config, DeviceConfig, StreamConfig}; +use crate::config::{DeviceConfig, StreamConfig}; use super::{ delaymap::DelayMap, @@ -13,7 +13,7 @@ use super::{ }; pub struct Streams { - config: Arc, + max_stream_count: usize, device_config: Arc, data_tx: Sender>, metrics_tx: Sender, @@ -23,17 +23,18 @@ pub struct Streams { impl Streams { pub fn new( - config: Arc, + max_stream_count: usize, device_config: Arc, data_tx: Sender>, metrics_tx: Sender, ) -> Self { + let map = HashMap::with_capacity(max_stream_count); Self { - config, + max_stream_count, device_config, data_tx, metrics_tx, - map: HashMap::new(), + map, stream_timeouts: DelayMap::new(), } } @@ -50,8 +51,11 @@ impl Streams { // Create stream if it doesn't already exist if !self.map.contains_key(&stream_name) { - if self.config.simulator.is_none() && self.map.keys().len() > 20 { - error!("Failed to create {:?} stream. More than max 20 streams", stream_name); + if self.map.keys().len() > self.max_stream_count { + error!( + "Failed to create {:?} stream. More than max {} streams", + stream_name, self.max_stream_count + ); return; } diff --git a/uplink/src/config.rs b/uplink/src/config.rs index b66a1c16..7e6c11f9 100644 --- a/uplink/src/config.rs +++ b/uplink/src/config.rs @@ -14,12 +14,18 @@ use crate::collector::journalctl::JournalCtlConfig; use crate::collector::logcat::LogcatConfig; pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60); +pub const MAX_STREAM_COUNT: usize = 20; #[inline] fn default_timeout() -> Duration { DEFAULT_TIMEOUT } +#[inline] +fn default_stream_count() -> usize { + MAX_STREAM_COUNT +} + #[inline] fn max_batch_size() -> usize { MAX_BATCH_SIZE @@ -260,6 +266,8 @@ pub struct Config { #[serde(default = "default_tcpapps")] pub tcpapps: HashMap, pub mqtt: MqttConfig, + #[serde(default = "default_stream_count")] + pub max_stream_count: usize, #[serde(default)] pub processes: Vec, #[serde(default)]