Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send unzipped AP State Event message to Kafka #68

Merged
merged 1 commit into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/cgw_connection_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use futures_util::{
stream::{SplitSink, SplitStream},
FutureExt, SinkExt, StreamExt,
};

use std::{net::SocketAddr, str::FromStr, sync::Arc};
use tokio::{
net::TcpStream,
Expand Down Expand Up @@ -268,6 +269,7 @@ impl CGWConnectionProcessor {
// Make sure we always track the as accurate as possible the time
// of receiving of the event (where needed).
let timestamp = Local::now();
let mut kafaka_msg: String = String::new();

match msg {
Ok(msg) => match msg {
Expand All @@ -278,7 +280,11 @@ impl CGWConnectionProcessor {
if let Ok(evt) =
cgw_ucentral_event_parse(&device_type, &payload, timestamp.timestamp())
{
kafaka_msg = payload.clone();
if let CGWUCentralEventType::State(_) = evt.evt_type {
if let Some(decompressed) = evt.decompressed.clone() {
kafaka_msg = decompressed;
}
if self.feature_topomap_enabled {
let topo_map = CGWUCentralTopologyMap::get_ref();
topo_map.process_state_message(&device_type, evt).await;
Expand Down Expand Up @@ -313,7 +319,7 @@ impl CGWConnectionProcessor {
}

self.cgw_server
.enqueue_mbox_message_from_device_to_nb_api_c(self.group_id, payload)?;
.enqueue_mbox_message_from_device_to_nb_api_c(self.group_id, kafaka_msg)?;
return Ok(CGWConnectionState::IsActive);
}
Ping(_t) => {
Expand Down
30 changes: 30 additions & 0 deletions src/cgw_ucentral_ap_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,29 @@ fn parse_state_event_data(map: CGWUCentralJRPCMessage, timestamp: i64) -> Result
}
}

// Replace compressed data
let mut origin_msg = map.clone();
let params_value = match Value::from_str(unzipped_data.as_str()) {
Ok(val) => val,
Err(_e) => {
return Err(Error::ConnectionProcessor(
"Failed to cast decompressed message to JSON Value",
));
}
};
if let Some(value) = origin_msg.get_mut("params") {
*value = params_value;
}

let kafka_msg = match serde_json::to_string(&origin_msg) {
Ok(msg) => msg,
Err(_e) => {
return Err(Error::ConnectionProcessor(
"Failed to create decompressed Event message",
));
}
};

let state_event = CGWUCentralEvent {
serial,
evt_type: CGWUCentralEventType::State(CGWUCentralEventState {
Expand All @@ -388,6 +411,7 @@ fn parse_state_event_data(map: CGWUCentralJRPCMessage, timestamp: i64) -> Result
links: clients_links,
},
}),
decompressed: Some(kafka_msg),
};

return Ok(state_event);
Expand Down Expand Up @@ -438,6 +462,7 @@ fn parse_state_event_data(map: CGWUCentralJRPCMessage, timestamp: i64) -> Result
links: clients_links,
},
}),
decompressed: None,
};

return Ok(state_event);
Expand Down Expand Up @@ -655,6 +680,7 @@ fn parse_realtime_event_data(
},
),
}),
decompressed: None,
})
}
"client.leave" => {
Expand Down Expand Up @@ -734,6 +760,7 @@ fn parse_realtime_event_data(
},
),
}),
decompressed: None,
})
}
_ => {
Expand Down Expand Up @@ -775,6 +802,7 @@ pub fn cgw_ucentral_ap_parse_message(message: &str, timestamp: i64) -> Result<CG
log: params["log"].to_string(),
severity: serde_json::from_value(params["severity"].clone())?,
}),
decompressed: None,
};

return Ok(log_event);
Expand Down Expand Up @@ -802,6 +830,7 @@ pub fn cgw_ucentral_ap_parse_message(message: &str, timestamp: i64) -> Result<CG
uuid: 1,
capabilities: caps,
}),
decompressed: None,
};

return Ok(connect_event);
Expand All @@ -822,6 +851,7 @@ pub fn cgw_ucentral_ap_parse_message(message: &str, timestamp: i64) -> Result<CG
let reply_event = CGWUCentralEvent {
serial: Default::default(),
evt_type: CGWUCentralEventType::Reply(CGWUCentralEventReply { id }),
decompressed: None,
};

return Ok(reply_event);
Expand Down
2 changes: 2 additions & 0 deletions src/cgw_ucentral_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ pub enum CGWUCentralEventType {
pub struct CGWUCentralEvent {
pub serial: MacAddress,
pub evt_type: CGWUCentralEventType,
pub decompressed: Option<String>,
}

#[derive(Deserialize, Debug, Serialize)]
Expand Down Expand Up @@ -262,6 +263,7 @@ pub fn cgw_ucentral_parse_connect_event(message: Message) -> Result<CGWUCentralE
uuid: 1,
capabilities: caps,
}),
decompressed: None,
};

Ok(event)
Expand Down
2 changes: 2 additions & 0 deletions src/cgw_ucentral_switch_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ pub fn cgw_ucentral_switch_parse_message(
log: params["log"].to_string(),
severity: serde_json::from_value(params["severity"].clone())?,
}),
decompressed: None,
};

return Ok(log_event);
Expand Down Expand Up @@ -216,6 +217,7 @@ pub fn cgw_ucentral_switch_parse_message(
links: clients_links,
},
}),
decompressed: None,
};

return Ok(state_event);
Expand Down