diff --git a/rumqttc/src/eventloop.rs b/rumqttc/src/eventloop.rs index a9889317..fb4d8cad 100644 --- a/rumqttc/src/eventloop.rs +++ b/rumqttc/src/eventloop.rs @@ -2,7 +2,6 @@ use crate::{framed::Network, Transport}; use crate::{tls, Incoming, MqttState, Packet, Request, StateError}; use crate::{MqttOptions, Outgoing}; -use crate::mqttbytes; use crate::mqttbytes::v4::*; use async_channel::{bounded, Receiver, Sender}; #[cfg(feature = "websocket")] @@ -29,14 +28,17 @@ pub enum ConnectionError { MqttState(#[from] StateError), #[error("Timeout")] Timeout(#[from] Elapsed), - #[error("Packet parsing error: {0}")] - Mqtt4Bytes(mqttbytes::Error), - #[error("Network: {0}")] - Network(#[from] tls::Error), + #[cfg(feature = "websocket")] + #[error("Websocket: {0}")] + Websocket(#[from] async_tungstenite::tungstenite::error::Error), + #[error("TLS: {0}")] + Tls(#[from] tls::Error), #[error("I/O: {0}")] Io(#[from] io::Error), - #[error("Stream done")] - StreamDone, + #[error("Connection refused, return code: {0:?}")] + ConnectionRefused(ConnectReturnCode), + #[error("Expected ConnAck packet, received: {0:?}")] + NotConnAck(Packet), #[error("Requests done")] RequestsDone, #[error("Cancel request by the user")] @@ -293,9 +295,7 @@ async fn network_connect(options: &MqttOptions) -> Result Result { - Packet::ConnAck(connack) + Ok(Packet::ConnAck(connack)) } - Incoming::ConnAck(connack) => { - let error = format!("Broker rejected. Reason = {:?}", connack.code); - return Err(io::Error::new(io::ErrorKind::InvalidData, error)); - } - packet => { - let error = format!("Expecting connack. Received = {:?}", packet); - return Err(io::Error::new(io::ErrorKind::InvalidData, error)); - } - }; - - io::Result::Ok(packet) + Incoming::ConnAck(connack) => Err(ConnectionError::ConnectionRefused(connack.code)), + packet => Err(ConnectionError::NotConnAck(packet)), + } }) .await??; diff --git a/rumqttc/tests/reliability.rs b/rumqttc/tests/reliability.rs index 9fe430c2..3fc3e48c 100644 --- a/rumqttc/tests/reliability.rs +++ b/rumqttc/tests/reliability.rs @@ -426,16 +426,18 @@ async fn next_poll_after_connect_failure_reconnects() { time::sleep(Duration::from_secs(1)).await; let mut eventloop = EventLoop::new(options, 5); - let event = eventloop.poll().await; - let error = "Broker rejected. Reason = BadUserNamePassword"; - match event { - Err(ConnectionError::Io(e)) => assert_eq!(e.to_string(), error), + match eventloop.poll().await { + Err(ConnectionError::ConnectionRefused(ConnectReturnCode::BadUserNamePassword)) => (), v => panic!("Expected bad username password error. Found = {:?}", v), } - let event = eventloop.poll().await.unwrap(); - let connack = ConnAck::new(ConnectReturnCode::Success, false); - assert_eq!(event, Event::Incoming(Packet::ConnAck(connack))); + match eventloop.poll().await { + Ok(Event::Incoming(Packet::ConnAck(ConnAck { + code: ConnectReturnCode::Success, + session_present: false, + }))) => (), + v => panic!("Expected ConnAck Success. Found = {:?}", v), + } } #[tokio::test]