Skip to content

Commit

Permalink
fix: mark parallel actions as failed on restart (#349)
Browse files Browse the repository at this point in the history
* fix: persist `action_status` by default

* fix: mark parallel actions as failed on restart
  • Loading branch information
de-sh authored Jun 17, 2024
1 parent 16b7e4f commit 62f544a
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 2 deletions.
10 changes: 10 additions & 0 deletions uplink/src/base/bridge/actions_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub enum Error {
FailedCancellation,
#[error("Action cancelled by action_id: {0}")]
Cancelled(String),
#[error("Uplink restarted before action response")]
Restart,
}

pub struct ActionsBridge {
Expand Down Expand Up @@ -232,6 +234,14 @@ impl ActionsBridge {
if let Err(e) = self.save_current_action() {
error!("Failed to save current action: {e}");
}

// NOTE: marks parallel actions still in execution as failed
// (serializer will persist on disk even if network is down)
let parallel_actions: Vec<String> = self.parallel_actions.drain().collect();
for action_id in parallel_actions {
self.forward_action_error(&action_id, Error::Restart).await
}

// NOTE: there might be events still waiting for recv on bridge_rx
self.shutdown_handle.send(()).unwrap();

Expand Down
7 changes: 5 additions & 2 deletions uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,15 @@ struct StorageHandler {
impl StorageHandler {
fn new(config: Arc<Config>) -> Result<Self, Error> {
let mut map = BTreeMap::new();
for (stream_name, stream_config) in config.streams.iter() {
let mut streams = config.streams.clone();
// NOTE: persist action_status if not configured otherwise
streams.insert("action_status".into(), config.action_status.clone());
for (stream_name, stream_config) in streams {
let mut storage =
Storage::new(&stream_config.topic, stream_config.persistence.max_file_size);
if stream_config.persistence.max_file_count > 0 {
let mut path = config.persistence_path.clone();
path.push(stream_name);
path.push(&stream_name);

std::fs::create_dir_all(&path).map_err(|_| {
Error::Persistence(config.persistence_path.to_string_lossy().to_string())
Expand Down
1 change: 1 addition & 0 deletions uplink/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const DEFAULT_CONFIG: &str = r#"
topic = "/tenants/{tenant_id}/devices/{device_id}/action/status"
batch_size = 1
flush_period = 2
persistence = { max_file_count = 1 } # Ensures action responses are not lost on restarts
priority = 255 # highest priority for quick delivery of action status info to platform
[streams.device_shadow]
Expand Down

0 comments on commit 62f544a

Please sign in to comment.