Skip to content

Commit

Permalink
fix: allow configuring max_stream_count (#358)
Browse files Browse the repository at this point in the history
* feat: allow configuring `max_stream_count`

* deprecate consideration for simulator in bridge

* fix: don't expect more than 1 stream for actions_lane

* doc: example config

* restore Cargo.lock
  • Loading branch information
de-sh authored Aug 28, 2024
1 parent f33ba7e commit 0faacae
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 9 deletions.
3 changes: 3 additions & 0 deletions configs/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ persistence_path = "/tmp/uplink/"
# Size of in-memory buffer for dynamically created streams. Used for backlog management.
default_buf_size = 1024 # 1KB

# Maximum number of data streams that can be accepted by uplink
max_stream_count = 10

# MQTT client configuration
#
# Required Parameters
Expand Down
2 changes: 1 addition & 1 deletion uplink/src/base/bridge/actions_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl ActionsBridge {
action_status.batch_size = 1;

streams_config.insert("action_status".to_owned(), action_status);
let mut streams = Streams::new(config.clone(), device_config, package_tx, metrics_tx);
let mut streams = Streams::new(1, device_config, package_tx, metrics_tx);
streams.config_streams(streams_config);

Self {
Expand Down
3 changes: 2 additions & 1 deletion uplink/src/base/bridge/data_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ impl DataBridge {
let (data_tx, data_rx) = bounded(10);
let (ctrl_tx, ctrl_rx) = bounded(1);

let mut streams = Streams::new(config.clone(), device_config, package_tx, metrics_tx);
let mut streams =
Streams::new(config.max_stream_count, device_config, package_tx, metrics_tx);
streams.config_streams(config.streams.clone());

Self { data_tx, data_rx, config, streams, ctrl_rx, ctrl_tx }
Expand Down
18 changes: 11 additions & 7 deletions uplink/src/base/bridge/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;
use flume::Sender;
use log::{error, info, trace};

use crate::config::{Config, DeviceConfig, StreamConfig};
use crate::config::{DeviceConfig, StreamConfig};

use super::{
delaymap::DelayMap,
Expand All @@ -13,7 +13,7 @@ use super::{
};

pub struct Streams<T> {
config: Arc<Config>,
max_stream_count: usize,
device_config: Arc<DeviceConfig>,
data_tx: Sender<Box<dyn Package>>,
metrics_tx: Sender<StreamMetrics>,
Expand All @@ -23,17 +23,18 @@ pub struct Streams<T> {

impl<T: Point> Streams<T> {
pub fn new(
config: Arc<Config>,
max_stream_count: usize,
device_config: Arc<DeviceConfig>,
data_tx: Sender<Box<dyn Package>>,
metrics_tx: Sender<StreamMetrics>,
) -> Self {
let map = HashMap::with_capacity(max_stream_count);
Self {
config,
max_stream_count,
device_config,
data_tx,
metrics_tx,
map: HashMap::new(),
map,
stream_timeouts: DelayMap::new(),
}
}
Expand All @@ -50,8 +51,11 @@ impl<T: Point> Streams<T> {

// Create stream if it doesn't already exist
if !self.map.contains_key(&stream_name) {
if self.config.simulator.is_none() && self.map.keys().len() > 20 {
error!("Failed to create {:?} stream. More than max 20 streams", stream_name);
if self.map.keys().len() > self.max_stream_count {
error!(
"Failed to create {:?} stream. More than max {} streams",
stream_name, self.max_stream_count
);
return;
}

Expand Down
8 changes: 8 additions & 0 deletions uplink/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@ use crate::collector::journalctl::JournalCtlConfig;
use crate::collector::logcat::LogcatConfig;

pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60);
pub const MAX_STREAM_COUNT: usize = 20;

#[inline]
fn default_timeout() -> Duration {
DEFAULT_TIMEOUT
}

#[inline]
fn default_stream_count() -> usize {
MAX_STREAM_COUNT
}

#[inline]
fn max_batch_size() -> usize {
MAX_BATCH_SIZE
Expand Down Expand Up @@ -260,6 +266,8 @@ pub struct Config {
#[serde(default = "default_tcpapps")]
pub tcpapps: HashMap<String, AppConfig>,
pub mqtt: MqttConfig,
#[serde(default = "default_stream_count")]
pub max_stream_count: usize,
#[serde(default)]
pub processes: Vec<ActionRoute>,
#[serde(default)]
Expand Down

0 comments on commit 0faacae

Please sign in to comment.