From c6acd4f11646ecd10f923eb55e592eaf4c0f62d8 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 11 Mar 2022 11:42:07 +0530 Subject: [PATCH 1/3] In case of connetion failure, return error code --- rumqttc/src/eventloop.rs | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/rumqttc/src/eventloop.rs b/rumqttc/src/eventloop.rs index a9889317..42fedd50 100644 --- a/rumqttc/src/eventloop.rs +++ b/rumqttc/src/eventloop.rs @@ -35,6 +35,8 @@ pub enum ConnectionError { Network(#[from] tls::Error), #[error("I/O: {0}")] Io(#[from] io::Error), + #[error("Connection refused, return code: {0:?}")] + ConnectionRefused(ConnectReturnCode), #[error("Stream done")] StreamDone, #[error("Requests done")] @@ -348,21 +350,16 @@ async fn mqtt_connect( // wait for 'timeout' time to validate connack let packet = time::timeout(Duration::from_secs(options.connection_timeout()), async { - let packet = match network.read().await? { + match network.read().await? { Incoming::ConnAck(connack) if connack.code == ConnectReturnCode::Success => { - Packet::ConnAck(connack) - } - Incoming::ConnAck(connack) => { - let error = format!("Broker rejected. Reason = {:?}", connack.code); - return Err(io::Error::new(io::ErrorKind::InvalidData, error)); + Ok(Packet::ConnAck(connack)) } + Incoming::ConnAck(connack) => Err(ConnectionError::ConnectionRefused(connack.code)), packet => { let error = format!("Expecting connack. Received = {:?}", packet); - return Err(io::Error::new(io::ErrorKind::InvalidData, error)); + Err(io::Error::new(io::ErrorKind::InvalidData, error))? } - }; - - io::Result::Ok(packet) + } }) .await??; From 10172c706da99631a30e3b59f942bca8e42bac02 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 13 Mar 2022 10:00:03 +0530 Subject: [PATCH 2/3] New variant NotConnAck --- rumqttc/src/eventloop.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/rumqttc/src/eventloop.rs b/rumqttc/src/eventloop.rs index 42fedd50..4ca3ebe1 100644 --- a/rumqttc/src/eventloop.rs +++ b/rumqttc/src/eventloop.rs @@ -36,7 +36,9 @@ pub enum ConnectionError { #[error("I/O: {0}")] Io(#[from] io::Error), #[error("Connection refused, return code: {0:?}")] - ConnectionRefused(ConnectReturnCode), + ConnectionRefused(ConnAck), + #[error("Expected ConnAck packet, received: {0:?}")] + NotConnAck(Packet), #[error("Stream done")] StreamDone, #[error("Requests done")] @@ -354,11 +356,8 @@ async fn mqtt_connect( Incoming::ConnAck(connack) if connack.code == ConnectReturnCode::Success => { Ok(Packet::ConnAck(connack)) } - Incoming::ConnAck(connack) => Err(ConnectionError::ConnectionRefused(connack.code)), - packet => { - let error = format!("Expecting connack. Received = {:?}", packet); - Err(io::Error::new(io::ErrorKind::InvalidData, error))? - } + Incoming::ConnAck(connack) => Err(ConnectionError::ConnectionRefused(connack)), + packet => Err(ConnectionError::NotConnAck(packet)), } }) .await??; From 7f9d4f93832a688bd6816224463c9e1b9bf71a9a Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 13 Mar 2022 14:15:02 +0530 Subject: [PATCH 3/3] clean up --- rumqttc/src/eventloop.rs | 24 +++++++++--------------- rumqttc/tests/reliability.rs | 16 +++++++++------- 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/rumqttc/src/eventloop.rs b/rumqttc/src/eventloop.rs index 4ca3ebe1..fb4d8cad 100644 --- a/rumqttc/src/eventloop.rs +++ b/rumqttc/src/eventloop.rs @@ -2,7 +2,6 @@ use crate::{framed::Network, Transport}; use crate::{tls, Incoming, MqttState, Packet, Request, StateError}; use crate::{MqttOptions, Outgoing}; -use crate::mqttbytes; use crate::mqttbytes::v4::*; use async_channel::{bounded, Receiver, Sender}; #[cfg(feature = "websocket")] @@ -29,18 +28,17 @@ pub enum ConnectionError { MqttState(#[from] StateError), #[error("Timeout")] Timeout(#[from] Elapsed), - #[error("Packet parsing error: {0}")] - Mqtt4Bytes(mqttbytes::Error), - #[error("Network: {0}")] - Network(#[from] tls::Error), + #[cfg(feature = "websocket")] + #[error("Websocket: {0}")] + Websocket(#[from] async_tungstenite::tungstenite::error::Error), + #[error("TLS: {0}")] + Tls(#[from] tls::Error), #[error("I/O: {0}")] Io(#[from] io::Error), #[error("Connection refused, return code: {0:?}")] - ConnectionRefused(ConnAck), + ConnectionRefused(ConnectReturnCode), #[error("Expected ConnAck packet, received: {0:?}")] NotConnAck(Packet), - #[error("Stream done")] - StreamDone, #[error("Requests done")] RequestsDone, #[error("Cancel request by the user")] @@ -297,9 +295,7 @@ async fn network_connect(options: &MqttOptions) -> Result Result { Ok(Packet::ConnAck(connack)) } - Incoming::ConnAck(connack) => Err(ConnectionError::ConnectionRefused(connack)), + Incoming::ConnAck(connack) => Err(ConnectionError::ConnectionRefused(connack.code)), packet => Err(ConnectionError::NotConnAck(packet)), } }) diff --git a/rumqttc/tests/reliability.rs b/rumqttc/tests/reliability.rs index 9fe430c2..3fc3e48c 100644 --- a/rumqttc/tests/reliability.rs +++ b/rumqttc/tests/reliability.rs @@ -426,16 +426,18 @@ async fn next_poll_after_connect_failure_reconnects() { time::sleep(Duration::from_secs(1)).await; let mut eventloop = EventLoop::new(options, 5); - let event = eventloop.poll().await; - let error = "Broker rejected. Reason = BadUserNamePassword"; - match event { - Err(ConnectionError::Io(e)) => assert_eq!(e.to_string(), error), + match eventloop.poll().await { + Err(ConnectionError::ConnectionRefused(ConnectReturnCode::BadUserNamePassword)) => (), v => panic!("Expected bad username password error. Found = {:?}", v), } - let event = eventloop.poll().await.unwrap(); - let connack = ConnAck::new(ConnectReturnCode::Success, false); - assert_eq!(event, Event::Incoming(Packet::ConnAck(connack))); + match eventloop.poll().await { + Ok(Event::Incoming(Packet::ConnAck(ConnAck { + code: ConnectReturnCode::Success, + session_present: false, + }))) => (), + v => panic!("Expected ConnAck Success. Found = {:?}", v), + } } #[tokio::test]